changeset 16733:bdc5d6dcfc53

lib-http: http-client: Fixed request scheduling and connection management.
author Stephan Bosch <stephan@rename-it.nl>
date Sun, 15 Sep 2013 03:31:28 +0300
parents 0bcbc30b81b7
children 819dd674f100
files src/lib-http/http-client-connection.c src/lib-http/http-client-host.c src/lib-http/http-client-peer.c src/lib-http/http-client-private.h src/lib-http/http-client-request.c src/lib-http/http-client.c src/lib-http/http-client.h
diffstat 7 files changed, 391 insertions(+), 179 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-http/http-client-connection.c	Sun Sep 15 03:29:03 2013 +0300
+++ b/src/lib-http/http-client-connection.c	Sun Sep 15 03:31:28 2013 +0300
@@ -167,8 +167,7 @@
 	http_client_connection_unref(&conn);
 }
 
-static void
-http_client_connection_check_idle(struct http_client_connection *conn)
+void http_client_connection_check_idle(struct http_client_connection *conn)
 {
 	unsigned int timeout, count;
 
@@ -254,7 +253,7 @@
 	}
 }
 
-bool http_client_connection_next_request(struct http_client_connection *conn)
+int http_client_connection_next_request(struct http_client_connection *conn)
 {
 	struct http_client_request *req = NULL;
 	const char *error;
@@ -262,17 +261,15 @@
 
 	if (!http_client_connection_is_ready(conn)) {
 		http_client_connection_debug(conn, "Not ready for next request");
-		return FALSE;
+		return 0;
 	}
 
 	/* claim request, but no urgent request can be second in line */
 	have_pending_requests = array_count(&conn->request_wait_list) > 0 ||
 		conn->pending_request != NULL;
 	req = http_client_peer_claim_request(conn->peer, have_pending_requests);
-	if (req == NULL) {
-		http_client_connection_check_idle(conn);
-		return FALSE;	
-	}
+	if (req == NULL)
+		return 0;	
 
 	if (conn->to_idle != NULL)
 		timeout_remove(&conn->to_idle);
@@ -300,7 +297,7 @@
 		http_client_connection_abort_temp_error(&conn,
 			HTTP_CLIENT_REQUEST_ERROR_CONNECTION_LOST,
 			t_strdup_printf("Failed to send request: %s", error));
-		return FALSE;
+		return -1;
 	}
 
 	/* https://tools.ietf.org/html/draft-ietf-httpbis-p2-semantics-21;
@@ -321,7 +318,7 @@
 			http_client_connection_continue_timeout, conn);
 	}
 
-	return TRUE;
+	return 1;
 }
 
 static void http_client_connection_destroy(struct connection *_conn)
@@ -595,7 +592,7 @@
 				conn->output_locked = FALSE;
 				conn->peer->no_payload_sync = TRUE;
 				http_client_request_retry(req, response->status, response->reason);
-				return;
+	
 			} else if (response->status / 100 == 3 && response->status != 304 &&
 				response->location != NULL) {
 				/* redirect */
@@ -648,9 +645,13 @@
 	}
 
 	if (finished > 0) {
+		/* connection still alive after (at least one) request;
+		   we can pipeline -> mark for subsequent connections */
+		conn->peer->allows_pipelining = TRUE;
+
 		/* room for new requests */
-		http_client_peer_handle_requests(conn->peer);
-		http_client_connection_check_idle(conn);
+		if (http_client_connection_is_ready(conn))
+			http_client_peer_trigger_request_handler(conn->peer);
 	}
 }
 
@@ -688,8 +689,8 @@
 			}
 			if (!conn->output_locked) {
 				/* room for new requests */
-				http_client_peer_handle_requests(conn->peer);
-				http_client_connection_check_idle(conn);
+				if (http_client_connection_is_ready(conn))
+					http_client_peer_trigger_request_handler(conn->peer);
 			}
 		}
 	}
@@ -701,27 +702,28 @@
 {
 	struct stat st;
 
+	/* connected */
 	conn->connected = TRUE;
-	conn->connect_succeeded = TRUE;
 	if (conn->to_connect != NULL &&
 	    (conn->ssl_iostream == NULL ||
 	     ssl_iostream_is_handshaked(conn->ssl_iostream)))
 		timeout_remove(&conn->to_connect);
 
+	/* indicate connection success */
+	conn->connect_succeeded = TRUE;
 	http_client_peer_connection_success(conn->peer);
 
+	/* start raw log */
 	if (conn->client->set.rawlog_dir != NULL &&
 		stat(conn->client->set.rawlog_dir, &st) == 0) {
 		iostream_rawlog_create(conn->client->set.rawlog_dir,
 				       &conn->conn.input, &conn->conn.output);
 	}
 
+	/* start protocol I/O */
 	conn->http_parser = http_response_parser_init(conn->conn.input);
 	o_stream_set_flush_callback(conn->conn.output,
     http_client_connection_output, conn);
-
-	/* we never pipeline before the first response */
-	(void)http_client_connection_next_request(conn);
 }
 
 static int
