changeset 11629:a07aa85f68c9 HEAD

director: Lots of fixes. It should be pretty stable now.
author Timo Sirainen <tss@iki.fi>
date Thu, 24 Jun 2010 20:29:27 +0100
parents 7885030184ab
children c9a62b0d9d36
files src/director/auth-connection.c src/director/auth-connection.h src/director/director-connection.c src/director/director-connection.h src/director/director-host.h src/director/director-request.c src/director/director-test.c src/director/director-test.sh src/director/director.c src/director/director.h src/director/doveadm-connection.c src/director/main.c
diffstat 12 files changed, 563 insertions(+), 147 deletions(-) [+]
line wrap: on
line diff
--- a/src/director/auth-connection.c	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/auth-connection.c	Thu Jun 24 20:29:27 2010 +0100
@@ -27,6 +27,8 @@
 
 static struct auth_connection *auth_connections;
 
+static void auth_connection_disconnected(struct auth_connection **conn);
+
 static void auth_connection_input(struct auth_connection *conn)
 {
 	char *line;
@@ -36,13 +38,13 @@
 		return;
 	case -1:
 		/* disconnected */
-		auth_connection_deinit(&conn);
+		auth_connection_disconnected(&conn);
 		return;
 	case -2:
 		/* buffer full */
 		i_error("BUG: Auth server sent us more than %d bytes",
 			(int)AUTH_CLIENT_MAX_LINE_LENGTH);
-		auth_connection_deinit(&conn);
+		auth_connection_disconnected(&conn);
 		return;
 	}
 
@@ -103,12 +105,20 @@
 
 		if (close(conn->fd) < 0)
 			i_error("close(auth connection) failed: %m");
-		conn->callback(NULL, conn->context);
 	}
 	i_free(conn->path);
 	i_free(conn);
 }
 
+static void auth_connection_disconnected(struct auth_connection **_conn)
+{
+	struct auth_connection *conn = *_conn;
+
+	*_conn = NULL;
+	/* notify callback. it should deinit this connection */
+	conn->callback(NULL, conn->context);
+}
+
 void auth_connection_send(struct auth_connection *conn,
 			  const void *data, size_t size)
 {
@@ -122,6 +132,6 @@
 	while (auth_connections != NULL) {
 		struct auth_connection *conn = auth_connections;
 
-		auth_connection_deinit(&conn);
+		auth_connection_disconnected(&conn);
 	}
 }
--- a/src/director/auth-connection.h	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/auth-connection.h	Thu Jun 24 20:29:27 2010 +0100
@@ -2,7 +2,7 @@
 #define AUTH_CONNECTION_H
 
 /* Called for each input line. This is also called with line=NULL if
-   connection gets disonnected. */
+   connection gets disconnected. */
 typedef void auth_input_callback(const char *line, void *context);
 
 struct auth_connection *auth_connection_init(const char *path);
--- a/src/director/director-connection.c	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/director-connection.c	Thu Jun 24 20:29:27 2010 +0100
@@ -7,6 +7,7 @@
 #include "istream.h"
 #include "ostream.h"
 #include "str.h"
+#include "llist.h"
 #include "master-service.h"
 #include "mail-host.h"
 #include "director.h"
@@ -25,14 +26,20 @@
 #define MAX_INBUF_SIZE 1024
 #define MAX_OUTBUF_SIZE (1024*1024*10)
 #define OUTBUF_FLUSH_THRESHOLD (1024*128)
+/* Max idling time while connecting/handshaking before disconnecting */
+#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (2*1000)
 /* How long to wait for PONG after PING request */
 #define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (2*1000)
 /* How long to wait to send PING when connection is idle */
 #define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
+/* How long to wait before sending PING while waiting for SYNC reply */
+#define DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS 1000
 
 struct director_connection {
+	struct director_connection *prev, *next;
+
 	struct director *dir;
-	const char *name;
+	char *name;
 
 	/* for incoming connections the director host isn't known until
 	   ME-line is received */
@@ -54,9 +61,11 @@
 	unsigned int ignore_host_events:1;
 	unsigned int handshake_sending_hosts:1;
 	unsigned int ping_waiting:1;
+	unsigned int sync_ping:1;
 };
 
 static void director_connection_ping(struct director_connection *conn);
+static void director_connection_disconnected(struct director_connection **conn);
 
 static bool
 director_args_parse_ip_port(struct director_connection *conn,
@@ -102,7 +111,12 @@
 	if (!conn->in)
 		return TRUE;
 
+	i_free(conn->name);
+	conn->name = i_strdup_printf("%s/left", host->name);
 	conn->host = host;
+	/* make sure we don't keep old sequence values across restarts */
+	host->last_seq = 0;
+
 	connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
 				      net_ip2addr(&host->ip), host->port);
 	/* make sure this is the correct incoming connection */
@@ -135,6 +149,8 @@
 		   one, but before that tell it to connect to the new one.
 		   that message might not reach it, so also send the same
 		   message to right side. */
+		i_warning("Replacing director connection %s with %s",
+			  dir->left->host->name, host->name);
 		director_connection_send(dir->left, connect_str);
 		(void)o_stream_flush(dir->left->output);
 		director_connection_deinit(&dir->left);
@@ -227,6 +243,33 @@
 	return TRUE;
 }
 
