changeset 17881:4f175c27bea5

lib-http: client: Added support for absolute request timeout. Requests cannot survive beyond this deadline.
author Stephan Bosch <stephan@rename-it.nl>
date Sat, 04 Oct 2014 17:32:48 +0300
parents c6431fb17158
children 77c4b78a4fa2
files src/lib-http/http-client-connection.c src/lib-http/http-client-host.c src/lib-http/http-client-private.h src/lib-http/http-client-queue.c src/lib-http/http-client-request.c src/lib-http/http-client.c src/lib-http/http-client.h
diffstat 7 files changed, 404 insertions(+), 86 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-http/http-client-connection.c	Sat Oct 04 17:31:38 2014 +0300
+++ b/src/lib-http/http-client-connection.c	Sat Oct 04 17:32:48 2014 +0300
@@ -423,9 +423,8 @@
 	net_set_nonblock(conn->conn.fd_in, TRUE);
 
 	conn->incoming_payload = NULL;
-
+	conn->pending_request = NULL;
 	http_client_request_finish(&req);
-	conn->pending_request = NULL;
 
 	/* room for new requests */
 	if (http_client_connection_is_ready(conn))
@@ -438,6 +437,9 @@
 	   necessary. */
 	conn->to_input =
 		timeout_add_short(0, http_client_payload_destroyed_timeout, conn);
+
+	i_assert(req != NULL);
+	http_client_request_unref(&req);
 }
 
 static bool
@@ -451,6 +453,7 @@
 	i_assert(conn->incoming_payload == NULL);
 	i_assert(conn->pending_request == NULL);
 
+	http_client_request_ref(req);
 	req->state = HTTP_REQUEST_STATE_GOT_RESPONSE;
 
 	if (response->payload != NULL) {
@@ -480,6 +483,7 @@
 		/* the callback managed to get this connection destroyed */
 		if (!retrying)
 			http_client_request_finish(&req);
+		http_client_request_unref(&req);
 		return FALSE;
 	}
 
@@ -493,6 +497,7 @@
 					       http_client_connection_input,
 					       &conn->conn);
 		}
+		http_client_request_unref(&req);
 		return TRUE;
 	}
 
@@ -501,13 +506,17 @@
 		payload = response->payload;
 		response->payload = NULL;
 		conn->pending_request = req;
+
+		/* request is dereferenced in payload destroy callback */
 		i_stream_unref(&payload);
+
 		if (conn->to_input != NULL) {
 			/* already finished reading the payload */
 			http_client_payload_finished(conn);
 		}
 	} else {
 		http_client_request_finish(&req);
+		http_client_request_unref(&req);
 	}
 
 	if (conn->incoming_payload == NULL) {
--- a/src/lib-http/http-client-host.c	Sat Oct 04 17:31:38 2014 +0300
+++ b/src/lib-http/http-client-host.c	Sat Oct 04 17:32:48 2014 +0300
@@ -82,11 +82,12 @@
 	/* make connections to requested ports */
 	array_foreach_modifiable(&host->queues, queue_idx) {
 		struct http_client_queue *queue = *queue_idx;
-		unsigned int count = array_count(&queue->request_queue);
+		unsigned int reqs_pending = 
+			http_client_queue_requests_pending(queue, NULL);
 		queue->ips_connect_idx = queue->ips_connect_start_idx = 0;
-		if (count > 0)
+		if (reqs_pending > 0)
 			http_client_queue_connection_setup(queue);
-		requests += count;
+		requests += reqs_pending;
 	}
 
 	if (requests == 0 && host->client->ioloop != NULL)
--- a/src/lib-http/http-client-private.h	Sat Oct 04 17:31:38 2014 +0300
+++ b/src/lib-http/http-client-private.h	Sat Oct 04 17:32:48 2014 +0300
@@ -75,6 +75,8 @@
 	struct timeval submit_time;
 	struct timeval sent_time;
 	struct timeval response_time;
+	struct timeval timeout_time;
+	unsigned int timeout_msecs;
 
 	unsigned int attempts;
 	unsigned int redirects;
@@ -194,10 +196,17 @@
 	   this can be more than one when soft connect timeouts are enabled */
 	ARRAY_TYPE(http_client_peer) pending_peers;
 
+	/* all requests associated to this queue
+	   (ordered by earliest timeout first) */
+	ARRAY_TYPE(http_client_request) requests; 
+
+	/* delayed requests waiting to be released after delay */
+	ARRAY_TYPE(http_client_request) delayed_requests;
+
 	/* requests pending in queue to be picked up by connections */
-	ARRAY_TYPE(http_client_request) request_queue, delayed_request_queue;
+	ARRAY_TYPE(http_client_request) queued_requests, queued_urgent_requests;
 
-	struct timeout *to_connect, *to_delayed;
+	struct timeout *to_connect, *to_request, *to_delayed;
 };
 
 struct http_client_host {
@@ -332,7 +341,7 @@
 	const struct http_client_peer_addr *addr, bool no_urgent);
 unsigned int
 http_client_queue_requests_pending(struct http_client_queue *queue,
-	unsigned int *num_urgent_r);
+	unsigned int *num_urgent_r) ATTR_NULL(2);
 void
 http_client_queue_connection_success(struct http_client_queue *queue,
 					 const struct http_client_peer_addr *addr);
