changeset 21044:66c79e8a1132

lib-http: client: Link peers to queue earlier: during connection attempts. This makes sure that queues keep track of which peers are doing stuff on its behalf. This is important to be able to manage the active peers when a new host name lookup was performed; if a peer is no longer listed in the returned IPs it should be dropped.
author Stephan Bosch <stephan@dovecot.fi>
date Fri, 16 Sep 2016 01:35:09 +0200
parents a56f60c2d992
children c926a09729f0
files src/lib-http/http-client-private.h src/lib-http/http-client-queue.c
diffstat 2 files changed, 127 insertions(+), 32 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-http/http-client-private.h	Thu Sep 15 02:14:46 2016 +0200
+++ b/src/lib-http/http-client-private.h	Fri Sep 16 01:35:09 2016 +0200
@@ -236,6 +236,9 @@
 	   this can be more than one when soft connect timeouts are enabled */
 	ARRAY_TYPE(http_client_peer) pending_peers;
 
+	/* currently active peer */
+	struct http_client_peer *cur_peer;
+
 	/* all requests associated to this queue
 	   (ordered by earliest timeout first) */
 	ARRAY_TYPE(http_client_request) requests; 
--- a/src/lib-http/http-client-queue.c	Thu Sep 15 02:14:46 2016 +0200
+++ b/src/lib-http/http-client-queue.c	Fri Sep 16 01:35:09 2016 +0200
@@ -122,18 +122,41 @@
 
 void http_client_queue_free(struct http_client_queue *queue)
 {
+	struct http_client_peer *const *peer_idx;
+	ARRAY_TYPE(http_client_peer) peers;
+
+	http_client_queue_debug(queue, "Destroy");
+
+	/* unlink all peers */
+	if (queue->cur_peer != NULL) {
+		struct http_client_peer *peer = queue->cur_peer;
+		queue->cur_peer = NULL;
+		http_client_peer_unlink_queue(peer, queue);
+	}
+	if (array_is_created(&queue->pending_peers)) {
+		t_array_init(&peers, array_count(&queue->pending_peers));
+		array_copy(&peers.arr, 0, &queue->pending_peers.arr, 0,
+			array_count(&queue->pending_peers));
+		array_foreach(&peers, peer_idx)
+			http_client_peer_unlink_queue(*peer_idx, queue);
+		array_free(&queue->pending_peers);
+	}
+
+	/* abort all requests */
 	http_client_queue_fail
 		(queue, HTTP_CLIENT_REQUEST_ERROR_ABORTED, "Aborted");
-	if (array_is_created(&queue->pending_peers))
-		array_free(&queue->pending_peers);
 	array_free(&queue->requests);
 	array_free(&queue->queued_requests);
 	array_free(&queue->queued_urgent_requests);
 	array_free(&queue->delayed_requests);
+
+	/* cancel timeouts */
 	if (queue->to_connect != NULL)
 		timeout_remove(&queue->to_connect);
 	if (queue->to_delayed != NULL)
 		timeout_remove(&queue->to_delayed);
+
+	/* free */
 	i_free(queue->addr_name);
 	i_free(queue->name);
 	i_free(queue);
@@ -226,18 +249,19 @@
 	http_client_queue_connection_setup(queue);
 }
 
-void http_client_queue_connection_setup(struct http_client_queue *queue)
+static struct http_client_peer *
+http_client_queue_connection_attempt(struct http_client_queue *queue)
 {
 	struct http_client_host *host = queue->host;
-	struct http_client_peer *peer = NULL;
-	const struct http_client_peer_addr *addr = &queue->addr;
+	struct http_client_peer *peer;
+	struct http_client_peer_addr *addr = &queue->addr;
 	unsigned int num_requests =
 		array_count(&queue->queued_requests) +
 		array_count(&queue->queued_urgent_requests);
 	const char *ssl = "";
 
 	if (num_requests == 0)
-		return;
+		return NULL;
 
 	/* update our peer address */
 	if (queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) {
@@ -247,31 +271,77 @@
 		ssl = (ssl == NULL ? "" : t_strdup_printf(" (SSL=%s)", ssl));
 	}
 
+	/* already got a peer? */
+	peer = NULL;
+	if (queue->cur_peer != NULL) {
+		i_assert(!array_is_created(&queue->pending_peers) ||
+			array_count(&queue->pending_peers) == 0);
+
+		/* is it still the one we want? */
+		if (http_client_peer_addr_cmp(addr, &queue->cur_peer->addr) == 0) {
+			/* is it still connected? */
+			if (http_client_peer_is_connected(queue->cur_peer)) {
+				/* yes */
+				http_client_queue_debug(queue,
+					"Using existing connection to %s%s "
+					"(%u requests pending)",
+					http_client_peer_addr2str(addr), ssl, num_requests);
+
+				/* handle requests; */
+				http_client_peer_trigger_request_handler(queue->cur_peer);
+				return queue->cur_peer;
+			}
+			/* no */
+			peer = queue->cur_peer;
+		} else {
+			/* peer is not relevant to this queue anymore */
+			http_client_peer_unlink_queue(queue->cur_peer, queue);
+		}
+
+		queue->cur_peer = NULL;
+	}
+
+	if (peer == NULL)
+		peer = http_client_peer_get(queue->client, addr);
+
 	http_client_queue_debug(queue, "Setting up connection to %s%s "
 		"(%u requests pending)", http_client_peer_addr2str(addr), ssl,
 		num_requests);
 
-
-	/* create/get peer */
-	peer = http_client_peer_get(queue->client, addr);
+	/* create provisional link between queue and peer */
 	http_client_peer_link_queue(peer, queue);
 
 	/* handle requests; creates new connections when needed/possible */
 	http_client_peer_trigger_request_handler(peer);
 
-	if (!http_client_peer_is_connected(peer)) {
+	if (http_client_peer_is_connected(peer)) {
+		/* drop any pending peers */
+		if (array_is_created(&queue->pending_peers) &&
+			array_count(&queue->pending_peers) > 0) {
+			struct http_client_peer *const *peer_idx;
+
+			array_foreach(&queue->pending_peers, peer_idx) {
+				i_assert(http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) != 0);
+				http_client_peer_unlink_queue(*peer_idx, queue);
+			}
+			array_clear(&queue->pending_peers);
+		}
+		queue->cur_peer = peer;
+
+	} else {
 		unsigned int msecs;
 		bool new_peer = TRUE;
 
 		/* not already connected, wait for connections */
-		if (!array_is_created(&queue->pending_peers))
+		if (!array_is_created(&queue->pending_peers)) {
 			i_array_init(&queue->pending_peers, 8);
-		else {
+		} else {
 			struct http_client_peer *const *peer_idx;
 
 			/* we may be waiting for this peer already */
 			array_foreach(&queue->pending_peers, peer_idx) {
 				if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
+					i_assert(*peer_idx == peer);
 					new_peer = FALSE;
 					break;
 				}
@@ -296,6 +366,13 @@
 			}
 		}
 	}
+
+	return peer;
+}
+
+void http_client_queue_connection_setup(struct http_client_queue *queue)
+{
+	(void)http_client_queue_connection_attempt(queue);
 }
 
 void