+static bool
+director_cmd_user(struct director_connection *conn, const char *const *args)
+{
+	unsigned int username_hash;
+	struct ip_addr ip;
+	struct mail_host *host;
+	struct user *user;
+
+	if (str_array_length(args) != 2 ||
+	    str_to_uint(args[0], &username_hash) < 0 ||
+	    net_addr2ip(args[1], &ip) < 0) {
+		i_error("director(%s): Invalid USER args", conn->name);
+		return FALSE;
+	}
+
+	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+	if (host == NULL) {
+		/* we probably just removed this host. */
+		return TRUE;
+	}
+
+	if (director_user_refresh(conn->dir, username_hash,
+				  host, ioloop_time, &user))
+		director_update_user(conn->dir, conn->host, user);
+	return TRUE;
+}
+
 static bool director_cmd_director(struct director_connection *conn,
 				  const char *const *args)
 {
@@ -269,7 +312,7 @@
 		hosts = mail_hosts_get(conn->dir->mail_hosts);
 		while (array_count(hosts) > 0) {
 			hostp = array_idx(hosts, 0);
-			director_remove_host(conn->dir, conn->host, *hostp);
+			director_remove_host(conn->dir, NULL, NULL, *hostp);
 		}
 	} else if (!remote_ring_completed && conn->dir->ring_handshaked) {
 		/* ignore whatever remote sends */
@@ -279,8 +322,46 @@
 	return TRUE;
 }
 
+static int
+director_cmd_is_seen(struct director_connection *conn,
+		     const char *const **_args,
+		     struct director_host **host_r)
+{
+	const char *const *args = *_args;
+	struct ip_addr ip;
+	unsigned int port, seq;
+	struct director_host *host;
+
+	if (str_array_length(args) < 3 ||
+	    net_addr2ip(args[0], &ip) < 0 ||
+	    str_to_uint(args[1], &port) < 0 ||
+	    str_to_uint(args[2], &seq) < 0) {
+		i_error("director(%s): Command is missing parameters",
+			conn->name);
+		return -1;
+	}
+	*_args = args + 3;
+
+	host = director_host_lookup(conn->dir, &ip, port);
+	if (host == NULL) {
+		/* director is already gone, but we can't be sure if this
+		   command was sent everywhere. re-send it as if it were from
+		   ourselves. */
+		*host_r = NULL;
+	} else {
+		if (seq <= host->last_seq) {
+			/* already seen this */
+			return 1;
+		}
+		*host_r = host;
+		host->last_seq = seq;
+	}
+	return 0;
+}
+
 static bool
-director_cmd_host(struct director_connection *conn, const char *const *args)
+director_cmd_host_int(struct director_connection *conn, const char *const *args,
+		      struct director_host *dir_host)
 {
 	struct mail_host *host;
 	struct ip_addr ip;
@@ -311,17 +392,40 @@
 	if (update) {
 		mail_host_set_vhost_count(conn->dir->mail_hosts,
 					  host, vhost_count);
-		director_update_host(conn->dir, conn->host, host);
+		director_update_host(conn->dir, conn->host, dir_host, host);
 	}
 	return TRUE;
 }
 
 static bool
+director_cmd_host_handshake(struct director_connection *conn,
+			    const char *const *args)
+{
+	return director_cmd_host_int(conn, args, NULL);
+}
+
+static bool
+director_cmd_host(struct director_connection *conn, const char *const *args)
+{
+	struct director_host *dir_host;
+	int ret;
+
+	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+		return ret > 0;
+	return director_cmd_host_int(conn, args, dir_host);
+}
+
+static bool
 director_cmd_host_remove(struct director_connection *conn,
 			 const char *const *args)
 {
+	struct director_host *dir_host;
 	struct mail_host *host;
 	struct ip_addr ip;
+	int ret;
+
+	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+		return ret > 0;
 
 	if (str_array_length(args) != 1 ||
 	    net_addr2ip(args[0], &ip) < 0) {
@@ -331,7 +435,7 @@
 
 	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
 	if (host != NULL)
-		director_remove_host(conn->dir, conn->host, host);
+		director_remove_host(conn->dir, conn->host, dir_host, host);
 	return TRUE;
 }
 
@@ -339,18 +443,25 @@
 director_cmd_host_flush(struct director_connection *conn,
 			 const char *const *args)
 {
+	struct director_host *dir_host;
 	struct mail_host *host;
 	struct ip_addr ip;
+	unsigned int seq;
+	int ret;
 
-	if (str_array_length(args) != 1 ||
-	    net_addr2ip(args[0], &ip) < 0) {
+	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+		return ret > 0;
+
+	if (str_array_length(args) != 2 ||
+	    net_addr2ip(args[0], &ip) < 0 ||
+	    str_to_uint(args[1], &seq) < 0) {
 		i_error("director(%s): Invalid HOST-FLUSH args", conn->name);
 		return FALSE;
 	}
 
 	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
 	if (host != NULL)
-		director_flush_host(conn->dir, conn->host, host);
+		director_flush_host(conn->dir, conn->host, dir_host, host);
 	return TRUE;
 }
 
@@ -380,16 +491,16 @@
 	    dir->left->handshake_received && dir->right->handshake_received) {
 		/* we're connected to both directors. see if the ring is
 		   finished by sending a SYNC. if we get it back, it's done. */
-		dir->sync_seq = ++dir->self_host->last_seq;
+		dir->sync_seq++;
 		director_connection_send(dir->right,
 			t_strdup_printf("SYNC\t%s\t%u\t%u\n",
 					net_ip2addr(&dir->self_ip),
 					dir->self_port, dir->sync_seq));
 	}
-	if (conn->to_ping == NULL) {
-		conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
-					    director_connection_ping, conn);
-	}
+	if (conn->to_ping != NULL)
+		timeout_remove(&conn->to_ping);
+	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
+				    director_connection_ping, conn);
 }
 
 static bool
@@ -444,7 +555,10 @@
 	if (strcmp(cmd, "HOST") == 0) {
 		/* allow hosts from all connections always,
 		   this could be an host update */
-		return director_cmd_host(conn, args);
+		if (conn->handshake_sending_hosts)
+			return director_cmd_host_handshake(conn, args);
+		else
+			return director_cmd_host(conn, args);
 	}
 	if (conn->handshake_sending_hosts &&
 	    strcmp(cmd, "HOST-HAND-END") == 0) {
@@ -456,47 +570,27 @@
 	    conn->me_received)
 		return director_cmd_host_hand_start(conn, args);
 