--- a/src/lib-http/http-client-queue.c	Sat Oct 04 17:31:38 2014 +0300
+++ b/src/lib-http/http-client-queue.c	Sat Oct 04 17:32:48 2014 +0300
@@ -18,6 +18,14 @@
 
 #define TIMEOUT_CMP_MARGIN_USECS 2000
 
+static void
+http_client_queue_set_delay_timer(struct http_client_queue *queue,
+	struct timeval time);
+static void
+http_client_queue_set_request_timer(struct http_client_queue *queue,
+	const struct timeval *time);
+
+
 /*
  * Logging
  */
@@ -42,13 +50,9 @@
 }
 
 /*
- * Queue
+ * Queue object
  */
 
-static void
-http_client_queue_set_delay_timer(struct http_client_queue *queue,
-	struct timeval time);
-
 static struct http_client_queue *
 http_client_queue_find(struct http_client_host *host,
 	const struct http_client_peer_addr *addr)
@@ -99,8 +103,10 @@
 		queue->addr.https_name = queue->https_name;
 		queue->name = name;
 		queue->ips_connect_idx = 0;
-		i_array_init(&queue->request_queue, 16);
-		i_array_init(&queue->delayed_request_queue, 4);
+		i_array_init(&queue->requests, 16);
+		i_array_init(&queue->queued_requests, 16);
+		i_array_init(&queue->queued_urgent_requests, 16);
+		i_array_init(&queue->delayed_requests, 4);
 		array_append(&host->queues, &queue, 1);
 	}
 
@@ -114,8 +120,10 @@
 	i_free(queue->https_name);
 	if (array_is_created(&queue->pending_peers))
 		array_free(&queue->pending_peers);
-	array_free(&queue->request_queue);
-	array_free(&queue->delayed_request_queue);
+	array_free(&queue->requests);
+	array_free(&queue->queued_requests);
+	array_free(&queue->queued_urgent_requests);
+	array_free(&queue->delayed_requests);
 	if (queue->to_connect != NULL)
 		timeout_remove(&queue->to_connect);
 	if (queue->to_delayed != NULL)
@@ -124,6 +132,10 @@
 	i_free(queue);
 }
 
+/*
+ * Error handling
+ */
+
 void http_client_queue_fail(struct http_client_queue *queue,
 	unsigned int status, const char *error)
 {
@@ -131,7 +143,7 @@
 	struct http_client_request **req_idx;
 
 	/* abort all pending requests */
-	req_arr = &queue->request_queue;
+	req_arr = &queue->requests;
 	t_array_init(&treqs, array_count(req_arr));
 	array_copy(&treqs.arr, 0, &req_arr->arr, 0, array_count(req_arr));
 	array_foreach_modifiable(&treqs, req_idx) {
@@ -139,43 +151,15 @@
 	}
 	array_clear(req_arr);
 
-	/* abort all delayed requests */
-	req_arr = &queue->delayed_request_queue;
-	array_clear(&treqs);
-	array_copy(&treqs.arr, 0, &req_arr->arr, 0, array_count(req_arr));
-	array_foreach_modifiable(&treqs, req_idx) {
-		http_client_request_error(*req_idx, status, error);
-	}
-	array_clear(req_arr);
+	/* all queues must be empty now */
+	i_assert(array_count(&queue->delayed_requests) == 0);
+	i_assert(array_count(&queue->queued_requests) == 0);
+	i_assert(array_count(&queue->queued_urgent_requests) == 0);
 }
 
