diff src/director/director-connection.c @ 14409:b43ae3805f5f

director: Redesigned connection handling and error handling. Director now accepts all connections from everywhere and syncs them until the handshaking is finished. At that point it finally decides if this is a connection that should be used as our left/right connection, or if it should be disconnected. This should make connecting more reliable, especially if one of the directors sends a broken handshake or has other trouble.
author Timo Sirainen <tss@iki.fi>
date Tue, 03 Apr 2012 05:58:29 +0300
parents 544a8c4705e5
children 084064444f89
line wrap: on
line diff
--- a/src/director/director-connection.c	Tue Apr 03 00:50:12 2012 +0300
+++ b/src/director/director-connection.c	Tue Apr 03 05:58:29 2012 +0300
@@ -7,8 +7,10 @@
 
    VERSION
    ME
-   <wait for remote handshake>
+   <wait for DONE from remote handshake>
    DONE
+   <make this connection our "left" connection, potentially disconnecting
+   another one>
 
    Outgoing director connections send:
 
@@ -21,6 +23,9 @@
    [0..n] USER
    <possibly other non-handshake commands between USERs>
    DONE
+   <wait for DONE from remote>
+   <make this connection our "right" connection, potentially disconnecting
+   another one>
 */
 
 #include "lib.h"
@@ -44,10 +49,16 @@
 #define MAX_INBUF_SIZE 1024
 #define MAX_OUTBUF_SIZE (1024*1024*10)
 #define OUTBUF_FLUSH_THRESHOLD (1024*128)
-/* Max idling time while connecting/handshaking before disconnecting */
-#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (10*1000)
-/* How long to wait for PONG after PING request */
-#define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (10*1000)
+/* Max idling time before "ME" command must have been received,
+   or we'll disconnect. */
+#define DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS (2*1000)
+/* Max idling time before "DONE" command must have been received,
+   or we'll disconnect. */
+#define DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS (30*1000)
+/* How long to wait for PONG for an idling connection */
+#define DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS (2*1000)
+/* Maximum time to wait for PONG reply */
+#define DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS (60*1000)
 /* How long to wait to send PING when connection is idle */
 #define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
 /* How long to wait before sending PING while waiting for SYNC reply */
@@ -56,6 +67,14 @@
    mark the host as failed so we won't try to reconnect to it immediately */
 #define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 10
 
+/* Timeout sanity checks; an undefined macro would silently evaluate to 0. */
+#if DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS
+#  error DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS is too low
+#endif
+#if DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS
+#  error DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS is too low
+#endif
+
 #define CMD_IS_USER_HANDHAKE(args) \
 	(str_array_length(args) > 2)
 
@@ -72,7 +91,7 @@
 	struct io *io;
 	struct istream *input;
 	struct ostream *output;
-	struct timeout *to, *to_ping;
+	struct timeout *to, *to_ping, *to_pong;
 
 	struct user_directory_iter *user_iter;
 
@@ -88,9 +107,10 @@
 	unsigned int handshake_sending_hosts:1;
 	unsigned int ping_waiting:1;
 	unsigned int synced:1;
+	unsigned int wrong_host:1;
+	unsigned int verifying_left:1;
 };
 
-static void director_connection_ping(struct director_connection *conn);
 static void director_connection_disconnected(struct director_connection **conn);
 
 static void ATTR_FORMAT(2, 3)
@@ -104,6 +124,172 @@
 	va_end(args);
 }
 