-	/* only incoming connections get a USER list */
-	if (conn->in && strcmp(cmd, "USER") == 0 && conn->me_received)
-		return director_handshake_cmd_user(conn, args);
+	/* only incoming connections get a full USER list, but outgoing
+	   connections can also receive USER updates during handshake and
+	   it wouldn't be safe to ignore them. */
+	if (strcmp(cmd, "USER") == 0 && conn->me_received) {
+		if (conn->in)
+			return director_handshake_cmd_user(conn, args);
+		else
+			return director_cmd_user(conn, args);
+	}
 	/* both get DONE */
 	if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
 	    !conn->handshake_sending_hosts) {
 		director_handshake_cmd_done(conn);
 		return TRUE;
 	}
-	i_error("director(%s): Invalid handshake command: %s",
-		conn->name, cmd);
+	i_error("director(%s): Invalid handshake command: %s "
+		"(in=%d me_received=%d)", conn->name, cmd,
+		conn->in, conn->me_received);
 	return FALSE;
 }
 
-static bool
-director_cmd_user(struct director_connection *conn, const char *const *args)
-{
-	unsigned int username_hash;
-	struct ip_addr ip;
-	struct mail_host *host;
-	struct user *user;
-
-	if (str_array_length(args) != 2 ||
-	    str_to_uint(args[0], &username_hash) < 0 ||
-	    net_addr2ip(args[1], &ip) < 0) {
-		i_error("director(%s): Invalid USER args", conn->name);
-		return FALSE;
-	}
-
-	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
-	if (host == NULL) {
-		/* we probably just removed this host. */
-		return TRUE;
-	}
-
-	if (director_user_refresh(conn->dir, username_hash,
-				  host, ioloop_time, &user))
-		director_update_user(conn->dir, conn->host, user);
-	return TRUE;
-}
-
 static bool director_connection_sync(struct director_connection *conn,
 				     const char *const *args, const char *line)
 {
@@ -523,24 +617,21 @@
 			/* stale SYNC event */
 			return TRUE;
 		}
+
 		if (!dir->ring_handshaked) {
 			/* the ring is handshaked */
 			director_set_ring_handshaked(dir);
-			return TRUE;
-		}
-
-		if (dir->ring_synced) {
+		} else if (dir->ring_synced) {
 			i_error("Received SYNC from %s (seq=%u) "
 				"while already synced", conn->name, seq);
 			return TRUE;
+		} else {
+			if (dir->debug) {
+				i_debug("Ring is synced (%s sent seq=%u)",
+					conn->name, seq);
+			}
+			director_set_ring_synced(dir);
 		}
-
-		if (dir->debug) {
-			i_debug("Ring is synced (%s sent seq=%u)",
-				conn->name, seq);
-		}
-		dir->ring_synced = TRUE;
-		director_set_state_changed(dir);
 		return TRUE;
 	}
 
@@ -597,10 +688,7 @@
 		}
 	}
 
-	/* connect here, disconnect old one */
-	if (dir->right != NULL)
-		director_connection_deinit(&dir->right);
-
+	/* connect here */
 	(void)director_connect_host(dir, host);
 	return TRUE;
 }
@@ -629,6 +717,15 @@
 		i_error("director(%s): Received empty line", conn->name);
 		return FALSE;
 	}
+
+	/* ping/pong is always handled */
+	if (strcmp(cmd, "PING") == 0) {
+		director_connection_send(conn, "PONG\n");
+		return TRUE;
+	}
+	if (strcmp(cmd, "PONG") == 0)
+		return director_cmd_pong(conn);
+
 	if (!conn->handshake_received) {
 		if (!director_connection_handle_handshake(conn, cmd, args)) {
 			/* invalid commands during handshake,
@@ -655,12 +752,6 @@
 	if (strcmp(cmd, "CONNECT") == 0)
 		return director_cmd_connect(conn, args);
 
-	if (strcmp(cmd, "PING") == 0) {
-		director_connection_send(conn, "PONG\n");
-		return TRUE;
-	}
-	if (strcmp(cmd, "PONG") == 0)
-		return director_cmd_pong(conn);
 	i_error("director(%s): Unknown command (in this state): %s",
 		conn->name, cmd);
 	return FALSE;
@@ -668,6 +759,7 @@
 
 static void director_connection_input(struct director_connection *conn)
 {
+	struct director *dir = conn->dir;
 	char *line;
 	bool ret;
 
@@ -678,26 +770,31 @@
 		return;
 	case -1:
 		/* disconnected */
-		director_connection_deinit(&conn);
+		i_error("Director %s disconnected%s", conn->name,
+			conn->handshake_received ? "" :
+			" before handshake finished");
+		director_connection_disconnected(&conn);
 		return;
 	case -2:
 		/* buffer full */
-		i_error("BUG: Director sent us more than %d bytes",
-			MAX_INBUF_SIZE);
-		director_connection_deinit(&conn);
+		i_error("BUG: Director %s sent us more than %d bytes",
+			conn->name, MAX_INBUF_SIZE);
+		director_connection_disconnected(&conn);
 		return;
 	}
 
+	director_sync_freeze(dir);
 	while ((line = i_stream_next_line(conn->input)) != NULL) {
 		T_BEGIN {
 			ret = director_connection_handle_line(conn, line);
 		} T_END;
 
 		if (!ret) {
-			director_connection_deinit(&conn);
+			director_connection_disconnected(&conn);
 			break;
 		}
 	}
+	director_sync_thaw(dir);
 }
 
 static void director_connection_send_directors(struct director_connection *conn,
@@ -773,6 +870,18 @@
 		return o_stream_flush(conn->output);
 }
 
+static void
+director_connection_init_timeout(struct director_connection *conn)
+{
+	if (conn->host != NULL)
+		conn->host->last_failed = ioloop_time;
+	if (!conn->connected)
+		i_error("director(%s): Connect timed out", conn->name);
+	else
+		i_error("director(%s): Handshaking timed out", conn->name);
+	director_connection_disconnected(&conn);
+}
+
 static struct director_connection *
 director_connection_init_common(struct director *dir, int fd)
 {
@@ -785,6 +894,9 @@
 	conn->output = o_stream_create_fd(conn->fd, MAX_OUTBUF_SIZE, FALSE);
 	o_stream_set_flush_callback(conn->output,
 				    director_connection_output, conn);
+	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS,
+				    director_connection_init_timeout, conn);
+	DLLIST_PREPEND(&dir->connections, conn);
 	return conn;
 }
 