@@ -328,6 +405,8 @@
 				   connected peer, even if some of the connections
 				   are pending. they may be intended for urgent
 				   requests. */
+				i_assert(queue->cur_peer == NULL);
+				queue->cur_peer = *peer_idx;
 				continue;
 			}
 			/* unlink this queue from the peer; if this was the last/only queue, the
@@ -336,6 +415,7 @@
 			http_client_peer_unlink_queue(*peer_idx, queue);
 		}
 		array_clear(&queue->pending_peers);
+		i_assert(queue->cur_peer != NULL);
 	}
 }
 
@@ -347,6 +427,12 @@
 		&queue->client->set;
 	const char *https_name = http_client_peer_addr_get_https_name(addr);
 	struct http_client_host *host = queue->host;
+	struct http_client_peer *failed_peer;
+	struct http_client_peer *const *peer_idx;
+
+	i_assert(queue->cur_peer == NULL);
+	i_assert(array_is_created(&queue->pending_peers) &&
+		array_count(&queue->pending_peers) > 0);
 
 	http_client_queue_debug(queue,
 		"Failed to set up connection to %s%s: %s "
@@ -358,26 +444,25 @@
 		 	array_count(&queue->pending_peers): 0),
 		array_count(&queue->requests));
 
-	if (array_is_created(&queue->pending_peers) &&
-		array_count(&queue->pending_peers) > 0) {
-		struct http_client_peer *const *peer_idx;
-
-		/* we're still doing the initial connections to this hport. if
-		   we're also doing parallel connections with soft timeouts
-		   (pending_peer_count>1), wait for them to finish
-		   first. */
-		array_foreach(&queue->pending_peers, peer_idx) {
-			if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
-				array_delete(&queue->pending_peers,
-					array_foreach_idx(&queue->pending_peers, peer_idx), 1);
-				break;
-			}
+	/* we're still doing the initial connections to this hport. if
+		 we're also doing parallel connections with soft timeouts
+		 (pending_peer_count>1), wait for them to finish
+		 first. */
+	failed_peer = NULL;
+	array_foreach(&queue->pending_peers, peer_idx) {
+		if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
+			failed_peer = *peer_idx;
+			array_delete(&queue->pending_peers,
+				array_foreach_idx(&queue->pending_peers, peer_idx), 1);
+			break;
 		}
-		if (array_count(&queue->pending_peers) > 0) {
-			http_client_queue_debug(queue,
-				"Waiting for remaining pending peers.");
-			return;
-		}
+	}
+	i_assert(failed_peer != NULL);
+	if (array_count(&queue->pending_peers) > 0) {
+		http_client_queue_debug(queue,
+			"Waiting for remaining pending peers.");
+		http_client_peer_unlink_queue(failed_peer, queue);
+		return;
 	}
 
 	/* one of the connections failed. if we're not using soft timeouts,
@@ -410,6 +495,7 @@
 					total_msecs/1000, total_msecs%1000);
 			}
 			queue->connect_attempts = 0;
+			http_client_peer_unlink_queue(failed_peer, queue);
 			http_client_queue_fail(queue,
 				HTTP_CLIENT_REQUEST_ERROR_CONNECT_FAILED, reason);
 			return;
@@ -418,7 +504,8 @@
 		queue->ips_connect_idx = (queue->ips_connect_idx + 1) % host->ips_count;
 	}
 	
-	http_client_queue_connection_setup(queue);
+	if (http_client_queue_connection_attempt(queue) != failed_peer)
+		http_client_peer_unlink_queue(failed_peer, queue);
 	return;
 }
 
@@ -428,6 +515,11 @@
 {
 	struct http_client_peer *const *peer_idx;
 
+	if (queue->cur_peer == peer) {
+		queue->cur_peer = NULL;
+		return;
+	}
+
 	if (!array_is_created(&queue->pending_peers))
 		return;