+static void
+director_connection_init_timeout(struct director_connection *conn)
+{
+	unsigned int secs = ioloop_time - conn->created;
+
+	if (!conn->connected) {
+		i_error("director(%s): Connect timed out (%u secs)",
+			conn->name, secs);
+	} else if (!conn->me_received) {
+		i_error("director(%s): Handshaking ME timed out (%u secs)",
+			conn->name, secs);
+	} else {
+		i_error("director(%s): Handshaking DONE timed out (%u secs)",
+			conn->name, secs);
+	}
+	director_connection_disconnected(&conn);
+}
+
+static void
+director_connection_set_ping_timeout(struct director_connection *conn)
+{
+	unsigned int msecs;
+
+	msecs = conn->synced || !conn->handshake_received ?
+		DIRECTOR_CONNECTION_PING_INTERVAL_MSECS :
+		DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS;
+
+	timeout_remove(&conn->to_ping);
+	conn->to_ping = timeout_add(msecs, director_connection_ping, conn);
+}
+
+static void director_connection_send_connect(struct director_connection *conn,
+					     struct director_host *host)
+{
+	const char *connect_str;
+
+	connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
+				      net_ip2addr(&host->ip), host->port);
+	director_connection_send(conn, connect_str);
+	(void)o_stream_flush(conn->output);
+	o_stream_uncork(conn->output);
+}
+
+static void director_connection_assigned(struct director_connection *conn)
+{
+	struct director *dir = conn->dir;
+
+	if (dir->left != NULL && dir->right != NULL) {
+		/* we're connected to both directors. see if the ring is
+		   finished by sending a SYNC. if we get it back, it's done. */
+		dir->sync_seq++;
+		director_set_ring_unsynced(dir);
+		director_sync_send(dir, dir->self_host, dir->sync_seq,
+				   DIRECTOR_VERSION_MINOR);
+	}
+	director_connection_set_ping_timeout(conn);
+}
+
+static bool director_connection_assign_left(struct director_connection *conn)
+{
+	struct director *dir = conn->dir;
+
+	i_assert(conn->in);
+	i_assert(dir->left != conn);
+
+	/* make sure this is the correct incoming connection */
+	if (conn->host->self) {
+		i_error("Connection from self, dropping");
+		return FALSE;
+	} else if (dir->left == NULL) {
+		/* no conflicts yet */
+	} else if (dir->left->host == conn->host) {
+		i_info("Dropping existing connection %s "
+		       "in favor of its new connection %s",
+		       dir->left->host->name, conn->host->name);
+		director_connection_deinit(&dir->left);
+	} else if (dir->left->verifying_left) {
+		/* we're waiting to verify if our current left is still
+		   working. if we don't receive a PONG, the current left
+		   gets disconnected and a new left gets assigned. if we do
+		   receive a PONG, we'll wait until the current left
+		   disconnects us and then reassign the new left. */
+		return TRUE;
+	} else if (director_host_cmp_to_self(dir->left->host, conn->host,
+					     dir->self_host) < 0) {
+		/* the old connection is the correct one.
+		   refer the client there (FIXME: do we ever get here?) */
+		i_warning("Director connection %s tried to connect to "
+			  "us, should use %s instead",
+			  conn->name, dir->left->host->name);
+		director_connection_send_connect(conn, dir->left->host);
+		return FALSE;
+	} else {
+		/* this new connection is the correct one, but wait until the
+		   old connection gets disconnected before using this one.
+		   that guarantees that the director inserting itself into
+		   the ring has finished handshaking its left side, so the
+		   switch will be fast. */
+		return TRUE;
+	}
+	dir->left = conn;
+	i_free(conn->name);
+	conn->name = i_strdup_printf("%s/left", conn->host->name);
+	director_connection_assigned(conn);
+	return TRUE;
+}
+
+/* Pick a new "left" among the handshaked incoming connections, if any. */
+static void director_assign_left(struct director *dir)
+{
+	struct director_connection *conn, *const *connp;
+
+	array_foreach(&dir->connections, connp) {
+		conn = *connp;
+		if (conn->in && conn->handshake_received && conn != dir->left) {
+			/* either use this or disconnect it */
+			if (!director_connection_assign_left(conn)) {
+				/* unwanted: drop conn itself, never dir->left */
+				director_connection_deinit(&conn);
+				director_assign_left(dir);
+				break;
+			}
+		}
+	}
+}
+
+static bool director_has_outgoing_connections(struct director *dir)
+{
+	struct director_connection *const *connp;
+
+	array_foreach(&dir->connections, connp) {
+		if (!(*connp)->in)
+			return TRUE;
+	}
+	return FALSE;
+}
+
+static bool director_connection_assign_right(struct director_connection *conn)
+{
+	struct director *dir = conn->dir;
+
+	i_assert(!conn->in);
+
+	if (dir->right != NULL) {
+		/* see if we should disconnect or keep the existing
+		   connection. */
+		if (director_host_cmp_to_self(conn->host, dir->right->host,
+					      dir->self_host) <= 0) {
+			/* the old connection is the correct one */
+			i_warning("Aborting incorrect outgoing connection to %s "
+				  "(already connected to correct one: %s)",
+				  conn->host->name, dir->right->host->name);
+			conn->wrong_host = TRUE;
+			return FALSE;
+		}
+		i_info("Replacing right director connection %s with %s",
+		       dir->right->host->name, conn->host->name);
+		director_connection_deinit(&dir->right);
+	}
+	dir->right = conn;
+	i_free(conn->name);
+	conn->name = i_strdup_printf("%s/right", conn->host->name);
+	director_connection_assigned(conn);
+	return TRUE;
+}
+
 static bool
 director_args_parse_ip_port(struct director_connection *conn,
 			    const char *const *args,
@@ -128,7 +314,6 @@
 			    const char *const *args)
 {
 	struct director *dir = conn->dir;
-	struct director_host *host;
 	const char *connect_str;
 	struct ip_addr ip;
 	unsigned int port;
@@ -148,75 +333,55 @@
 			net_ip2addr(&ip), port);
 		return FALSE;
 	}