@@ -798,14 +910,15 @@
 }
 
 struct director_connection *
-director_connection_init_in(struct director *dir, int fd)
+director_connection_init_in(struct director *dir, int fd,
+			    const struct ip_addr *ip)
 {
 	struct director_connection *conn;
 
 	conn = director_connection_init_common(dir, fd);
 	conn->in = TRUE;
 	conn->connected = TRUE;
-	conn->name = "<incoming>";
+	conn->name = i_strdup_printf("%s/in", net_ip2addr(ip));
 	conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
 
 	director_connection_send_handshake(conn);
@@ -822,14 +935,26 @@
 		conn->host->last_failed = ioloop_time;
 		i_error("director(%s): connect() failed: %s", conn->name,
 			strerror(err));
-		director_connection_deinit(&conn);
-
-		/* try connecting to next server */
-		director_connect(dir);
+		director_connection_disconnected(&conn);
 		return;
 	}
 	conn->connected = TRUE;
 
+	if (dir->right != NULL) {
+		/* see if we should disconnect or keep the existing
+		   connection. */
+		if (director_host_cmp_to_self(conn->host, dir->right->host,
+					      dir->self_host) <= 0) {
+			/* the old connection is the correct one */
+			director_connection_deinit(&conn);
+			return;
+		}
+		director_connection_deinit(&dir->right);
+	}
+	dir->right = conn;
+	i_free(conn->name);
+	conn->name = i_strdup_printf("%s/right", conn->host->name);
+
 	io_remove(&conn->io);
 
 	director_connection_send_handshake(conn);
@@ -847,10 +972,15 @@
 {
 	struct director_connection *conn;
 
+	/* make sure we don't keep old sequence values across restarts */
+	host->last_seq = 0;
+
 	conn = director_connection_init_common(dir, fd);
-	conn->name = host->name;
+	conn->name = i_strdup_printf("%s/out", host->name);
 	conn->host = host;
-	conn->io = io_add(conn->fd, IO_WRITE,
+	/* use IO_READ instead of IO_WRITE, so that we don't assign
+	   dir->right until remote has actually sent some data */
+	conn->io = io_add(conn->fd, IO_READ,
 			  director_connection_connected, conn);
 	return conn;
 }
@@ -858,19 +988,18 @@
 void director_connection_deinit(struct director_connection **_conn)
 {
 	struct director_connection *conn = *_conn;
+	struct director *dir = conn->dir;
 
 	*_conn = NULL;
 
-	if (conn->dir->debug && conn->host != NULL) {
-		i_debug("Director %s:%u disconnected",
-			net_ip2addr(&conn->host->ip), conn->host->port);
-	}
+	DLLIST_REMOVE(&dir->connections, conn);
+	if (dir->left == conn)
+		dir->left = NULL;
+	if (dir->right == conn)
+		dir->right = NULL;
 
-	if (conn->dir->left == conn)
-		conn->dir->left = NULL;
-	if (conn->dir->right == conn)
-		conn->dir->right = NULL;
-
+	if (conn->user_iter != NULL)
+		user_directory_iter_deinit(&conn->user_iter);
 	if (conn->to != NULL)
 		timeout_remove(&conn->to);
 	if (conn->to_ping != NULL)
@@ -884,12 +1013,29 @@
 
 	if (conn->in)
 		master_service_client_connection_destroyed(master_service);
+	i_free(conn->name);
 	i_free(conn);
+
+	if (dir->left == NULL || dir->right == NULL) {
+		/* we aren't synced until we're again connected to a ring */
+		dir->sync_seq++;
+		dir->ring_synced = FALSE;
+	}
+}
+
+void director_connection_disconnected(struct director_connection **_conn)
+{
+	struct director_connection *conn = *_conn;
+	struct director *dir = conn->dir;
+
+	director_connection_deinit(_conn);
+	if (dir->right == NULL)
+		director_connect(dir);
 }
 
 static void director_connection_timeout(struct director_connection *conn)
 {
-	director_connection_deinit(&conn);
+	director_connection_disconnected(&conn);
 }
 
 void director_connection_send(struct director_connection *conn,
@@ -925,15 +1071,17 @@
 static void director_connection_ping_timeout(struct director_connection *conn)
 {
 	i_error("director(%s): Ping timed out, disconnecting", conn->name);
-	director_connection_deinit(&conn);
+	director_connection_disconnected(&conn);
 }
 
 static void director_connection_ping(struct director_connection *conn)
 {
+	conn->sync_ping = FALSE;
 	if (conn->ping_waiting)
 		return;
 
-	timeout_remove(&conn->to_ping);
+	if (conn->to_ping != NULL)
+		timeout_remove(&conn->to_ping);
 	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS,
 				    director_connection_ping_timeout, conn);
 	director_connection_send(conn, "PING\n");
@@ -944,3 +1092,46 @@
 {
 	return conn->name;
 }
+
+struct director_host *
+director_connection_get_host(struct director_connection *conn)
+{
+	return conn->host;
+}
+
+struct director_connection *
+director_connection_find_outgoing(struct director *dir,
+				  struct director_host *host)
+{
+	struct director_connection *conn;
+
+	for (conn = dir->connections; conn != NULL; conn = conn->next) {
+		if (conn->host == host && !conn->in)
+			return conn;
+	}
+	return NULL;
+}
+
+void director_connection_cork(struct director_connection *conn)
+{
+	o_stream_cork(conn->output);
+}
+
+void director_connection_uncork(struct director_connection *conn)
+{
+	o_stream_uncork(conn->output);
+}
+
+void director_connection_wait_sync(struct director_connection *conn)
+{
+	/* switch to faster ping timeout. avoid resetting the timeout if it's
+	   already fast. */
+	if (conn->ping_waiting || conn->sync_ping)
+		return;
+
+	if (conn->to_ping != NULL)
+		timeout_remove(&conn->to_ping);
+	conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS,
+				    director_connection_ping, conn);
+	conn->sync_ping = TRUE;
+}
--- a/src/director/director-connection.h	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/director-connection.h	Thu Jun 24 20:29:27 2010 +0100
@@ -5,7 +5,8 @@
 struct director;
 
 struct director_connection *
-director_connection_init_in(struct director *dir, int fd);
+director_connection_init_in(struct director *dir, int fd,
+			    const struct ip_addr *ip);
 struct director_connection *
 director_connection_init_out(struct director *dir, int fd,
 			     struct director_host *host);
@@ -13,10 +14,19 @@
 
 void director_connection_send(struct director_connection *conn,
 			      const char *data);
+void director_connection_wait_sync(struct director_connection *conn);
 void director_connection_send_except(struct director_connection *conn,
 				     struct director_host *skip_host,
 				     const char *data);
 
 const char *director_connection_get_name(struct director_connection *conn);
+struct director_host *
+director_connection_get_host(struct director_connection *conn);
+struct director_connection *
+director_connection_find_outgoing(struct director *dir,
+				  struct director_host *host);
+
+void director_connection_cork(struct director_connection *conn);
+void director_connection_uncork(struct director_connection *conn);
 
 #endif
--- a/src/director/director-host.h	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/director-host.h	Thu Jun 24 20:29:27 2010 +0100
@@ -11,15 +11,14 @@
 
 	/* name contains "ip:port" */
 	char *name;
-
-	/* each command between directors contains an increasing sequence.
-	   if director A gets conflicting information about director B, it can
-	   trust the one that has the highest sequence. */
+	/* change commands each have originating host and originating sequence.
+	   we'll keep track of the highest sequence we've seen from the host.
+	   if we find a sequence that isn't higher than that, we've already
+	   handled the command and it can be ignored (or: it must be ignored
+	   to avoid potential command loops) */
 	unsigned int last_seq;
-
 	/* Last time host was detected to be down/broken */
 	time_t last_failed;
-
 	/* we are this director */
 	unsigned int self:1;
 };