-void
-http_client_queue_drop_request(struct http_client_queue *queue,
-	struct http_client_request *req)
-{
-	ARRAY_TYPE(http_client_request) *req_arr;
-	struct http_client_request **req_idx;
-
-	/* remove from main queue */
-	req_arr = &queue->request_queue;
-	array_foreach_modifiable(req_arr, req_idx) {
-		if (*req_idx == req) {
-			array_delete(req_arr, array_foreach_idx(req_arr, req_idx), 1);
-			break;
-		}
-	}
-
-	/* remove from delay queue */
-	if (req->release_time.tv_sec > 0) {
-		req_arr = &queue->delayed_request_queue;
-		array_foreach_modifiable(req_arr, req_idx) {
-			if (*req_idx == req) {
-				array_delete(req_arr, array_foreach_idx(req_arr, req_idx), 1);
-				break;
-			}
-		}
-	}
-}
+/*
+ * Connection management
+ */
 
 static bool
 http_client_queue_is_last_connect_ip(struct http_client_queue *queue)
@@ -234,7 +218,9 @@
 	struct http_client_host *host = queue->host;
 	struct http_client_peer *peer = NULL;
 	const struct http_client_peer_addr *addr = &queue->addr;
-	unsigned int num_requests = array_count(&queue->request_queue);
+	unsigned int num_requests =
+		array_count(&queue->queued_requests) +
+		array_count(&queue->queued_urgent_requests);
 
 	if (num_requests == 0)
 		return;
@@ -248,6 +234,7 @@
 		(addr->https_name == NULL ? "" :
 			t_strdup_printf(" (SSL=%s)", addr->https_name)), num_requests);
 
+
 	/* create/get peer */
 	peer = http_client_peer_get(queue->client, addr);
 	http_client_peer_link_queue(peer, queue);
@@ -285,7 +272,7 @@
 		/* start soft connect time-out (but only if we have another IP left) */
 		msecs = host->client->set.soft_connect_timeout_msecs;
 		if (!http_client_queue_is_last_connect_ip(queue) && msecs > 0 &&
-		   	queue->to_connect == NULL) {
+			queue->to_connect == NULL) {
 			queue->to_connect =
 				timeout_add(msecs, http_client_queue_soft_connect_timeout, queue);
 		}
@@ -344,7 +331,7 @@
 			t_strdup_printf(" (SSL=%s)", addr->https_name)), reason,
 		(array_is_created(&queue->pending_peers) ?
 		 	array_count(&queue->pending_peers): 0),
-		array_count(&queue->request_queue));
+		array_count(&queue->requests));
 	if (array_is_created(&queue->pending_peers) &&
 		array_count(&queue->pending_peers) > 0) {
 		struct http_client_peer *const *peer_idx;
@@ -391,18 +378,217 @@
 	return TRUE;
 }
 