-	host = director_host_get(dir, &ip, port);
-	/* the host is up now, make sure we can connect to it immediately
-	   if needed */
-	host->last_failed = 0;
 	conn->me_received = TRUE;
 
+	timeout_remove(&conn->to_ping);
+	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS,
+				    director_connection_init_timeout, conn);
+
 	if (!conn->in)
 		return TRUE;
 
-	i_free(conn->name);
-	conn->name = i_strdup_printf("%s/left", host->name);
-	conn->host = host;
-	/* make sure we don't keep old sequence values across restarts */
-	host->last_seq = 0;
+	/* Incoming connection:
+
+	   a) we don't have an established ring yet. make sure we're connecting
+	   to our right side (which might become our left side).
+
+	   b) it's our current "left" connection. the previous connection
+	   is most likely dead.
+
+	   c) we have an existing ring. tell our current "left" to connect to
+	   it with CONNECT command.
 
-	connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
-				      net_ip2addr(&host->ip), host->port);
-	/* make sure this is the correct incoming connection */
-	if (host->self) {
-		/* probably we're trying to find our own ip. it's no */
-		i_error("Connection from self, dropping");
-		return FALSE;
-	} else if (dir->left == NULL) {
-		/* no conflicts yet */
-	} else if (dir->left->host == host) {
-		i_warning("Dropping existing connection %s "
-			  "in favor of its new connection %s",
-			  dir->left->host->name, host->name);
+	   d) the incoming connection doesn't belong to us at all, refer it
+	   elsewhere with CONNECT. however, before disconnecting it verify
+	   first that our left side is actually still functional.
+	*/
+	conn->host = director_host_get(dir, &ip, port);
+	/* make sure we don't keep old sequence values across restarts */
+	conn->host->last_seq = 0;
+
+	if (dir->left == NULL) {
+		/* a) - just in case the left is also our right side reset
+		   its failed state, so we can connect to it */
+		conn->host->last_failed = 0;
+		if (!director_has_outgoing_connections(dir))
+			director_connect(dir);
+	} else if (dir->left->host == conn->host) {
+		/* b) */
+		i_assert(dir->left != conn);
 		director_connection_deinit(&dir->left);
+	} else if (director_host_cmp_to_self(conn->host, dir->left->host,
+					     dir->self_host) < 0) {
+		/* c) */
+		connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
+					      net_ip2addr(&conn->host->ip),
+					      conn->host->port);
+		director_connection_send(dir->left, connect_str);
 	} else {
-		if (director_host_cmp_to_self(dir->left->host, host,
-					      dir->self_host) < 0) {
-			/* the old connection is the correct one.
-			   refer the client there. */
-			i_warning("Director connection %s tried to connect to "
-				  "us, should use %s instead",
-				  host->name, dir->left->host->name);
-			director_connection_send(conn, t_strdup_printf(
-				"CONNECT\t%s\t%u\n",
-				net_ip2addr(&dir->left->host->ip),
-				dir->left->host->port));
-			/* also make sure that the connection is alive */
-			director_connection_ping(dir->left);
-			return FALSE;
-		}
-
-		/* this new connection is the correct one. disconnect the old
-		   one, but before that tell it to connect to the new one.
-		   that message might not reach it, so also send the same
-		   message to right side. */
-		i_warning("Replacing director connection %s with %s",
-			  dir->left->host->name, host->name);
-		director_connection_send(dir->left, connect_str);
-		(void)o_stream_flush(dir->left->output);
-		director_connection_deinit(&dir->left);
-	}
-	dir->left = conn;
-
-	/* tell the ring's right side to connect to this new director. */
-	if (dir->right != NULL) {
-		if (dir->left->host != dir->right->host)
-			director_connection_send(dir->right, connect_str);
-		else {
-			/* there are only two directors, and we already have
-			   a connection to this server. */
-		}
-	} else {
-		/* there are only two directors. connect to the other one. */
-		(void)director_connect_host(dir, host);
+		/* d) */
+		dir->left->verifying_left = TRUE;
+		director_connection_ping(dir->left);
 	}
 	return TRUE;
 }