--- a/src/director/director-request.c	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/director-request.c	Thu Jun 24 20:29:27 2010 +0100
@@ -9,6 +9,7 @@
 #include "director-request.h"
 
 #define DIRECTOR_REQUEST_TIMEOUT_SECS 30
+#define RING_NOCONN_WARNING_DELAY_MSECS (2*1000)
 
 struct director_request {
 	struct director *dir;
@@ -66,6 +67,28 @@
 	array_append(&dir->pending_requests, &request, 1);
 }
 
+static void ring_noconn_warning(struct director *dir)
+{
+	if (!dir->ring_handshaked) {
+		i_warning("Delaying all requests "
+			  "until all directors have connected");
+	} else {
+		i_warning("Delaying new user requests until ring is synced");
+	}
+	dir->ring_handshake_warning_sent = TRUE;
+	timeout_remove(&dir->to_handshake_warning);
+}
+
+static void ring_log_delayed_warning(struct director *dir)
+{
+	if (dir->ring_handshake_warning_sent ||
+	    dir->to_handshake_warning != NULL)
+		return;
+
+	dir->to_handshake_warning = timeout_add(RING_NOCONN_WARNING_DELAY_MSECS,
+						ring_noconn_warning, dir);
+}
+
 bool director_request_continue(struct director_request *request)
 {
 	struct director *dir = request->dir;
@@ -74,11 +97,7 @@
 
 	if (!dir->ring_handshaked) {
 		/* delay requests until ring handshaking is complete */
-		if (!dir->ring_handshake_warning_sent) {
-			i_warning("Delaying requests until all "
-				  "directors have connected");
-			dir->ring_handshake_warning_sent = TRUE;
-		}
+		ring_log_delayed_warning(dir);
 		return FALSE;
 	}
 
@@ -88,8 +107,7 @@
 	else {
 		if (!dir->ring_synced) {
 			/* delay adding new users until ring is again synced */
-			if (dir->debug)
-				i_debug("Delaying request until ring is synced");
+			ring_log_delayed_warning(dir);
 			return FALSE;
 		}
 		host = mail_host_get_by_hash(dir->mail_hosts,
--- a/src/director/director-test.c	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/director-test.c	Thu Jun 24 20:29:27 2010 +0100
@@ -36,6 +36,7 @@
 #define USER_TIMEOUT_MSECS (1000*60)
 #define ADMIN_RANDOM_TIMEOUT_MSECS 500
 #define DIRECTOR_CONN_MAX_DELAY_MSECS 100
+#define DIRECTOR_DISCONNECT_TIMEOUT_SECS 10
 
 struct host {
 	int refcount;
@@ -83,6 +84,7 @@
 	struct io *io;
 	struct istream *input;
 	struct timeout *to_random;
+	bool pending_command;
 };
 
 static struct imap_client *imap_clients;
@@ -91,6 +93,7 @@
 static struct hash_table *hosts;
 static ARRAY_DEFINE(hosts_array, struct host *);
 static struct admin_connection *admin;
+static struct timeout *to_disconnect;
 
 static void imap_client_destroy(struct imap_client **client);
 static void director_connection_destroy(struct director_connection **conn);
@@ -330,6 +333,13 @@
 director_connection_create(int in_fd, const struct ip_addr *local_ip)
 {
 	struct director_connection *conn;
+	int out_fd;
+
+	out_fd = net_connect_ip(local_ip, DIRECTOR_OUT_PORT, NULL);
+	if (out_fd == -1) {
+		(void)close(in_fd);
+		return;
+	}
 
 	conn = i_new(struct director_connection, 1);
 	conn->in_fd = in_fd;
@@ -338,7 +348,7 @@
 	conn->in_io = io_add(conn->in_fd, IO_READ,
 			     director_connection_in_input, conn);
 
-	conn->out_fd = net_connect_ip(local_ip, DIRECTOR_OUT_PORT, NULL);
+	conn->out_fd = out_fd;
 	conn->out_input = i_stream_create_fd(conn->out_fd, (size_t)-1, FALSE);
 	conn->out_output = o_stream_create_fd(conn->out_fd, (size_t)-1, FALSE);
 	conn->out_io = io_add(conn->out_fd, IO_READ,
@@ -404,6 +414,7 @@
 	while ((line = i_stream_read_next_line(conn->input)) != NULL) {
 		if (strcmp(line, "OK") != 0)
 			i_error("director-doveadm: Unexpected input: %s", line);
+		conn->pending_command = FALSE;
 	}
 	if (conn->input->stream_errno != 0 || conn->input->eof)
 		i_fatal("director-doveadm: Connection lost");
@@ -414,6 +425,9 @@
 	struct host *const *hosts;
 	unsigned int i, count;
 
+	if (conn->pending_command)
+		return;
+
 	hosts = array_get(&hosts_array, &count);
 	i = rand() % count;
 
@@ -421,6 +435,7 @@
 
 	admin_send(conn, t_strdup_printf("HOST-SET\t%s\t%u\n",
 		net_ip2addr(&hosts[i]->ip), hosts[i]->vhost_count));
+	conn->pending_command = TRUE;
 }
 
 static struct admin_connection *admin_connect(const char *path)
@@ -458,7 +473,8 @@
 	struct admin_connection *conn = *_conn;
 
 	*_conn = NULL;
-	//timeout_remove(&conn->to_random);
+	if (conn->to_random != NULL)
+		timeout_remove(&conn->to_random);
 	i_stream_destroy(&conn->input);
 	io_remove(&conn->io);
 	net_disconnect(conn->fd);
@@ -492,6 +508,23 @@
 	net_set_nonblock(admin->fd, TRUE);
 }
 
+static void
+director_connection_disconnect_timeout(void *context ATTR_UNUSED)
+{
+	struct director_connection *conn;
+	unsigned int i, count = 0;
+
+	for (conn = director_connections; conn != NULL; conn = conn->next)
+		count++;
+
+	if (count != 0) {
+		i = 0; count = rand() % count;
+		for (conn = director_connections; i < count; conn = conn->next)
+			i++;
+		director_connection_destroy(&conn);
+	}
+}
+
 static void main_init(const char *admin_path)
 {
 	users = hash_table_create(default_pool, default_pool, 0,
@@ -504,6 +537,10 @@
 	admin = admin_connect(admin_path);
 	admin_send(admin, "HOST-LIST\n");
 	admin_read_hosts(admin);
+
+	to_disconnect =
+		timeout_add(1000*(1 + rand()%DIRECTOR_DISCONNECT_TIMEOUT_SECS),
+			    director_connection_disconnect_timeout, NULL);
 }
 
 static void main_deinit(void)
@@ -516,6 +553,7 @@
 		imap_client_destroy(&client);
 	}
 
+	timeout_remove(&to_disconnect);
 	while (director_connections != NULL) {
 		struct director_connection *conn = director_connections;
 		director_connection_destroy(&conn);
--- a/src/director/director-test.sh	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/director-test.sh	Thu Jun 24 20:29:27 2010 +0100
@@ -15,11 +15,11 @@
 listen = 127.0.1.$i
 base_dir = /var/run/dovecot$i
 
-!include dovecot-director-common.conf
+!include dovecot-director-common.conf.inc
 EOF
 done
 
-cat > dovecot-director-common.conf <<EOF
+cat > dovecot-director-common.conf.inc <<EOF
 log_path = /var/log/dovecot.log
 info_log_path = /var/log/dovecot-access.log
 director_servers =$dirs
@@ -83,12 +83,7 @@
 echo
 echo "Start up dovecot instances:"
 echo
-echo "dovecot -c dovecot-test.conf"
-i=0
-while [ $i != $director_count ]; do
-  i=`expr $i + 1`
-  echo "dovecot -c dovecot-director$i.conf"
-done
+echo 'for conf in dovecot*.conf; do dovecot -c $conf; done'
 echo
 echo "Start testing:"
 echo
--- a/src/director/director.c	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/director.c	Thu Jun 24 20:29:27 2010 +0100
@@ -11,6 +11,7 @@
 #include "director.h"
 
 #define DIRECTOR_RECONNECT_RETRY_SECS 60
+#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
 
 static bool director_is_self_ip_set(struct director *dir)
 {
@@ -79,7 +80,8 @@
 	unsigned int port;
 	int fd;
 
-	i_assert(dir->right == NULL);
+	if (director_connection_find_outgoing(dir, host) != NULL)
+		return 0;
 
 	if (dir->debug) {
 		i_debug("Connecting to %s:%u",
@@ -93,10 +95,24 @@
 		return -1;
 	}
 
-	dir->right = director_connection_init_out(dir, fd, host);
+	director_connection_init_out(dir, fd, host);
 	return 0;
 }
 
+static struct director_host *
+director_get_preferred_right_host(struct director *dir)
+{
+	struct director_host *const *hosts;
+	unsigned int count, self_idx;
+
+	hosts = array_get(&dir->dir_hosts, &count);
+	if (count == 1)
+		return NULL;
+
+	self_idx = director_find_self_idx(dir);
+	return hosts[(self_idx + 1) % count];
+}
+
 void director_connect(struct director *dir)
 {
 	struct director_host *const *hosts;
@@ -122,28 +138,98 @@
 	}
 	if (i == count) {
 		/* we're the only one */
-		director_set_ring_handshaked(dir);
+		if (dir->left != NULL) {
+			/* since we couldn't connect to it,
+			   it must have failed recently */
+			director_connection_deinit(&dir->left);
+		}
+		if (!dir->ring_handshaked)
+			director_set_ring_handshaked(dir);
+		else
+			director_set_ring_synced(dir);
 	}
 }
 
 void director_set_ring_handshaked(struct director *dir)
 {
+	i_assert(!dir->ring_handshaked);
+
+	if (dir->to_handshake_warning != NULL)
+		timeout_remove(&dir->to_handshake_warning);
 	if (dir->ring_handshake_warning_sent) {
 		i_warning("Directors have been connected, "
-			  "continuing delayed connections");
+			  "continuing delayed requests");
 		dir->ring_handshake_warning_sent = FALSE;
 	}
 	if (dir->debug)
 		i_debug("Director ring handshaked");
 
 	dir->ring_handshaked = TRUE;
+	director_set_ring_synced(dir);
+}
+
+/* to_reconnect callback: retry connecting to our preferred right host. */
+static void director_reconnect_timeout(struct director *dir)
+{
+	struct director_host *cur_host, *preferred_host =
+		director_get_preferred_right_host(dir);
+
+	cur_host = dir->right == NULL ? NULL :
+		director_connection_get_host(dir->right);
+
+	/* do nothing (and keep this timeout) if there's no preferred host
+	   to connect to, or if we're already connected to it and the
+	   connection just hasn't finished sync yet. */
+	if (preferred_host != NULL && cur_host != preferred_host)
+		(void)director_connect_host(dir, preferred_host);
+}
+
+void director_set_ring_synced(struct director *dir)
+{
+	struct director_host *host;
+
+	i_assert(!dir->ring_synced);
+	i_assert((dir->left != NULL && dir->right != NULL) ||
+		 (dir->left == NULL && dir->right == NULL));
+
+	if (dir->to_handshake_warning != NULL)
+		timeout_remove(&dir->to_handshake_warning);
+	if (dir->ring_handshake_warning_sent) {
+		i_warning("Ring is synced, continuing delayed requests");
+		dir->ring_handshake_warning_sent = FALSE;
+	}
+
+	host = dir->right == NULL ? NULL :
+		director_connection_get_host(dir->right);
+	if (host != director_get_preferred_right_host(dir)) {
+		/* try to reconnect to preferred host later */
+		if (dir->to_reconnect == NULL) {
+			dir->to_reconnect =
+				timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
+					    director_reconnect_timeout, dir);
+		}
+	} else {
+		if (dir->to_reconnect != NULL)
+			timeout_remove(&dir->to_reconnect);
+	}
+
 	dir->ring_synced = TRUE;
 	director_set_state_changed(dir);
 }
 
 static void director_sync(struct director *dir)
 {
-	/* we're synced again, once we receive this SYNC back */
+	if (dir->sync_frozen) {
+		dir->sync_pending = TRUE;
+		return;
+	}
+	if (dir->right == NULL) {
+		i_assert(!dir->ring_synced ||
+			 (dir->left == NULL && dir->right == NULL));
+		return;
+	}
+
+	/* we're synced again when we receive this SYNC back */
 	dir->sync_seq++;
 	dir->ring_synced = FALSE;
 
@@ -153,36 +239,94 @@
 			director_connection_get_name(dir->right));
 	}
 
+	if (dir->left != NULL)
+		director_connection_wait_sync(dir->left);
+	director_connection_wait_sync(dir->right);
 	director_connection_send(dir->right, t_strdup_printf(
 		"SYNC\t%s\t%u\t%u\n", net_ip2addr(&dir->self_ip),
 		dir->self_port, dir->sync_seq));
 }
 