@@ -872,23 +874,24 @@
 {
 	struct http_client_connection *conn;
 	static unsigned int id = 0;
+	const struct http_client_peer_addr *addr = &peer->addr;
 
 	conn = i_new(struct http_client_connection, 1);
 	conn->refcount = 1;
 	conn->client = peer->client;
+	conn->id = id++;
 	conn->peer = peer;
 	i_array_init(&conn->request_wait_list, 16);
 
 	connection_init_client_ip
-		(peer->client->conn_list, &conn->conn, &peer->addr.ip, peer->addr.port);
+		(peer->client->conn_list, &conn->conn, &addr->ip, addr->port);
 	http_client_connection_connect(conn);
 
-	conn->id = id++;
 	array_append(&peer->conns, &conn, 1);
 
 	http_client_connection_debug(conn,
-		"Connection created (%d parallel connections exist)",
-		array_count(&peer->conns));
+		"Connection created (%d parallel connections exist)%s",
+		array_count(&peer->conns), (conn->to_input == NULL ? "" : " [broken]"));
 	return conn;
 }
 
--- a/src/lib-http/http-client-host.c	Sun Sep 15 03:29:03 2013 +0300
+++ b/src/lib-http/http-client-host.c	Sun Sep 15 03:31:28 2013 +0300
@@ -46,13 +46,13 @@
 
 static struct http_client_host_port *
 http_client_host_port_find(struct http_client_host *host,
-	unsigned int port, const char *https_name)
+	in_port_t port, const char *https_name)
 {
 	struct http_client_host_port *hport;
 
 	array_foreach_modifiable(&host->ports, hport) {
-		if (hport->port == port &&
-		    null_strcmp(hport->https_name, https_name) == 0)
+		if (hport->addr.port == port &&
+		    null_strcmp(hport->addr.https_name, https_name) == 0)
 			return hport;
 	}
 
@@ -61,7 +61,7 @@
 
 static struct http_client_host_port *
 http_client_host_port_init(struct http_client_host *host,
-	unsigned int port, const char *https_name)
+	in_port_t port, const char *https_name)
 {
 	struct http_client_host_port *hport;
 
@@ -69,8 +69,8 @@
 	if (hport == NULL) {
 		hport = array_append_space(&host->ports);
 		hport->host = host;
-		hport->port = port;
-		hport->https_name = i_strdup(https_name);
+		hport->addr.port = port;
+		hport->addr.https_name = i_strdup(https_name);
 		hport->ips_connect_idx = 0;
 		i_array_init(&hport->request_queue, 16);
 	}
@@ -94,7 +94,9 @@
 {
 	http_client_host_port_error
 		(hport, HTTP_CLIENT_REQUEST_ERROR_ABORTED, "Aborted");
-	i_free(hport->https_name);
+	i_free(hport->addr.https_name);
+	if (array_is_created(&hport->pending_peers))
+		array_free(&hport->pending_peers);
 	array_free(&hport->request_queue);
 }
 
@@ -136,20 +138,25 @@
 	if (hport->to_connect != NULL)
 		timeout_remove(&hport->to_connect);
 
-	if (http_client_hport_is_last_connect_ip(hport))
+	if (http_client_hport_is_last_connect_ip(hport)) {
+		/* no more IPs to try */
 		return;
+	}
 
 	/* if our our previous connection attempt takes longer than the
-	   soft_connect_timeout we start a connection attempt to the next IP in
+	   soft_connect_timeout, we start a connection attempt to the next IP in
 	   parallel */
 
 	http_client_host_debug(host, "Connection to %s:%u%s is taking a long time; "
 		"starting parallel connection attempt to next IP",
-		net_ip2addr(&host->ips[hport->ips_connect_idx]), hport->port,
-		hport->https_name == NULL ? "" :
-			t_strdup_printf(" (SSL=%s)", hport->https_name));
+		net_ip2addr(&hport->addr.ip), hport->addr.port,
+		hport->addr.https_name == NULL ? "" :
+			t_strdup_printf(" (SSL=%s)", hport->addr.https_name));
 
+	/* next IP */
 	hport->ips_connect_idx = (hport->ips_connect_idx + 1) % host->ips_count;
+
+	/* setup connection to new peer (can start new soft timeout) */
 	http_client_host_port_connection_setup(hport);
 }
 