@@ -390,7 +555,7 @@
 
 	/* save the director and forward it */
 	director_host_add(conn->dir, &ip, port);
-	director_connection_send(conn->dir->right,
+	director_update_send(conn->dir, director_connection_get_host(conn),
 		t_strdup_printf("DIRECTOR\t%s\t%u\n", net_ip2addr(&ip), port));
 	return TRUE;
 }
@@ -678,62 +843,40 @@
 	return TRUE;
 }
 
-static void
-director_connection_set_ping_timeout(struct director_connection *conn)
-{
-	unsigned int msecs;
-
-	msecs = conn->synced || !conn->handshake_received ?
-		DIRECTOR_CONNECTION_PING_INTERVAL_MSECS :
-		DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS;
-
-	timeout_remove(&conn->to_ping);
-	conn->to_ping = timeout_add(msecs, director_connection_ping, conn);
-}
-
-static void director_handshake_cmd_done(struct director_connection *conn)
+static bool director_handshake_cmd_done(struct director_connection *conn)
 {
 	struct director *dir = conn->dir;
 
 	if (dir->debug)
 		i_debug("Handshaked to %s", conn->host->name);
 
+	/* the host is up now, make sure we can connect to it immediately
+	   if needed */
 	conn->host->last_failed = 0;
+
 	conn->handshake_received = TRUE;
 	if (conn->in) {
 		/* handshaked to left side. tell it we've received the
 		   whole handshake. */
 		director_connection_send(conn, "DONE\n");
 
-		/* tell the right director about the left one */
-		if (dir->right != NULL) {
-			director_connection_send(dir->right,
-				t_strdup_printf("DIRECTOR\t%s\t%u\n",
-						net_ip2addr(&conn->host->ip),
-						conn->host->port));
-		}
+		/* tell the "right" director about the "left" one */
+		director_update_send(dir, director_connection_get_host(conn),
+			t_strdup_printf("DIRECTOR\t%s\t%u\n",
+					net_ip2addr(&conn->host->ip),
+					conn->host->port));
+		/* this is our "left" side. */
+		return director_connection_assign_left(conn);
+	} else {
+		/* handshaked to "right" side. */
+		return director_connection_assign_right(conn);
 	}
-
-	if (dir->left != NULL && dir->right != NULL &&
-	    dir->left->handshake_received && dir->right->handshake_received) {
-		/* we're connected to both directors. see if the ring is
-		   finished by sending a SYNC. if we get it back, it's done. */
-		dir->sync_seq++;
-		director_set_ring_unsynced(dir);
-		director_sync_send(dir, dir->self_host, dir->sync_seq,
-				   DIRECTOR_VERSION_MINOR);
-	}
-	director_connection_set_ping_timeout(conn);
 }
 
 static int
 director_connection_handle_handshake(struct director_connection *conn,
 				     const char *cmd, const char *const *args)
 {
-	struct director_host *host;
-	struct ip_addr ip;
-	unsigned int port;
-
 	/* both incoming and outgoing connections get VERSION and ME */
 	if (strcmp(cmd, "VERSION") == 0 && str_array_length(args) >= 3) {
 		if (strcmp(args[0], DIRECTOR_VERSION_NAME) != 0) {
@@ -757,28 +900,6 @@
 
 	if (strcmp(cmd, "ME") == 0)
 		return director_cmd_me(conn, args) ? 1 : -1;
-
-	if (strcmp(cmd, "CONNECT") == 0) {
-		/* remote wants us to connect elsewhere */
-		if (!director_args_parse_ip_port(conn, args, &ip, &port))
-			return -1;
-		if (conn->in) {
-			director_cmd_error(conn,
-				"Incoming connections can't request CONNECT");
-			return -1;
-		}
-
-		conn->dir->right = NULL;
-		host = director_host_get(conn->dir, &ip, port);
-		/* reset failure timestamp so we'll actually try to
-		   connect there. */
-		host->last_failed = 0;
-		if (conn->dir->debug)
-			i_debug("Received CONNECT reference to %s", host->name);
-		(void)director_connect_host(conn->dir, host);
-		return 1;
-	}
-	/* Only VERSION and CONNECT commands are allowed before ME */
 	if (!conn->me_received) {
 		director_cmd_error(conn, "Expecting ME command first");
 		return -1;
@@ -809,10 +930,8 @@
 		return director_handshake_cmd_user(conn, args) ? 1 : -1;
 
 	/* both get DONE */
-	if (strcmp(cmd, "DONE") == 0) {
-		director_handshake_cmd_done(conn);
-		return 1;
-	}
+	if (strcmp(cmd, "DONE") == 0)
+		return director_handshake_cmd_done(conn) ? 1 : -1;
 	return 0;
 }
 
@@ -901,12 +1020,9 @@
 		return FALSE;
 	}
 
-	host = director_host_lookup(dir, &ip, port);
-	if (host == NULL) {
-		i_error("Received CONNECT request to unknown host %s:%u",
-			net_ip2addr(&ip), port);
-		return TRUE;
-	}
+	host = director_host_get(conn->dir, &ip, port);
+	/* reset failure timestamp so we'll actually try to connect there. */
+	host->last_failed = 0;
 
 	/* remote suggests us to connect elsewhere */
 	if (dir->right != NULL &&
@@ -937,11 +1053,42 @@
 	return TRUE;
 }
 
+static void director_disconnect_wrong_lefts(struct director *dir)
+{
+	struct director_connection *const *connp, *conn;
+
+	array_foreach(&dir->connections, connp) {
+		conn = *connp;
+
+		if (conn->in && conn != dir->left && conn->me_received &&
+		    director_host_cmp_to_self(dir->left->host, conn->host,
+					      dir->self_host) < 0) {
+			i_warning("Director connection %s tried to connect to "
+				  "us, should use %s instead",
+				  conn->name, dir->left->host->name);
+			director_connection_send_connect(conn, dir->left->host);
+			director_connection_deinit(&conn);
+			director_disconnect_wrong_lefts(dir);
+			return;
+		}
+	}
+}
+
 static bool director_cmd_pong(struct director_connection *conn)
 {
 	if (!conn->ping_waiting)
 		return TRUE;
 	conn->ping_waiting = FALSE;
+	timeout_remove(&conn->to_pong);
+
+	if (conn->verifying_left) {
+		conn->verifying_left = FALSE;
+		if (conn == conn->dir->left) {
+			/* our left side is functional. tell all the wrong
+			   incoming connections to connect to it instead. */
+			director_disconnect_wrong_lefts(conn->dir);
+		}
+	}
 
 	director_connection_set_ping_timeout(conn);
 	return TRUE;
@@ -953,14 +1100,6 @@
 {
 	int ret;
 
-	/* ping/pong is always handled */
-	if (strcmp(cmd, "PING") == 0) {
-		director_connection_send(conn, "PONG\n");
-		return TRUE;
-	}
-	if (strcmp(cmd, "PONG") == 0)
-		return director_cmd_pong(conn);
-
 	if (!conn->handshake_received) {
 		ret = director_connection_handle_handshake(conn, cmd, args);
 		if (ret > 0)
@@ -972,13 +1111,17 @@
 				i_debug("director(%s): Handshaking failed",
 					conn->host->name);
 			}
-			if (conn->host != NULL)
-				conn->host->last_failed = ioloop_time;
 			return FALSE;
 		}
 		/* allow also other commands during handshake */
 	}
 
+	if (strcmp(cmd, "PING") == 0) {
+		director_connection_send(conn, "PONG\n");
+		return TRUE;
+	}
+	if (strcmp(cmd, "PONG") == 0)
+		return director_cmd_pong(conn);
 	if (strcmp(cmd, "USER") == 0)
 		return director_cmd_user(conn, args);
 	if (strcmp(cmd, "USER-WEAK") == 0)
@@ -1101,7 +1244,6 @@
 	struct user *user;
 	int ret;
 
-	o_stream_cork(conn->output);
 	while ((user = user_directory_iter_next(conn->user_iter)) != NULL) {
 		T_BEGIN {
 			string_t *str = t_str_new(128);
@@ -1131,37 +1273,26 @@
 	conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
 
 	ret = o_stream_flush(conn->output);
-	o_stream_uncork(conn->output);
 	timeout_reset(conn->to_ping);
 	return ret;
 }
 
 static int director_connection_output(struct director_connection *conn)
 {
+	int ret;
+
 	if (conn->user_iter != NULL) {
 		/* still handshaking USER list */
-		return director_connection_send_users(conn);
+		o_stream_cork(conn->output);
+		ret = director_connection_send_users(conn);
+		o_stream_uncork(conn->output);
+		if (ret < 0)
+			director_connection_disconnected(&conn);
+		return ret;
 	}
 	return o_stream_flush(conn->output);
 }
 
-static void
-director_connection_init_timeout(struct director_connection *conn)
-{
-	unsigned int secs = ioloop_time - conn->created;
-
-	if (conn->host != NULL)
-		conn->host->last_failed = ioloop_time;
-	if (!conn->connected) {
-		i_error("director(%s): Connect timed out (%u secs)",
-			conn->name, secs);
-	} else {
-		i_error("director(%s): Handshaking timed out (%u secs)",
-			conn->name, secs);
-	}
-	director_connection_disconnected(&conn);
-}
-
 static struct director_connection *
 director_connection_init_common(struct director *dir, int fd)
 {
@@ -1173,9 +1304,7 @@
 	conn->dir = dir;
 	conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE, FALSE);
 	conn->output = o_stream_create_fd(conn->fd, MAX_OUTBUF_SIZE, FALSE);
-	o_stream_set_flush_callback(conn->output,
-				    director_connection_output, conn);
-	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS,
+	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS,
 				    director_connection_init_timeout, conn);
 	array_append(&dir->connections, &conn, 1);
 	return conn;
@@ -1213,36 +1342,18 @@
 	int err;
 
 	if ((err = net_geterror(conn->fd)) != 0) {
-		conn->host->last_failed = ioloop_time;
 		i_error("director(%s): connect() failed: %s", conn->name,
 			strerror(err));
 		director_connection_disconnected(&conn);
 		return;
 	}
 	conn->connected = TRUE;
-
-	if (dir->right != NULL) {
-		/* see if we should disconnect or keep the existing
-		   connection. */
-		if (director_host_cmp_to_self(conn->host, dir->right->host,
-					      dir->self_host) <= 0) {
-			/* the old connection is the correct one */
-			i_warning("Aborting incorrect outgoing connection to %s "
-				  "(already connected to correct one: %s)",
-				  conn->host->name, dir->right->host->name);
-			director_connection_deinit(&conn);
-			return;
-		}
-		i_warning("Replacing director connection %s with %s",
-			  dir->right->host->name, conn->host->name);
-		director_connection_deinit(&dir->right);
-	}
-	dir->right = conn;
-	i_free(conn->name);
-	conn->name = i_strdup_printf("%s/right", conn->host->name);
+	o_stream_set_flush_callback(conn->output,
+				    director_connection_output, conn);
 
 	io_remove(&conn->io);
 
+	o_stream_cork(conn->output);
 	director_connection_send_handshake(conn);
 	director_connection_send_directors(conn, str);
 	director_connection_send_hosts(conn, str);
@@ -1250,6 +1361,7 @@
 
 	conn->user_iter = user_directory_iter_init(dir->users);
 	(void)director_connection_send_users(conn);
+	o_stream_uncork(conn->output);
 }
 
 struct director_connection *
@@ -1264,9 +1376,7 @@
 	conn = director_connection_init_common(dir, fd);
 	conn->name = i_strdup_printf("%s/out", host->name);
 	conn->host = host;
-	/* use IO_READ instead of IO_WRITE, so that we don't assign
-	   dir->right until remote has actually sent some data */
-	conn->io = io_add(conn->fd, IO_READ,
+	conn->io = io_add(conn->fd, IO_WRITE,
 			  director_connection_connected, conn);
 	return conn;
 }
@@ -1282,9 +1392,12 @@
 	if (dir->debug && conn->host != NULL)
 		i_debug("Disconnecting from %s", conn->host->name);
 
-	if (conn->host != NULL && !conn->in &&
-	    conn->created + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time)
+	if (conn->host != NULL && !conn->wrong_host &&
+	    (!conn->handshake_received ||
+	     conn->created + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time)) {
+		/* avoid reconnecting back here immediately */
 		conn->host->last_failed = ioloop_time;
+	}
 
 	conns = array_get(&dir->connections, &count);
 	for (i = 0; i < count; i++) {
@@ -1294,8 +1407,12 @@
 		}
 	}
 	i_assert(i < count);