+void director_sync_freeze(struct director *dir)
+{
+	i_assert(!dir->sync_frozen);
+	i_assert(!dir->sync_pending);
+	/* while frozen, director_sync() only sets sync_pending and returns */
+	if (dir->left != NULL)
+		director_connection_cork(dir->left);
+	if (dir->right != NULL)
+		director_connection_cork(dir->right);
+	dir->sync_frozen = TRUE;
+}
+
+void director_sync_thaw(struct director *dir)
+{
+	i_assert(dir->sync_frozen);
+	/* unfreeze first so the pending director_sync() isn't deferred again */
+	dir->sync_frozen = FALSE;
+	if (dir->sync_pending) {
+		dir->sync_pending = FALSE;
+		director_sync(dir);
+	}
+	if (dir->left != NULL)
+		director_connection_uncork(dir->left);
+	if (dir->right != NULL)
+		director_connection_uncork(dir->right);
+}
+
 void director_update_host(struct director *dir, struct director_host *src,
+			  struct director_host *orig_src,
 			  struct mail_host *host)
 {
+	/* update state in case this is the first mail host being added */
 	director_set_state_changed(dir);
 
+	if (orig_src == NULL) {
+		orig_src = dir->self_host;
+		orig_src->last_seq++;
+	}
+
 	director_update_send(dir, src, t_strdup_printf(
-		"HOST\t%s\t%u\n", net_ip2addr(&host->ip), host->vhost_count));
+		"HOST\t%s\t%u\t%u\t%s\t%u\n",
+		net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+		net_ip2addr(&host->ip), host->vhost_count));
 	director_sync(dir);
 }
 
 void director_remove_host(struct director *dir, struct director_host *src,
+			  struct director_host *orig_src,
 			  struct mail_host *host)
 {
-	director_update_send(dir, src, t_strdup_printf(
-		"HOST-REMOVE\t%s\n", net_ip2addr(&host->ip)));
+	if (src != NULL) {
+		if (orig_src == NULL) {
+			orig_src = dir->self_host;
+			orig_src->last_seq++;
+		}
+
+		director_update_send(dir, src, t_strdup_printf(
+			"HOST-REMOVE\t%s\t%u\t%u\t%s\n",
+			net_ip2addr(&orig_src->ip), orig_src->port,
+			orig_src->last_seq, net_ip2addr(&host->ip)));
+	}
+
 	user_directory_remove_host(dir->users, host);
 	mail_host_remove(dir->mail_hosts, host);
 	director_sync(dir);
 }
 
 void director_flush_host(struct director *dir, struct director_host *src,
+			 struct director_host *orig_src,
 			 struct mail_host *host)
 {
+	if (orig_src == NULL) {
+		orig_src = dir->self_host;
+		orig_src->last_seq++;
+	}
+
 	director_update_send(dir, src, t_strdup_printf(
-		"HOST-FLUSH\t%s\n", net_ip2addr(&host->ip)));
+		"HOST-FLUSH\t%s\t%u\t%u\t%s\n",
+		net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+		net_ip2addr(&host->ip)));
 	user_directory_remove_host(dir->users, host);
 	director_sync(dir);
 }