@@ -158,59 +165,41 @@
 {
 	struct http_client_host *host = hport->host;
 	struct http_client_peer *peer = NULL;
-	struct http_client_peer_addr addr;
-	unsigned int msecs;
+	const struct http_client_peer_addr *addr = &hport->addr;
+	unsigned int num_requests = array_count(&hport->request_queue);
 
-	addr.ip = host->ips[hport->ips_connect_idx];
-	addr.port = hport->port;
-	addr.https_name = hport->https_name;
+	if (num_requests == 0)
+		return;
 
-	http_client_host_debug(host, "Setting up connection to %s:%u%s",
-		net_ip2addr(&addr.ip), addr.port, addr.https_name == NULL ? "" :
-		t_strdup_printf(" (SSL=%s)", addr.https_name));
+	/* update our peer address */
+	hport->addr.ip = host->ips[hport->ips_connect_idx];
 
-	peer = http_client_peer_get(host->client, &addr);
-	http_client_peer_add_host(peer, host);
-	if (http_client_peer_handle_requests(peer))
-		hport->pending_connection_count++;
+	http_client_host_debug(host, "Setting up connection to %s:%u%s "
+		"(%u requests pending)", net_ip2addr(&addr->ip), addr->port,
+		addr->https_name == NULL ? "" :
+			t_strdup_printf(" (SSL=%s)", addr->https_name), num_requests);
 
-	/* start soft connect time-out (but only if we have another IP left) */
-	msecs = host->client->set.soft_connect_timeout_msecs;
-	if (!http_client_hport_is_last_connect_ip(hport) && msecs > 0 &&
-	    hport->to_connect == NULL) {
-		hport->to_connect =
-			timeout_add(msecs, http_client_host_port_soft_connect_timeout, hport);
-	}
-}
+	/* create/get peer */
+	peer = http_client_peer_get(host->client, addr);
+	http_client_peer_add_host(peer, host);
 
-static void
-http_client_host_drop_pending_connections(struct http_client_host_port *hport,
-					  const struct http_client_peer_addr *addr)
-{
-	struct http_client_peer *peer;
-	struct http_client_connection *const *conns, *conn;
-	unsigned int i, count;
+	/* handle requests; creates new connections when needed/possible */
+	http_client_peer_trigger_request_handler(peer);
+
+	if (!http_client_peer_is_connected(peer)) {
+		unsigned int msecs;
 
-	for (peer = hport->host->client->peers_list; peer != NULL; peer = peer->next) {
-		if (http_client_peer_addr_cmp(&peer->addr, addr) == 0) {
-			/* don't drop any connections to the successfully
-			   connected peer, even if some of the connections
-			   are pending. they may be intended for urgent
-			   requests. */
-			continue;
-		}
-		if (!http_client_peer_have_host(peer, hport->host))
-			continue;
+		/* not already connected, wait for connections */
+		if (!array_is_created(&hport->pending_peers))
+			i_array_init(&hport->pending_peers, 8);
+		array_append(&hport->pending_peers, &peer, 1);			
 
-		conns = array_get(&peer->conns, &count);
-		for (i = count; i > 0; i--) {
-			conn = conns[i-1];
-			if (!conn->connected) {
-				i_assert(conn->refcount == 1);
-				/* avoid recreating the connection */
-				peer->last_connect_failed = TRUE;
-				http_client_connection_unref(&conn);
-			}
+		/* start soft connect time-out (but only if we have another IP left) */
+		msecs = host->client->set.soft_connect_timeout_msecs;
+		if (!http_client_hport_is_last_connect_ip(hport) && msecs > 0 &&
+		    hport->to_connect == NULL) {
+			hport->to_connect =
+				timeout_add(msecs, http_client_host_port_soft_connect_timeout, hport);
 		}
 	}
 }
@@ -241,28 +230,51 @@
 		timeout_remove(&hport->to_connect);
 
 	/* drop all other attempts to the hport. note that we get here whenever
-	   a connection is successfully created, so pending_connection_count
-	   may be 0. */
-	if (hport->pending_connection_count > 1)
-		http_client_host_drop_pending_connections(hport, addr);
-	/* since this hport is now successfully connected, we won't be
-	   getting any connection failures to it anymore. so we need
-	   to reset the pending_connection_count count here. */
-	hport->pending_connection_count = 0;
+	   a connection is successfully created, so pending_peers array
+	   may be empty. */
+	if (array_is_created(&hport->pending_peers) &&
+		array_count(&hport->pending_peers) > 0) {
+		struct http_client_peer *const *peer_idx;
+
+		array_foreach(&hport->pending_peers, peer_idx) {
+			if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
+				/* don't drop any connections to the successfully
+				   connected peer, even if some of the connections
+				   are pending. they may be intended for urgent
+				   requests. */
+				continue;
+			}
+			/* remove this host from the peer; if this was the last/only host, the
+			   peer will be freed, closing all connections.
+			 */
+			http_client_peer_remove_host(*peer_idx, hport->host);
+		}
+		array_clear(&hport->pending_peers);
+	}
 }
 
 static bool
 http_client_host_port_connection_failure(struct http_client_host_port *hport,
-	const char *reason)
+	const struct http_client_peer_addr *addr, const char *reason)
 {
 	struct http_client_host *host = hport->host;
 
-	if (hport->pending_connection_count > 0) {
+	if (array_is_created(&hport->pending_peers) &&
+		array_count(&hport->pending_peers) > 0) {
+		struct http_client_peer *const *peer_idx;
+
 		/* we're still doing the initial connections to this hport. if
 		   we're also doing parallel connections with soft timeouts
-		   (pending_connection_count>1), wait for them to finish
+		   (pending_peer_count>1), wait for them to finish
 		   first. */
-		if (--hport->pending_connection_count > 0)
+		array_foreach(&hport->pending_peers, peer_idx) {
+			if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
+				array_delete(&hport->pending_peers,
+					array_foreach_idx(&hport->pending_peers, peer_idx), 1);
+				break;
+			}
+		}
+		if (array_count(&hport->pending_peers) > 0)
 			return TRUE;
 	}
 
@@ -282,7 +294,6 @@
 		return FALSE;
 	}
 	hport->ips_connect_idx = (hport->ips_connect_idx + 1) % host->ips_count;