-	if (dir->left == conn)
+	if (dir->left == conn) {
 		dir->left = NULL;
+		/* if there is already another handshaked incoming connection,
+		   use it as the new "left" */
+		director_assign_left(dir);
+	}
 	if (dir->right == conn)
 		dir->right = NULL;
 
@@ -1303,6 +1420,8 @@
 		user_directory_iter_deinit(&conn->user_iter);
 	if (conn->to != NULL)
 		timeout_remove(&conn->to);
+	if (conn->to_pong != NULL)
+		timeout_remove(&conn->to_pong);
 	timeout_remove(&conn->to_ping);
 	if (conn->io != NULL)
 		io_remove(&conn->io);
@@ -1360,28 +1479,30 @@
 	}
 }
 
-void director_connection_send_except(struct director_connection *conn,
-				     struct director_host *skip_host,
-				     const char *data)
-{
-	if (conn->host != skip_host)
-		director_connection_send(conn, data);
-}
-
-static void director_connection_ping_timeout(struct director_connection *conn)
+static void
+director_connection_ping_idle_timeout(struct director_connection *conn)
 {
 	i_error("director(%s): Ping timed out, disconnecting", conn->name);
 	director_connection_disconnected(&conn);
 }
 
-static void director_connection_ping(struct director_connection *conn)
+static void director_connection_pong_timeout(struct director_connection *conn)
+{
+	i_error("director(%s): PONG reply not received although other "
+		"input keeps coming, disconnecting", conn->name);
+	director_connection_disconnected(&conn);
+}
+
+void director_connection_ping(struct director_connection *conn)
 {
 	if (conn->ping_waiting)
 		return;
 
 	timeout_remove(&conn->to_ping);
-	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS,
-				    director_connection_ping_timeout, conn);
+	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_IDLE_TIMEOUT_MSECS,
+				    director_connection_ping_idle_timeout, conn);
+	conn->to_pong = timeout_add(DIRECTOR_CONNECTION_PONG_TIMEOUT_MSECS,
+				    director_connection_pong_timeout, conn);
 	director_connection_send(conn, "PING\n");
 	conn->ping_waiting = TRUE;
 }