@@ -245,6 +389,10 @@
 	user_directory_deinit(&dir->users);
 	mail_hosts_deinit(&dir->mail_hosts);
 	mail_hosts_deinit(&dir->orig_config_hosts);
+	if (dir->to_reconnect != NULL)
+		timeout_remove(&dir->to_reconnect);
+	if (dir->to_handshake_warning != NULL)
+		timeout_remove(&dir->to_handshake_warning);
 	if (dir->to_request != NULL)
 		timeout_remove(&dir->to_request);
 	array_foreach(&dir->dir_hosts, hostp)
--- a/src/director/director.h	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/director.h	Thu Jun 24 20:29:27 2010 +0100
@@ -10,16 +10,6 @@
 
 typedef void director_state_change_callback_t(struct director *dir);
 
-struct director_host_change {
-	/* originating director for this change. keep ip/port here separately,
-	   because by the time its sync comes, the director itself may have
-	   already been removed. */
-	struct ip_addr ip;
-	unsigned int port;
-	/* highest change sequence from this director */
-	unsigned int seq;
-};
-
 struct director {
 	const struct director_settings *set;
 
@@ -31,6 +21,9 @@
 
 	struct director_host *self_host;
 	struct director_connection *left, *right;
+	/* all director connections */
+	struct director_connection *connections;
+	struct timeout *to_reconnect;
 
 	/* current mail hosts */
 	struct mail_host_list *mail_hosts;
@@ -43,6 +36,7 @@
 	/* these requests are waiting for directors to be in synced */
 	ARRAY_DEFINE(pending_requests, struct director_request *);
 	struct timeout *to_request;
+	struct timeout *to_handshake_warning;
 
 	director_state_change_callback_t *state_change_callback;
 
@@ -56,6 +50,8 @@
 	unsigned int ring_handshaked:1;
 	unsigned int ring_handshake_warning_sent:1;
 	unsigned int ring_synced:1;
+	unsigned int sync_frozen:1;
+	unsigned int sync_pending:1;
 	unsigned int debug:1;
 };
 