+/*
+ * Main request queue
+ */
+
+void
+http_client_queue_drop_request(struct http_client_queue *queue,
+	struct http_client_request *req)
+{
+	struct http_client_request **reqs;
+	unsigned int count, i;
+
+	http_client_queue_debug(queue,
+		"Dropping request %s", http_client_request_label(req));
+
+	/* drop from queue */
+	if (req->urgent) {
+		reqs = array_get_modifiable(&queue->queued_urgent_requests, &count);
+		for (i = 0; i < count; i++) {
+			if (reqs[i] == req) {
+				array_delete(&queue->queued_urgent_requests, i, 1);
+				break;
+			}
+		}
+	} else {
+		reqs = array_get_modifiable(&queue->queued_requests, &count);
+		for (i = 0; i < count; i++) {
+			if (reqs[i] == req) {
+				array_delete(&queue->queued_requests, i, 1);
+				break;
+			}
+		}
+	}
+
+	/* drop from delay queue */
+	if (req->release_time.tv_sec > 0) {
+		reqs = array_get_modifiable(&queue->delayed_requests, &count);
+		for (i = 0; i < count; i++) {
+			if (reqs[i] == req)
+				break;
+		}
+		if (i < count) {
+			if (i == 0) {
+				if (queue->to_delayed != NULL) {
+					timeout_remove(&queue->to_delayed);
+					if (count > 1) {
+						i_assert(reqs[1]->release_time.tv_sec > 0);
+						http_client_queue_set_request_timer(queue, &reqs[1]->release_time);
+					}
+				}
+			}
+			array_delete(&queue->delayed_requests, i, 1);
+		}
+	}
+
+	/* drop from main request list */
+	reqs = array_get_modifiable(&queue->requests, &count);
+	for (i = 0; i < count; i++) {
+		if (reqs[i] == req)
+			break;
+	}
+	i_assert(i < count);
+
+	if (i == 0) {
+		if (queue->to_request != NULL) {
+			timeout_remove(&queue->to_request);
+			if (count > 1 && reqs[1]->timeout_time.tv_sec > 0)
+				http_client_queue_set_request_timer(queue, &reqs[1]->timeout_time);
+		}
+	}
+	req->queue = NULL;
+	array_delete(&queue->requests, i, 1);
+	return;
+}
+
+static void
+http_client_queue_request_timeout(struct http_client_queue *queue)
+{
+	struct http_client_request *const *reqs;
+	ARRAY_TYPE(http_client_request) failed_requests;
+	struct timeval new_to = { 0, 0 };
+	unsigned int count, i;
+
+	http_client_queue_debug(queue, "Timeout (now: %s.%03lu)",
+		t_strflocaltime("%Y-%m-%d %H:%M:%S", ioloop_timeval.tv_sec),
+			((unsigned long)ioloop_timeval.tv_usec)/1000);
+
+	if (queue->to_request != NULL)
+		timeout_remove(&queue->to_request);
+
+	/* collect failed requests */
+	reqs = array_get(&queue->requests, &count);
+	i_assert(count > 0);
+	t_array_init(&failed_requests, count);
+	for (i = 0; i < count; i++) {
+		if (reqs[i]->timeout_time.tv_sec > 0 &&
+			timeval_cmp_margin(&reqs[i]->timeout_time,
+				&ioloop_timeval, TIMEOUT_CMP_MARGIN_USECS) > 0) {
+			break;
+		}
+		array_append(&failed_requests, &reqs[i], 1);
+	}
+
+	/* update timout */
+	if (i < count)
+		new_to = reqs[i]->timeout_time;
+
+	/* abort all failed request */
+	reqs = array_get(&failed_requests, &count);
+	i_assert(count > 0); /* at least one request timed out */
+	for (i = 0; i < count; i++) {
+		struct http_client_request *req = reqs[i];
+
+		http_client_queue_debug(queue,
+			"Request %s timed out",	http_client_request_label(req));
+		http_client_request_error(req,
+			HTTP_CLIENT_REQUEST_ERROR_TIMED_OUT,
+			"Timed out");
+	}
+
+	if (new_to.tv_sec > 0) {
+		http_client_queue_debug(queue, "New timeout");
+		http_client_queue_set_request_timer(queue, &new_to);
+	}
+}
+
+static void
+http_client_queue_set_request_timer(struct http_client_queue *queue,
+	const struct timeval *time)
+{
+	i_assert(time->tv_sec > 0);
+	if (queue->to_request != NULL)
+		timeout_remove(&queue->to_request);	
+
+	if (queue->client->set.debug) {
+		http_client_queue_debug(queue,
+			"Set request timeout to %s.%03lu (now: %s.%03lu)",
+			t_strflocaltime("%Y-%m-%d %H:%M:%S", time->tv_sec),
+			((unsigned long)time->tv_usec)/1000,
+			t_strflocaltime("%Y-%m-%d %H:%M:%S", ioloop_timeval.tv_sec),
+			((unsigned long)ioloop_timeval.tv_usec)/1000);
+	}
+
+	/* set timer */
+	queue->to_request = timeout_add_absolute
+		(time, http_client_queue_request_timeout, queue);
+}
+
+static int
+http_client_queue_request_timeout_cmp(struct http_client_request *const *req1,
+	struct http_client_request *const *req2)
+{
+	int ret;
+
+	/* 0 means no timeout */
+	if ((*req1)->timeout_time.tv_sec == 0) {
+		if ((*req2)->timeout_time.tv_sec == 0) {
+			/* sort by age */
+			if ((ret=timeval_cmp(&(*req1)->submit_time, &(*req2)->submit_time)) != 0)
+				return ret;
+				
+		} else {
+			return 1;
+		}
+	} else if ((*req2)->timeout_time.tv_sec == 0) {
+		return -1;
+
+	/* sort by timeout */
+	} else if 
+		((ret=timeval_cmp(&(*req1)->timeout_time, &(*req2)->timeout_time)) != 0) {
+		return ret;
+	}
+
+	/* sort by minumum attempts for fairness */
+	return ((*req2)->attempts - (*req1)->attempts);
+}
+
 static void http_client_queue_submit_now(struct http_client_queue *queue,
 	struct http_client_request *req)
 {
+	ARRAY_TYPE(http_client_request) *req_queue;
+
 	req->release_time.tv_sec = 0;
 	req->release_time.tv_usec = 0;
 
 	if (req->urgent)
-		array_insert(&queue->request_queue, 0, &req, 1);
+		req_queue = &queue->queued_urgent_requests;
 	else
-		array_append(&queue->request_queue, &req, 1);
+		req_queue = &queue->queued_requests;
+
+	/* enqueue */
+	if (req->timeout_time.tv_sec == 0) {
+		/* no timeout; enqueue at end */
+		array_append(req_queue, &req, 1);
+
+	} else if (timeval_diff_msecs(&req->timeout_time, &ioloop_timeval) <= 1) {
+		/* pretty much already timed out; don't bother */
+		
+	} else {
+		unsigned int insert_idx;
+
+		/* keep transmission queue sorted earliest timeout first */
+		(void)array_bsearch_insert_pos(req_queue,
+			&req, http_client_queue_request_timeout_cmp, &insert_idx);
+		array_insert(req_queue, insert_idx, &req, 1);
+	}
 }
 