-
 	http_client_host_port_connection_setup(hport);
 	return TRUE;
 }
@@ -318,7 +329,7 @@
 	if (hport == NULL)
 		return;
 
-	if (!http_client_host_port_connection_failure(hport, reason)) {
+	if (!http_client_host_port_connection_failure(hport, addr, reason)) {
 		/* failed definitively for currently queued requests */
 		if (host->client->ioloop != NULL)
 			io_loop_stop(host->client->ioloop);
@@ -359,8 +370,8 @@
 	host->ips_count = result->ips_count;
 	host->ips = i_new(struct ip_addr, host->ips_count);
 	memcpy(host->ips, result->ips, sizeof(*host->ips) * host->ips_count);
-
-	// FIXME: make DNS result expire 
+	
+	/* FIXME: make DNS result expire */
 
 	/* make connections to requested ports */
 	array_foreach_modifiable(&host->ports, hport) {
@@ -488,7 +499,7 @@
 	if (hport == NULL)
 		return NULL;
 
-	requests = array_get(&hport->request_queue, &count);
+ 	requests = array_get(&hport->request_queue, &count);
 	if (count == 0)
 		return NULL;
 	i = 0;
--- a/src/lib-http/http-client-peer.c	Sun Sep 15 03:29:03 2013 +0300
+++ b/src/lib-http/http-client-peer.c	Sun Sep 15 03:31:28 2013 +0300
@@ -75,6 +75,27 @@
 	}
 }
 
+bool http_client_peer_is_connected(struct http_client_peer *peer)
+{
+	struct http_client_connection *const *conn_idx;
+
+	array_foreach(&peer->conns, conn_idx) {
+		if ((*conn_idx)->connected)
+			return TRUE;
+	}
+
+	return FALSE;
+}
+
+static void http_client_peer_check_idle(struct http_client_peer *peer)
+{
+	struct http_client_connection *const *conn_idx;
+
+	array_foreach(&peer->conns, conn_idx) {
+		http_client_connection_check_idle(*conn_idx);
+	}
+}
+
 static unsigned int
 http_client_peer_requests_pending(struct http_client_peer *peer,
 				  unsigned int *num_urgent_r)
@@ -92,91 +113,220 @@
 	return num_requests;
 }
 