@@ -73,17 +69,24 @@
 void director_connect(struct director *dir);
 
 void director_set_ring_handshaked(struct director *dir);
+void director_set_ring_synced(struct director *dir);
 void director_set_state_changed(struct director *dir);
 
 void director_update_host(struct director *dir, struct director_host *src,
+			  struct director_host *orig_src,
 			  struct mail_host *host);
 void director_remove_host(struct director *dir, struct director_host *src,
+			  struct director_host *orig_src,
 			  struct mail_host *host);
 void director_flush_host(struct director *dir, struct director_host *src,
+			 struct director_host *orig_src,
 			 struct mail_host *host);
 void director_update_user(struct director *dir, struct director_host *src,
 			  struct user *user);
 
+void director_sync_freeze(struct director *dir);
+void director_sync_thaw(struct director *dir);
+
 /* Send data to all directors using both left and right connections
    (unless they're the same). */
 void director_update_send(struct director *dir, struct director_host *src,
--- a/src/director/doveadm-connection.c	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/doveadm-connection.c	Thu Jun 24 20:29:27 2010 +0100
@@ -91,7 +91,7 @@
 		host = mail_host_add_ip(dir->mail_hosts, &ip);
 	if (vhost_count != -1U)
 		mail_host_set_vhost_count(dir->mail_hosts, host, vhost_count);
-	director_update_host(dir, dir->self_host, host);
+	director_update_host(dir, dir->self_host, NULL, host);
 
 	o_stream_send(conn->output, "OK\n", 3);
 	return TRUE;
@@ -111,7 +111,8 @@
 	if (host == NULL)
 		o_stream_send_str(conn->output, "NOTFOUND\n");
 	else {
-		director_remove_host(conn->dir, conn->dir->self_host, host);
+		director_remove_host(conn->dir, conn->dir->self_host,
+				     NULL, host);
 		o_stream_send(conn->output, "OK\n", 3);
 	}
 	return TRUE;
@@ -122,8 +123,10 @@
 {
 	struct mail_host *const *hostp;
 
-	array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp)
-		director_flush_host(conn->dir, conn->dir->self_host, *hostp);
+	array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp) {
+		director_flush_host(conn->dir, conn->dir->self_host,
+				    NULL, *hostp);
+	}
 	o_stream_send(conn->output, "OK\n", 3);
 }
 
@@ -146,7 +149,8 @@
 	if (host == NULL)
 		o_stream_send_str(conn->output, "NOTFOUND\n");
 	else {
-		director_flush_host(conn->dir, conn->dir->self_host, host);
+		director_flush_host(conn->dir, conn->dir->self_host,
+				    NULL, host);
 		o_stream_send(conn->output, "OK\n", 3);
 	}
 	return TRUE;
--- a/src/director/main.c	Thu Jun 24 16:27:20 2010 +0100
+++ b/src/director/main.c	Thu Jun 24 20:29:27 2010 +0100
@@ -34,7 +34,7 @@
 		return -1;
 	}
 
-	director_connection_init_in(director, fd);
+	director_connection_init_in(director, fd, ip);
 	return 0;
 }
 
@@ -166,7 +166,7 @@
 		&director_setting_parser_info,
 		NULL
 	};
-	unsigned int test_port;
+	unsigned int test_port = 0;
 	const char *error;
 	bool debug = FALSE;
 	int c;