+/*
+ * Delayed request queue
+ */
+
 static void
 http_client_queue_delay_timeout(struct http_client_queue *queue)
 {
@@ -412,7 +598,7 @@
 	io_loop_time_refresh();
 
 	finished = 0;
-	reqs = array_get(&queue->delayed_request_queue, &count);
+	reqs = array_get(&queue->delayed_requests, &count);
 	for (i = 0; i < count; i++) {
 		if (timeval_cmp_margin(&reqs[i]->release_time,
 			&ioloop_timeval, TIMEOUT_CMP_MARGIN_USECS) > 0) {
@@ -429,7 +615,7 @@
 	if (i < count) {
 		http_client_queue_set_delay_timer(queue, reqs[i]->release_time);
 	}
-	array_delete(&queue->delayed_request_queue, 0, finished);
+	array_delete(&queue->delayed_requests, 0, finished);
 
 	http_client_queue_connection_setup(queue);
 }
@@ -458,21 +644,63 @@
 	return timeval_cmp(&(*req1)->release_time, &(*req2)->release_time);
 }
 
+/*
+ * Request submission
+ */
+
 void http_client_queue_submit_request(struct http_client_queue *queue,
 	struct http_client_request *req)
 {
 	unsigned int insert_idx;
 
+	if (req->queue != NULL)
+		http_client_queue_drop_request(req->queue, req);
 	req->queue = queue;
 
+	/* check delay vs timeout */
+	if (req->release_time.tv_sec > 0 && req->timeout_time.tv_sec > 0 &&
+		timeval_cmp_margin(&req->release_time,
+			&req->timeout_time, TIMEOUT_CMP_MARGIN_USECS) >= 0) {
+		/* release time is later than absolute timeout */
+		req->release_time.tv_sec = 0;
+		req->release_time.tv_usec = 0;
+
+		/* timeout rightaway */
+		req->timeout_time = ioloop_timeval;
+
+		http_client_queue_debug(queue,
+			"Delayed request %s%s already timed out",
+			http_client_request_label(req),
+			(req->urgent ? " (urgent)" : ""));
+	}
+
+	/* add to main request list */
+	if (req->timeout_time.tv_sec == 0) {
+		/* no timeout; just append */
+		array_append(&queue->requests, &req, 1);
+
+	} else {
+		unsigned int insert_idx;
+
+		/* keep main request list sorted earliest timeout first */
+		(void)array_bsearch_insert_pos(&queue->requests,
+			&req, http_client_queue_request_timeout_cmp, &insert_idx);
+		array_insert(&queue->requests, insert_idx, &req, 1);
+
+		/* now first in queue; update timer */
+		if (insert_idx == 0)
+			http_client_queue_set_request_timer(queue, &req->timeout_time);
+	}
+
+	/* handle delay */
 	if (req->release_time.tv_sec > 0) {
 		io_loop_time_refresh();
 
 		if (timeval_cmp_margin(&req->release_time,
 			&ioloop_timeval, TIMEOUT_CMP_MARGIN_USECS) > 0) {
-			(void)array_bsearch_insert_pos(&queue->delayed_request_queue,
+			(void)array_bsearch_insert_pos(&queue->delayed_requests,
 					&req, http_client_queue_delayed_cmp, &insert_idx);
-			array_insert(&queue->delayed_request_queue, insert_idx, &req, 1);
+			array_insert(&queue->delayed_requests, insert_idx, &req, 1);
 			if (insert_idx == 0)
 				http_client_queue_set_delay_timer(queue, req->release_time);
 			return;
@@ -482,6 +710,10 @@
 	http_client_queue_submit_now(queue, req);
 }
 
+/*
+ * Request retrieval
+ */
+
 struct http_client_request *
 http_client_queue_claim_request(struct http_client_queue *queue,
 	const struct http_client_peer_addr *addr, bool no_urgent)
@@ -490,18 +722,20 @@
 	struct http_client_request *req;
 	unsigned int i, count;
 
- 	requests = array_get(&queue->request_queue, &count);
+	count = 0;
+	if (!no_urgent)
+	 	requests = array_get(&queue->queued_urgent_requests, &count);
+
+	if (count == 0)
+	 	requests = array_get(&queue->queued_requests, &count);
 	if (count == 0)
 		return NULL;
 	i = 0;
-	if (requests[0]->urgent && no_urgent) {
-		for (; requests[i]->urgent; i++) {
-			if (i == count)
-				return NULL;
-		}
-	}
 	req = requests[i];
-	array_delete(&queue->request_queue, i, 1);
+	if (req->urgent)
+		array_delete(&queue->queued_urgent_requests, i, 1);
+	else
+		array_delete(&queue->queued_requests, i, 1);
 
 	http_client_queue_debug(queue,
 		"Connection to peer %s claimed request %s %s",
@@ -515,25 +749,23 @@
 http_client_queue_requests_pending(struct http_client_queue *queue,
 	unsigned int *num_urgent_r)
 {
-	struct http_client_request *const *requests;
-	unsigned int count, i;
-
-	*num_urgent_r = 0;
+	unsigned int urg_count = array_count(&queue->queued_urgent_requests); 
 
-	requests = array_get(&queue->request_queue, &count);
-	for (i = 0; i < count; i++) {
-		if (requests[i]->urgent)
-			(*num_urgent_r)++;
-		else
-			break;
-	}
-	return count;
+	if (num_urgent_r != NULL)
+		*num_urgent_r = urg_count;
+	return array_count(&queue->queued_requests) + urg_count;
 }
 
+/*
+ * ioloop
+ */
+
 void http_client_queue_switch_ioloop(struct http_client_queue *queue)
 {
 	if (queue->to_connect != NULL)
 		queue->to_connect = io_loop_move_timeout(&queue->to_connect);
+	if (queue->to_request != NULL)
+		queue->to_request = io_loop_move_timeout(&queue->to_request);
 	if (queue->to_delayed != NULL)
 		queue->to_delayed = io_loop_move_timeout(&queue->to_delayed);
 }
--- a/src/lib-http/http-client-request.c	Sat Oct 04 17:31:38 2014 +0300
+++ b/src/lib-http/http-client-request.c	Sat Oct 04 17:32:48 2014 +0300
@@ -152,6 +152,12 @@
 	if (--req->refcount > 0)
 		return;
 
+	/* cannot be destroyed while it is still pending */
+	i_assert(req->conn == NULL || req->conn->pending_request == NULL);
+
+	if (req->queue != NULL)
+		http_client_queue_drop_request(req->queue, req);
+
 	if (req->destroy_callback != NULL) {
 		req->destroy_callback(req->destroy_context);
 		req->destroy_callback = NULL;
@@ -306,6 +312,25 @@
 		req->payload_sync = TRUE;
 }
 
+void http_client_request_set_timeout_msecs(struct http_client_request *req,
+	unsigned int msecs)
+{
+	i_assert(req->state == HTTP_REQUEST_STATE_NEW ||
+		req->state == HTTP_REQUEST_STATE_GOT_RESPONSE);
+
+	req->timeout_msecs = msecs;
+}
+
+void http_client_request_set_timeout(struct http_client_request *req,
+	const struct timeval *time)
+{
+	i_assert(req->state == HTTP_REQUEST_STATE_NEW ||
+		req->state == HTTP_REQUEST_STATE_GOT_RESPONSE);
+
+	req->timeout_time = *time;
+	req->timeout_msecs = 0;
+}
+
 void http_client_request_delay_until(struct http_client_request *req,
 	time_t time)
 {
@@ -440,6 +465,16 @@
 			req->urgent = TRUE;
 	}
 
+	if (req->timeout_time.tv_sec == 0) {
+		if (req->timeout_msecs > 0) {
+			req->timeout_time = ioloop_timeval;
+			timeval_add_msecs(&req->timeout_time, req->timeout_msecs);
+		} else if (	client->set.request_absolute_timeout_msecs > 0) {
+			req->timeout_time = ioloop_timeval;
+			timeval_add_msecs(&req->timeout_time, client->set.request_absolute_timeout_msecs);
+		}
+	}
+
 	host = http_client_host_get(req->client, req->host_url);
 	req->state = HTTP_REQUEST_STATE_QUEUED;
 
@@ -818,6 +853,7 @@
 			       unsigned int status, const char *error)
 {
 	http_client_request_callback_t *callback;
+	bool sending = (req->state == HTTP_REQUEST_STATE_PAYLOAD_OUT);
 
 	if (req->state >= HTTP_REQUEST_STATE_FINISHED)
 		return;
@@ -831,8 +867,8 @@
 		http_response_init(&response, status, error);
 		(void)callback(&response, req->context);
 
-		/* release payload early (prevents server/client in proxy) */
-		if (req->payload_input != NULL)
+		/* release payload early (prevents server/client deadlock in proxy) */
+		if (!sending && req->payload_input != NULL)
 			i_stream_unref(&req->payload_input);
 	}
 }
@@ -841,16 +877,24 @@
 {
 	struct http_client_request *req = *_req;
 
+	if (req->state >= HTTP_REQUEST_STATE_FINISHED)
+		return;
+
 	i_assert(req->delayed_error != NULL && req->delayed_error_status != 0);
 	http_client_request_send_error(req, req->delayed_error_status,
 				       req->delayed_error);
+	if (req->queue != NULL)
+		http_client_queue_drop_request(req->queue, req);
 	http_client_request_unref(_req);
 }
 
 void http_client_request_error(struct http_client_request *req,
 	unsigned int status, const char *error)
 {
-	if (!req->submitted && req->state < HTTP_REQUEST_STATE_FINISHED) {
+	if (req->state >= HTTP_REQUEST_STATE_FINISHED)
+		return;
+
+	if (!req->submitted) {
 		/* we're still in http_client_request_submit(). delay
 		   reporting the error, so the caller doesn't have to handle
 		   immediate callbacks. */
@@ -860,6 +904,8 @@
 		http_client_host_delay_request_error(req->host, req);
 	} else {
 		http_client_request_send_error(req, status, error);
+		if (req->queue != NULL)
+			http_client_queue_drop_request(req->queue, req);
 		http_client_request_unref(&req);
 	}
 }
@@ -867,11 +913,18 @@
 void http_client_request_abort(struct http_client_request **_req)
 {
 	struct http_client_request *req = *_req;
+	bool sending = (req->state == HTTP_REQUEST_STATE_PAYLOAD_OUT);
 
 	if (req->state >= HTTP_REQUEST_STATE_FINISHED)
 		return;
+
 	req->callback = NULL;
 	req->state = HTTP_REQUEST_STATE_ABORTED;
+
+	/* release payload early (prevents server/client deadlock in proxy) */
+	if (!sending && req->payload_input != NULL)
+		i_stream_unref(&req->payload_input);
+
 	if (req->queue != NULL)
 		http_client_queue_drop_request(req->queue, req);
 	http_client_request_unref(_req);
@@ -889,6 +942,8 @@
 	req->callback = NULL;
 	req->state = HTTP_REQUEST_STATE_FINISHED;
 
+	if (req->queue != NULL)
+		http_client_queue_drop_request(req->queue, req);
 	if (req->payload_wait && req->client->ioloop != NULL)
 		io_loop_stop(req->client->ioloop);
 	http_client_request_unref(_req);
@@ -951,7 +1006,6 @@
 	}
 	
 	req->host = NULL;
-	req->queue = NULL;
 	req->conn = NULL;
 
 	origin_url = http_url_create(&req->origin_url);
--- a/src/lib-http/http-client.c	Sat Oct 04 17:31:38 2014 +0300
+++ b/src/lib-http/http-client.c	Sat Oct 04 17:32:48 2014 +0300
@@ -129,6 +129,8 @@
 	client->set.no_ssl_tunnel = set->no_ssl_tunnel;
 	client->set.max_redirects = set->max_redirects;
 	client->set.response_hdr_limits = set->response_hdr_limits;
+	client->set.request_absolute_timeout_msecs =
+		set->request_absolute_timeout_msecs;
 	client->set.request_timeout_msecs = set->request_timeout_msecs;
 	client->set.connect_timeout_msecs = set->connect_timeout_msecs;
 	client->set.soft_connect_timeout_msecs = set->soft_connect_timeout_msecs;
--- a/src/lib-http/http-client.h	Sat Oct 04 17:31:38 2014 +0300
+++ b/src/lib-http/http-client.h	Sat Oct 04 17:32:48 2014 +0300
@@ -98,6 +98,12 @@
 	/* response header limits */
 	struct http_header_limits response_hdr_limits;
 
+	/* max total time to wait for HTTP request to finish
+	   this can be overridden/reset for individual requests using
+	   http_client_request_set_timeout() and friends.
+	   (default is no timeout)
+	 */
+	unsigned int request_absolute_timeout_msecs;
 	/* max time to wait for HTTP request to finish before retrying
 	   (default = unlimited) */
 	unsigned int request_timeout_msecs;
@@ -194,6 +200,11 @@
 void http_client_request_set_payload(struct http_client_request *req,
 				     struct istream *input, bool sync);
 
+void http_client_request_set_timeout_msecs(struct http_client_request *req,
+	unsigned int msecs);
+void http_client_request_set_timeout(struct http_client_request *req,
+	const struct timeval *time);
+
 void http_client_request_delay_until(struct http_client_request *req,
 	time_t time);
 void http_client_request_delay(struct http_client_request *req,