-static bool
-http_client_peer_next_request(struct http_client_peer *peer,
-			      bool *created_connections)
+static void
+http_client_peer_handle_requests_real(struct http_client_peer *peer)
 {
+	struct _conn_available {
+		struct http_client_connection *conn;
+		unsigned int pending_requests;
+	};
 	struct http_client_connection *const *conn_idx;
-	struct http_client_connection *conn = NULL;
-	unsigned int connecting = 0, closing = 0, min_waiting = UINT_MAX;
-	unsigned int num_urgent, new_connections, working_conn_count;
+	ARRAY(struct _conn_available) conns_avail;
+	struct _conn_available *conn_avail_idx;
+	unsigned int connecting, closing, idle;
+	unsigned int num_pending, num_urgent, new_connections, 	working_conn_count;
+	bool statistics_dirty = TRUE;
+
+	/* FIXME: limit the number of requests handled in one run to prevent
+	   I/O starvation. */
 
-	if (http_client_peer_requests_pending(peer, &num_urgent) == 0)
-		return FALSE;
+	/* don't do anything unless we have pending requests */
+	num_pending = http_client_peer_requests_pending(peer, &num_urgent);
+	if (num_pending == 0) {
+		http_client_peer_check_idle(peer);
+		return;
+	}
+
+	t_array_init(&conns_avail, array_count(&peer->conns));
+	do {
+		array_clear(&conns_avail);
+		connecting = closing = idle = 0;
+
+		/* gather connection statistics */
+		array_foreach(&peer->conns, conn_idx) {
+			if (http_client_connection_is_ready(*conn_idx)) {			
+				struct _conn_available *conn_avail;
+				unsigned int insert_idx, pending_requests;
 
-	/* find the least busy connection */
-	array_foreach(&peer->conns, conn_idx) {
-		if (http_client_connection_is_ready(*conn_idx)) {
-			unsigned int waiting = http_client_connection_count_pending(*conn_idx);
+				/* compile sorted availability list */
+				pending_requests = http_client_connection_count_pending(*conn_idx);
+				if (array_count(&conns_avail) == 0) {
+					insert_idx = 0;
+				} else {
+					insert_idx = array_count(&conns_avail);
+					array_foreach_modifiable(&conns_avail, conn_avail_idx) {
+						if (conn_avail_idx->pending_requests > pending_requests) {
+							insert_idx = array_foreach_idx(&conns_avail, conn_avail_idx);
+							break;
+						}
+					}
+				}
+				conn_avail = array_insert_space(&conns_avail, insert_idx);
+				conn_avail->conn = *conn_idx;
+				conn_avail->pending_requests = pending_requests;
+				if (pending_requests == 0)
+					idle++;
+			}
+			/* count the number of connecting and closing connections */
+			if ((*conn_idx)->closing)
+				closing++;
+			else if (!(*conn_idx)->connected)
+				connecting++;
+		}
 
-			if (waiting < min_waiting) {
-				min_waiting = waiting;
-				conn = *conn_idx;
-				if (min_waiting == 0) {
-					/* found idle connection, use it now */
+		working_conn_count = array_count(&peer->conns) - closing;
+		statistics_dirty = FALSE;
+
+		/* use idle connections right away */
+		if (idle > 0) {
+			http_client_peer_debug(peer,
+				"Using %u idle connections to handle %u requests "
+				"(%u total connections ready)",
+				idle, num_pending > idle ? idle : num_pending,
+				array_count(&conns_avail));
+
+			array_foreach_modifiable(&conns_avail, conn_avail_idx) {
+				if (num_pending == 0 || conn_avail_idx->pending_requests > 0)
 					break;
+				idle--;
+				if (http_client_connection_next_request(conn_avail_idx->conn) <= 0) {
+					/* no longer available (probably connection error/closed) */
+					statistics_dirty = TRUE;
+					conn_avail_idx->conn = NULL;
+				} else {
+					/* update statistics */
+					conn_avail_idx->pending_requests++;
+					if (num_urgent > 0)
+						num_urgent--;
+					num_pending--;
 				}
 			}
 		}
-		/* count the number of connecting and closing connections */
-		if ((*conn_idx)->closing)
-			closing++;
-		else if (!(*conn_idx)->connected)
-			connecting++;
-	}
-	working_conn_count = array_count(&peer->conns) - closing;
-
-	/* did we find an idle connection? */
-	if (conn != NULL && min_waiting == 0) {
-		/* yes, use it */
-		return http_client_connection_next_request(conn);
-	}
+	
+		/* don't continue unless we have more pending requests */
+		num_pending = http_client_peer_requests_pending(peer, &num_urgent);
+		if (num_pending == 0) {
+			http_client_peer_check_idle(peer);
+			return;
+		}
+	} while (statistics_dirty);
 
-	/* no, but can we create a new connection? */		
-	if (num_urgent == 0 &&
-	    working_conn_count >= peer->client->set.max_parallel_connections) {
-		/* no */
-		if (conn == NULL) {
-			http_client_peer_debug(peer,
-				"Only non-urgent requests, and we already have "
-				"%u pending connections", working_conn_count);
-			return FALSE;
-		}
-		/* pipeline it */
-		return http_client_connection_next_request(conn);
-	}
+	i_assert(idle == 0);
 
-	/* yes, determine how many connections to set up */
+	/* determine how many new connections we can set up */
 	if (peer->last_connect_failed && working_conn_count > 0 &&
 	    working_conn_count == connecting) {
 		/* don't create new connections until the existing ones have
 		   finished connecting successfully. */
 		new_connections = 0;
-	} else if (num_urgent == 0) {
-		new_connections = connecting == 0 ? 1 : 0;
 	} else {
-		new_connections = (num_urgent > connecting ? num_urgent - connecting : 0);
+		if (working_conn_count - connecting + num_urgent >=
+			peer->client->set.max_parallel_connections) {
+			/* only create connections for urgent requests */
+			new_connections = (num_urgent > connecting ? num_urgent - connecting : 0);
+		} else if (num_pending <= connecting) {
+			/* there are already enough connections being made */
+			new_connections = 0;
+		} else if (working_conn_count == connecting) {
+			/* no connections succeeded so far, don't hammer the server with more
+			   than one connection attempt unless its urgent */
+			if (num_urgent > 0) {
+				new_connections =
+					(num_urgent > connecting ? num_urgent - connecting : 0);
+			} else {
+				new_connections = (connecting == 0 ? 1 : 0);
+			}
+		} else if (num_pending - connecting >
+			peer->client->set.max_parallel_connections - working_conn_count) {
+			/* create maximum allowed connections */
+			new_connections =
+				peer->client->set.max_parallel_connections - working_conn_count;
+		} else {
+			/* create as many connections as we need */
+			new_connections = num_pending - connecting;
+		}
 	}
-	http_client_peer_debug(peer,
-		"Creating %u new connections to handle requests "
-		"(already %u usable, connecting to %u, closing %u)",
-		new_connections, working_conn_count - connecting,
-		connecting, closing);
+
+	/* create connections */
 	if (new_connections > 0) {
-		*created_connections = TRUE;
+		http_client_peer_debug(peer,
+			"Creating %u new connections to handle requests "
+			"(already %u usable, connecting to %u, closing %u)",
+			new_connections, working_conn_count - connecting,
+			connecting, closing);
 		http_client_peer_connect(peer, new_connections);
+		return;
 	}
 
-	/* now we wait until it is connected */
-	return FALSE;
+	/* cannot create new connections for normal request; attempt pipelining */
+	if (working_conn_count - connecting >=
+		peer->client->set.max_parallel_connections) {
+		unsigned int pipeline_level = 0, total_handled = 0, handled;
+
+		if (!peer->allows_pipelining) {
+			http_client_peer_debug(peer,
+				"Will not pipeline until peer has shown support");
+			return;
+		}
+
+		/* fill pipelines */
+		do {
+			handled = 0;
+			/* fill smallest pipelines first,
+			   until all pipelines are filled to the same level */
+			array_foreach_modifiable(&conns_avail, conn_avail_idx) {
+				if (conn_avail_idx->conn == NULL)
+					continue;
+				if (pipeline_level == 0) {
+					pipeline_level = conn_avail_idx->pending_requests;
+				} else if (conn_avail_idx->pending_requests > pipeline_level) {
+					pipeline_level = conn_avail_idx->pending_requests;
+					break; /* restart from least busy connection */
+				}
+				/* pipeline it */
+				if (http_client_connection_next_request(conn_avail_idx->conn) <= 0) {
+					/* connection now unavailable */
+					conn_avail_idx->conn = NULL;
+				} else {
+					/* successfully pipelined */
+					conn_avail_idx->pending_requests++;
+					num_pending--;
+					handled++;
+				}
+			}
+			
+			total_handled += handled;
+		} while (num_pending > num_urgent && handled > 0);
+
+		http_client_peer_debug(peer,
+			"Pipelined %u requests (filled pipelines up to %u requests)",
+			total_handled, pipeline_level);
+		return;
+	}
+
+	/* still waiting for connections to finish */
+	http_client_peer_debug(peer,
+		"No request handled; waiting for new connections");
+	return;
 }
 
-bool http_client_peer_handle_requests(struct http_client_peer *peer)
+static void http_client_peer_handle_requests(struct http_client_peer *peer)
 {
-	bool created_connections = FALSE;
+	if (peer->to_req_handling != NULL)
+		timeout_remove(&peer->to_req_handling);
+	
+	T_BEGIN {
+		http_client_peer_handle_requests_real(peer);
+	} T_END;
+}
 
-	while (http_client_peer_next_request(peer, &created_connections)) ;
-	return created_connections;
+void http_client_peer_trigger_request_handler(struct http_client_peer *peer)
+{
+	/* trigger request handling through timeout */
+	if (peer->to_req_handling == NULL) {
+		peer->to_req_handling =
+			timeout_add_short(0, http_client_peer_handle_requests, peer);
+	}
 }
 
 static struct http_client_peer *
@@ -215,6 +365,9 @@
 
 	http_client_peer_debug(peer, "Peer destroy");
 
+	if (peer->to_req_handling != NULL)
+		timeout_remove(&peer->to_req_handling);
+
 	/* make a copy of the connection array; freed connections modify it */
 	t_array_init(&conns, array_count(&peer->conns));
 	array_copy(&conns.arr, 0, &peer->conns.arr, 0, array_count(&peer->conns));
@@ -268,6 +421,21 @@
 		array_append(&peer->hosts, &host, 1);
 }
 
+void http_client_peer_remove_host(struct http_client_peer *peer,
+				struct http_client_host *host)
+{
+	struct http_client_host *const *host_idx;
+
+	array_foreach(&peer->hosts, host_idx) {
+		if (*host_idx == host) {
+			array_delete(&peer->hosts, array_foreach_idx(&peer->hosts, host_idx), 1);
+			if (array_count(&peer->hosts) == 0)
+				http_client_peer_free(&peer);
+			return;
+		}
+	}
+}
+
 struct http_client_request *
 http_client_peer_claim_request(struct http_client_peer *peer, bool no_urgent)
 {
@@ -294,6 +462,8 @@
 	array_foreach(&peer->hosts, host) {
 		http_client_host_connection_success(*host, &peer->addr);
 	}
+
+	http_client_peer_trigger_request_handler(peer);
 }
 
 void http_client_peer_connection_failure(struct http_client_peer *peer,
@@ -315,7 +485,8 @@
 	} else {
 		/* this was the only/last connection and connecting to it
 		   failed. a second connect will probably also fail, so just
-		   abort all requests. */
+		   try another IP for the hosts(s) or abort all requests if this
+		   was the only/last option. */
 		array_foreach(&peer->hosts, host) {
 			http_client_host_connection_failure(*host, &peer->addr, reason);
 		}
@@ -341,7 +512,7 @@
 
 	/* if there are pending requests for this peer, create a new connection
 	   for them. */
-	http_client_peer_handle_requests(peer);
+	http_client_peer_trigger_request_handler(peer);
 
 	if (array_count(&peer->conns) == 0 &&
 	    http_client_peer_requests_pending(peer, &num_urgent) == 0)
@@ -350,15 +521,23 @@
 
 unsigned int http_client_peer_idle_connections(struct http_client_peer *peer)
 {
-    struct http_client_connection *const *conn_idx;
-    unsigned int idle = 0;
+	struct http_client_connection *const *conn_idx;
+	unsigned int idle = 0;
 
 	/* find idle connections */
-    array_foreach(&peer->conns, conn_idx) {
-        if (http_client_connection_is_idle(*conn_idx))
+	array_foreach(&peer->conns, conn_idx) {
+		if (http_client_connection_is_idle(*conn_idx))
 			idle++;
-    }
+	}
 
 	return idle;
 }
 
+void http_client_peer_switch_ioloop(struct http_client_peer *peer)
+{
+	if (peer->to_req_handling != NULL) {
+		peer->to_req_handling =
+			io_loop_move_timeout(&peer->to_req_handling);
+	}
+}
+
--- a/src/lib-http/http-client-private.h	Sun Sep 15 03:29:03 2013 +0300
+++ b/src/lib-http/http-client-private.h	Sun Sep 15 03:31:28 2013 +0300
@@ -20,6 +20,7 @@
 
 ARRAY_DEFINE_TYPE(http_client_host, struct http_client_host *);
 ARRAY_DEFINE_TYPE(http_client_host_port, struct http_client_host_port);
+ARRAY_DEFINE_TYPE(http_client_peer, struct http_client_peer *);
 ARRAY_DEFINE_TYPE(http_client_connection, struct http_client_connection *);
 ARRAY_DEFINE_TYPE(http_client_request, struct http_client_request *);
 
@@ -28,6 +29,12 @@
 HASH_TABLE_DEFINE_TYPE(http_client_peer, const struct http_client_peer_addr *,
 	struct http_client_peer *);
 
+struct http_client_peer_addr {
+	char *https_name; /* TLS SNI */
+	struct ip_addr ip;
+	in_port_t port;
+};
+
 struct http_client_request {
 	pool_t pool;
 	unsigned int refcount;
@@ -35,7 +42,7 @@
 	struct http_client_request *prev, *next;
 
 	const char *method, *hostname, *target;
-	unsigned int port;
+	in_port_t port;
 
 	struct http_client *client;
 	struct http_client_host *host;
@@ -73,8 +80,7 @@
 struct http_client_host_port {
 	struct http_client_host *host;
 
-	unsigned int port;
-	char *https_name;
+	struct http_client_peer_addr addr;
 
 	/* current index in host->ips */
 	unsigned int ips_connect_idx;
@@ -82,8 +88,10 @@
 	   initially 0, and later set to the ip index of the last successful
 	   connected IP */
 	unsigned int ips_connect_start_idx;
-	/* number of connections trying to connect for this host+port */
-	unsigned int pending_connection_count;
+
+	/* peers we are trying to connect to;
+	   this can be more than one when soft connect timeouts are enabled */
+	ARRAY_TYPE(http_client_peer) pending_peers;
 
 	/* requests pending in queue to be picked up by connections */
 	ARRAY_TYPE(http_client_request) request_queue;
@@ -111,12 +119,6 @@
 	struct dns_lookup *dns_lookup;
 };
 
-struct http_client_peer_addr {
-	char *https_name; /* TLS SNI */
-	struct ip_addr ip;
-	unsigned int port;
-};
-
 struct http_client_peer {
 	struct http_client_peer_addr addr;
 	struct http_client *client;
@@ -128,10 +130,15 @@
 	/* active connections to this peer */
 	ARRAY_TYPE(http_client_connection) conns;
 
+	/* zero time-out for consolidating request handling */
+	struct timeout *to_req_handling;
+
 	unsigned int destroyed:1;        /* peer is being destroyed */
 	unsigned int no_payload_sync:1;  /* expect: 100-continue failed before */
 	unsigned int seen_100_response:1;/* expect: 100-continue succeeded before */
 	unsigned int last_connect_failed:1;
+	unsigned int allows_pipelining:1;/* peer is known to allow persistent
+	                                     connections */
 };
 
 struct http_client_connection {
@@ -227,7 +234,8 @@
 http_client_connection_count_pending(struct http_client_connection *conn);
 bool http_client_connection_is_ready(struct http_client_connection *conn);
 bool http_client_connection_is_idle(struct http_client_connection *conn);
-bool http_client_connection_next_request(struct http_client_connection *conn);
+int http_client_connection_next_request(struct http_client_connection *conn);
+void http_client_connection_check_idle(struct http_client_connection *conn);
 void http_client_connection_switch_ioloop(struct http_client_connection *conn);
 
 unsigned int http_client_peer_addr_hash
@@ -244,15 +252,19 @@
 				struct http_client_host *host);
 void http_client_peer_add_host(struct http_client_peer *peer,
 	struct http_client_host *host);
+void http_client_peer_remove_host(struct http_client_peer *peer,
+				struct http_client_host *host);
 struct http_client_request *
 	http_client_peer_claim_request(struct http_client_peer *peer,
 		bool no_urgent);
-bool http_client_peer_handle_requests(struct http_client_peer *peer);
+void http_client_peer_trigger_request_handler(struct http_client_peer *peer);
 void http_client_peer_connection_success(struct http_client_peer *peer);
 void http_client_peer_connection_failure(struct http_client_peer *peer,
 					 const char *reason);
 void http_client_peer_connection_lost(struct http_client_peer *peer);
+bool http_client_peer_is_connected(struct http_client_peer *peer);
 unsigned int http_client_peer_idle_connections(struct http_client_peer *peer);
+void http_client_peer_switch_ioloop(struct http_client_peer *peer);
 
 struct http_client_host *
 	http_client_host_get(struct http_client *client, const char *hostname);
--- a/src/lib-http/http-client-request.c	Sun Sep 15 03:29:03 2013 +0300
+++ b/src/lib-http/http-client-request.c	Sun Sep 15 03:31:28 2013 +0300
@@ -119,7 +119,7 @@
 }
 
 void http_client_request_set_port(struct http_client_request *req,
-	unsigned int port)
+	in_port_t port)
 {
 	i_assert(req->state == HTTP_REQUEST_STATE_NEW);
 	req->port = port;
--- a/src/lib-http/http-client.c	Sun Sep 15 03:29:03 2013 +0300
+++ b/src/lib-http/http-client.c	Sun Sep 15 03:31:28 2013 +0300
@@ -142,6 +142,7 @@
 {
 	struct connection *_conn = client->conn_list->connections;
 	struct http_client_host *host;
+	struct http_client_peer *peer;
 
 	/* move connections */
 	/* FIXME: we wouldn't necessarily need to switch all of them
@@ -154,6 +155,10 @@
 		http_client_connection_switch_ioloop(conn);
 	}
 
+	/* move peers */
+	for (peer = client->peers_list; peer != NULL; peer = peer->next)
+		http_client_peer_switch_ioloop(peer);
+
 	/* move dns lookups and delayed requests */
 	for (host = client->hosts_list; host != NULL; host = host->next)
 		http_client_host_switch_ioloop(host);
--- a/src/lib-http/http-client.h	Sun Sep 15 03:29:03 2013 +0300
+++ b/src/lib-http/http-client.h	Sun Sep 15 03:31:28 2013 +0300
@@ -1,6 +1,8 @@
 #ifndef HTTP_CLIENT_H
 #define HTTP_CLIENT_H
 
+#include "net.h"
+
 #include "http-response.h"
 
 struct http_response;
@@ -89,7 +91,7 @@
 		(http_client_request_callback_t *)callback, context)
 
 void http_client_request_set_port(struct http_client_request *req,
-	unsigned int port);
+	in_port_t port);
 void http_client_request_set_ssl(struct http_client_request *req,
 	bool ssl);
 void http_client_request_set_urgent(struct http_client_request *req);