changeset 14571:42cca8a1d179

director: Implemented ability to remove directors from a running ring. Also added doveadm command for adding a new director to a running ring.
author Timo Sirainen <tss@iki.fi>
date Sat, 19 May 2012 21:18:04 +0300
parents 40f958c7643b
children 8f72002cb394
files src/director/director-connection.c src/director/director-host.c src/director/director-host.h src/director/director.c src/director/director.h src/director/doveadm-connection.c src/director/main.c src/doveadm/doveadm-director.c
diffstat 8 files changed, 326 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/src/director/director-connection.c	Sat May 19 21:16:42 2012 +0300
+++ b/src/director/director-connection.c	Sat May 19 21:18:04 2012 +0300
@@ -383,7 +383,12 @@
 	   elsewhere with CONNECT. however, before disconnecting it verify
 	   first that our left side is actually still functional.
 	*/
+	i_assert(conn->host == NULL);
 	conn->host = director_host_get(dir, &ip, port);
+	/* the host shouldn't be removed at this point, but if for some
+	   reason it is we don't want to crash */
+	conn->host->removed = FALSE;
+	director_host_ref(conn->host);
 	/* make sure we don't keep old sequence values across restarts */
 	conn->host->last_seq = 0;
 
@@ -587,6 +592,10 @@
 			/* ignore updates to ourself */
 			return TRUE;
 		}
+		if (host->removed) {
+			/* ignore re-adds of removed directors */
+			return TRUE;
+		}
 
 		/* already have this. just reset its last_network_failure
 		   timestamp, since it might be up now. */
@@ -598,18 +607,32 @@
 		}
 	} else {
 		/* save the director and forward it */
-		director_host_add(conn->dir, &ip, port);
+		host = director_host_add(conn->dir, &ip, port);
 		forward = TRUE;
 	}
 	if (forward) {
-		director_update_send(conn->dir,
-			director_connection_get_host(conn),
-			t_strdup_printf("DIRECTOR\t%s\t%u\n",
-					net_ip2addr(&ip), port));
+		director_notify_ring_added(host,
+			director_connection_get_host(conn));
 	}
 	return TRUE;
 }
 
+static bool director_cmd_director_remove(struct director_connection *conn,
+					 const char *const *args)
+{
+	struct director_host *host;
+	struct ip_addr ip;
+	unsigned int port;
+	/* handler for a DIRECTOR-REMOVE <ip> <port> command from a director */
+	if (!director_args_parse_ip_port(conn, args, &ip, &port))
+		return FALSE;
+	/* unknown and already removed hosts are silently ignored */
+	host = director_host_lookup(conn->dir, &ip, port);
+	if (host != NULL && !host->removed)
+		director_ring_remove(host, director_connection_get_host(conn));
+	return TRUE;
+}
+
 static bool
 director_cmd_host_hand_start(struct director_connection *conn,
 			     const char *const *args)
@@ -659,7 +682,7 @@
 	*_args = args + 3;
 
 	host = director_host_lookup(conn->dir, &ip, port);
-	if (host == NULL) {
+	if (host == NULL || host->removed) {
 		/* director is already gone, but we can't be sure if this
 		   command was sent everywhere. re-send it as if it was from
 		   ourself. */
@@ -1191,6 +1214,8 @@
 		return director_cmd_user_killed_everywhere(conn, args);
 	if (strcmp(cmd, "DIRECTOR") == 0)
 		return director_cmd_director(conn, args);
+	if (strcmp(cmd, "DIRECTOR-REMOVE") == 0)
+		return director_cmd_director_remove(conn, args);
 	if (strcmp(cmd, "SYNC") == 0)
 		return director_connection_sync(conn, args);
 	if (strcmp(cmd, "CONNECT") == 0)
@@ -1279,6 +1304,8 @@
 	struct director_host *const *hostp;
 
 	array_foreach(&conn->dir->dir_hosts, hostp) {
+		if ((*hostp)->removed)
+			continue;
 		str_printfa(str, "DIRECTOR\t%s\t%u\n",
 			    net_ip2addr(&(*hostp)->ip), (*hostp)->port);
 	}
@@ -1433,12 +1460,15 @@
 {
 	struct director_connection *conn;
 
+	i_assert(!host->removed);
+
 	/* make sure we don't keep old sequence values across restarts */
 	host->last_seq = 0;
 
 	conn = director_connection_init_common(dir, fd);
 	conn->name = i_strdup_printf("%s/out", host->name);
 	conn->host = host;
+	director_host_ref(host);
 	conn->io = io_add(conn->fd, IO_WRITE,
 			  director_connection_connected, conn);
 	return conn;
@@ -1471,6 +1501,8 @@
 	}
 	if (dir->right == conn)
 		dir->right = NULL;
+	if (conn->host != NULL)
+		director_host_unref(conn->host);
 
 	if (conn->user_iter != NULL)
 		user_directory_iter_deinit(&conn->user_iter);
--- a/src/director/director-host.c	Sat May 19 21:16:42 2012 +0300
+++ b/src/director/director-host.c	Sat May 19 21:18:04 2012 +0300
@@ -29,6 +29,8 @@
 	struct director_host *host;
 
 	host = i_new(struct director_host, 1);
+	host->dir = dir;
+	host->refcount = 1;
 	host->ip = *ip;
 	host->port = port;
 	host->name = i_strdup_printf("%s:%u", net_ip2addr(ip), port);
@@ -41,8 +43,39 @@
 	return host;
 }
 
-void director_host_free(struct director_host *host)
+void director_host_free(struct director_host **_host)
+{
+	struct director_host *host = *_host;
+	/* only valid when the caller holds the last remaining reference */
+	i_assert(host->refcount == 1);
+	/* clear the caller's pointer, then drop that last reference */
+	*_host = NULL;
+	director_host_unref(host);
+}
+
+void director_host_ref(struct director_host *host)
 {
+	i_assert(host->refcount > 0);
+	host->refcount++; /* each reference is dropped with director_host_unref() */
+}
+
+void director_host_unref(struct director_host *host)
+{
+	struct director_host *const *hosts;
+	unsigned int i, count;
+	/* drop one reference; the host is destroyed when the count hits zero */
+	i_assert(host->refcount > 0);
+
+	if (--host->refcount > 0)
+		return;
+	/* last reference: unlink the host from dir_hosts before freeing it */
+	hosts = array_get(&host->dir->dir_hosts, &count);
+	for (i = 0; i < count; i++) {
+		if (hosts[i] == host) {
+			array_delete(&host->dir->dir_hosts, i, 1);
+			break;
+		}
+	}
 	i_free(host->name);
 	i_free(host);
 }
--- a/src/director/director-host.h	Sat May 19 21:16:42 2012 +0300
+++ b/src/director/director-host.h	Sat May 19 21:18:04 2012 +0300
@@ -6,6 +6,9 @@
 struct director;
 
 struct director_host {
+	struct director *dir;
+	int refcount;
+
 	struct ip_addr ip;
 	unsigned int port;
 
@@ -22,12 +25,16 @@
 	time_t last_protocol_failure;
 	/* we are this director */
 	unsigned int self:1;
+	unsigned int removed:1;
 };
 
 struct director_host *
 director_host_add(struct director *dir, const struct ip_addr *ip,
 		  unsigned int port);
-void director_host_free(struct director_host *host);
+void director_host_free(struct director_host **host);
+
+void director_host_ref(struct director_host *host);
+void director_host_unref(struct director_host *host);
 
 struct director_host *
 director_host_get(struct director *dir, const struct ip_addr *ip,
--- a/src/director/director.c	Sat May 19 21:16:42 2012 +0300
+++ b/src/director/director.c	Sat May 19 21:18:04 2012 +0300
@@ -20,6 +20,7 @@
 #define DIRECTOR_SYNC_TIMEOUT_MSECS (5*1000)
 #define DIRECTOR_RING_MIN_WAIT_SECS 20
 #define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000
+#define DIRECTOR_DELAYED_DIR_REMOVE_MSECS (1000*30)
 
 static bool director_is_self_ip_set(struct director *dir)
 {
@@ -127,8 +128,8 @@
 static struct director_host *
 director_get_preferred_right_host(struct director *dir)
 {
-	struct director_host *const *hosts;
-	unsigned int count, self_idx;
+	struct director_host *const *hosts, *host;
+	unsigned int i, count, self_idx;
 
 	hosts = array_get(&dir->dir_hosts, &count);
 	if (count == 1) {
@@ -137,7 +138,13 @@
 	}
 
 	self_idx = director_find_self_idx(dir);
-	return hosts[(self_idx + 1) % count];
+	for (i = 0; i < count; i++) {
+		host = hosts[(self_idx + i + 1) % count];
+		if (!host->removed)
+			return host;
+	}
+	/* self, with some removed hosts */
+	return NULL;
 }
 
 static bool director_wait_for_others(struct director *dir)
@@ -177,6 +184,9 @@
 	for (i = 1; i < count; i++) {
 		unsigned int idx = (self_idx + i) % count;
 
+		if (hosts[idx]->removed)
+			continue;
+
 		if (hosts[idx]->last_network_failure +
 		    DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) {
 			/* connection failed recently, don't try retrying here */
@@ -408,6 +418,79 @@
 		director_connection_uncork(*connp);
 }
 
+void director_notify_ring_added(struct director_host *added_host,
+				struct director_host *src)
+{
+	const char *cmd;
+	/* send a DIRECTOR command for added_host to all connections except src's */
+	cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n",
+			      net_ip2addr(&added_host->ip), added_host->port);
+	director_update_send(added_host->dir, src, cmd);
+}
+
+static void director_delayed_dir_remove_timeout(struct director *dir)
+{
+	struct director_host *const *hosts, *host;
+	unsigned int i, count;
+	/* one-shot timeout: frees every host that has been marked as removed */
+	timeout_remove(&dir->to_remove_dirs);
+	/* freeing deletes the host from dir_hosts, so re-read the array, keep i */
+	hosts = array_get(&dir->dir_hosts, &count);
+	for (i = 0; i < count; ) {
+		if (hosts[i]->removed) {
+			host = hosts[i];
+			director_host_free(&host);
+			hosts = array_get(&dir->dir_hosts, &count);
+		} else {
+			i++;
+		}
+	}
+}
+
+void director_ring_remove(struct director_host *removed_host,
+			  struct director_host *src)
+{
+	struct director *dir = removed_host->dir;
+	struct director_connection *const *conns, *conn;
+	unsigned int i, count;
+	const char *cmd;
+	/* flag removed_host as removed, drop its connections, forward the removal */
+	if (removed_host->self) {
+		/* others will just disconnect us */
+		return;
+	}
+
+	/* mark the host as removed and fully remove it later. this delay is
+	   needed, because the removal may trigger director reconnections,
+	   which may send the director back and we don't want to re-add it */
+	removed_host->removed = TRUE;
+	if (dir->to_remove_dirs == NULL) {
+		dir->to_remove_dirs =
+			timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS,
+				    director_delayed_dir_remove_timeout, dir);
+	}
+
+	/* disconnect any connections to the host */
+	conns = array_get(&dir->connections, &count);
+	for (i = 0; i < count; ) {
+		conn = conns[i];
+		if (director_connection_get_host(conn) != removed_host)
+			i++;
+		else {
+			director_connection_deinit(&conn);
+			conns = array_get(&dir->connections, &count);
+		}
+	}
+	if (dir->right == NULL)
+		director_connect(dir);
+	/* only connections with minor version >= RING_REMOVE get the command */
+	cmd = t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n",
+			      net_ip2addr(&removed_host->ip),
+			      removed_host->port);
+	director_update_send_version(dir, src,
+				     DIRECTOR_VERSION_RING_REMOVE, cmd);
+}
+
 void director_update_host(struct director *dir, struct director_host *src,
 			  struct director_host *orig_src,
 			  struct mail_host *host)
@@ -704,12 +787,20 @@
 void director_update_send(struct director *dir, struct director_host *src,
 			  const char *cmd)
 {
+	director_update_send_version(dir, src, 0, cmd); /* 0 = no minimum minor version */
+}
+
+void director_update_send_version(struct director *dir,
+				  struct director_host *src,
+				  unsigned int min_version, const char *cmd)
+{	/* send cmd to each connection not from src with minor version >= min_version */
 	struct director_connection *const *connp;
 
 	i_assert(src != NULL);
 
 	array_foreach(&dir->connections, connp) {
-		if (director_connection_get_host(*connp) != src)
+		if (director_connection_get_host(*connp) != src &&
+		    director_connection_get_minor_version(*connp) >= min_version)
 			director_connection_send(*connp, cmd);
 	}
 }
@@ -741,7 +832,7 @@
 void director_deinit(struct director **_dir)
 {
 	struct director *dir = *_dir;
-	struct director_host *const *hostp;
+	struct director_host *const *hostp, *host;
 	struct director_connection *conn, *const *connp;
 
 	*_dir = NULL;
@@ -765,8 +856,13 @@
 		timeout_remove(&dir->to_request);
 	if (dir->to_sync != NULL)
 		timeout_remove(&dir->to_sync);
-	array_foreach(&dir->dir_hosts, hostp)
-		director_host_free(*hostp);
+	if (dir->to_remove_dirs != NULL)
+		timeout_remove(&dir->to_remove_dirs);
+	while (array_count(&dir->dir_hosts) > 0) {
+		hostp = array_idx(&dir->dir_hosts, 0);
+		host = *hostp;
+		director_host_free(&host);
+	}
 	array_free(&dir->pending_requests);
 	array_free(&dir->dir_hosts);
 	array_free(&dir->connections);
--- a/src/director/director.h	Sat May 19 21:16:42 2012 +0300
+++ b/src/director/director.h	Sat May 19 21:18:04 2012 +0300
@@ -6,10 +6,12 @@
 
 #define DIRECTOR_VERSION_NAME "director"
 #define DIRECTOR_VERSION_MAJOR 1
-#define DIRECTOR_VERSION_MINOR 1
+#define DIRECTOR_VERSION_MINOR 2
 
 /* weak users supported in protocol v1.1+ */
 #define DIRECTOR_VERSION_WEAK_USERS 1
+/* director removes supported in v1.2+ */
+#define DIRECTOR_VERSION_RING_REMOVE 2
 
 /* Minimum time between even attempting to communicate with a director that
    failed due to a protocol error. */
@@ -58,6 +60,7 @@
 
 	/* director hosts are sorted by IP (and port) */
 	ARRAY_DEFINE(dir_hosts, struct director_host *);
+	struct timeout *to_remove_dirs;
 
 	struct ipc_client *ipc_proxy;
 	unsigned int sync_seq;
@@ -99,6 +102,11 @@
 			uint32_t seq, unsigned int minor_version);
 bool director_resend_sync(struct director *dir);
 
+void director_notify_ring_added(struct director_host *added_host,
+				struct director_host *src);
+void director_ring_remove(struct director_host *removed_host,
+			  struct director_host *src);
+
 void director_update_host(struct director *dir, struct director_host *src,
 			  struct director_host *orig_src,
 			  struct mail_host *host);
@@ -129,7 +137,10 @@
 /* Send data to all directors using both left and right connections
    (unless they're the same). */
 void director_update_send(struct director *dir, struct director_host *src,
-			  const char *data);
+			  const char *cmd);
+void director_update_send_version(struct director *dir,
+				  struct director_host *src,
+				  unsigned int min_version, const char *cmd);
 
 int director_connect_host(struct director *dir, struct director_host *host);
 
--- a/src/director/doveadm-connection.c	Sat May 19 21:16:42 2012 +0300
+++ b/src/director/doveadm-connection.c	Sat May 19 21:18:04 2012 +0300
@@ -109,7 +109,9 @@
 		right = dir->right != NULL &&
 			 director_connection_get_host(dir->right) == host;
 
-		if (dir->self_host == host)
+		if (host->removed)
+			type = "removed";
+		else if (dir->self_host == host)
 			type = "self";
 		else if (left)
 			type = right ? "l+r" : "left";
@@ -129,6 +131,58 @@
 }
 
 static bool
+doveadm_cmd_director_add(struct doveadm_connection *conn, const char *line)
+{
+	const char *const *args;
+	struct director_host *host;
+	struct ip_addr ip;
+	unsigned int port = conn->dir->self_port;
+	/* line is "<ip>[\t<port>]"; without a port our own port is used */
+	args = t_strsplit_tab(line);
+	if (args[0] == NULL ||
+	    net_addr2ip(args[0], &ip) < 0 ||
+	    (args[1] != NULL && str_to_uint(args[1], &port) < 0)) {
+		i_error("doveadm sent invalid DIRECTOR-ADD parameters");
+		return FALSE;
+	}
+	/* add and announce the director only if it isn't already known */
+	if (director_host_lookup(conn->dir, &ip, port) == NULL) {
+		host = director_host_add(conn->dir, &ip, port);
+		director_notify_ring_added(host, conn->dir->self_host);
+	}
+	o_stream_send(conn->output, "OK\n", 3);
+	return TRUE;
+}
+
+static bool
+doveadm_cmd_director_remove(struct doveadm_connection *conn, const char *line)
+{
+	const char *const *args;
+	struct director_host *host;
+	struct ip_addr ip;
+	unsigned int port = 0;
+	/* line is "<ip>[\t<port>]"; parse the IP from its own field */
+	args = t_strsplit_tab(line);
+	if (args[0] == NULL ||
+	    net_addr2ip(args[0], &ip) < 0 ||
+	    (args[1] != NULL && str_to_uint(args[1], &port) < 0)) {
+		i_error("doveadm sent invalid DIRECTOR-REMOVE parameters");
+		return FALSE;
+	}
+	/* with no port (0) the host is looked up by IP alone */
+	host = port != 0 ?
+		director_host_lookup(conn->dir, &ip, port) :
+		director_host_lookup_ip(conn->dir, &ip);
+	if (host == NULL)
+		o_stream_send_str(conn->output, "NOTFOUND\n");
+	else {
+		director_ring_remove(host, conn->dir->self_host);
+		o_stream_send(conn->output, "OK\n", 3);
+	}
+	return TRUE;
+}
+
+static bool
 doveadm_cmd_host_set(struct doveadm_connection *conn, const char *line)
 {
 	struct director *dir = conn->dir;
@@ -364,6 +418,10 @@
 			doveadm_cmd_host_list_removed(conn);
 		else if (strcmp(cmd, "DIRECTOR-LIST") == 0)
 			doveadm_cmd_director_list(conn);
+		else if (strcmp(cmd, "DIRECTOR-ADD") == 0)
+			doveadm_cmd_director_add(conn, args);
+		else if (strcmp(cmd, "DIRECTOR-REMOVE") == 0)
+			doveadm_cmd_director_remove(conn, args);
 		else if (strcmp(cmd, "HOST-SET") == 0)
 			ret = doveadm_cmd_host_set(conn, args);
 		else if (strcmp(cmd, "HOST-REMOVE") == 0)
--- a/src/director/main.c	Sat May 19 21:16:42 2012 +0300
+++ b/src/director/main.c	Sat May 19 21:18:04 2012 +0300
@@ -28,7 +28,10 @@
 
 static int director_client_connected(int fd, const struct ip_addr *ip)
 {
-	if (director_host_lookup_ip(director, ip) == NULL) {
+	struct director_host *host;
+
+	host = director_host_lookup_ip(director, ip);
+	if (host == NULL || host->removed) {
 		i_warning("Connection from %s: Server not listed in "
 			  "director_servers, dropping", net_ip2addr(ip));
 		return -1;
--- a/src/doveadm/doveadm-director.c	Sat May 19 21:16:42 2012 +0300
+++ b/src/doveadm/doveadm-director.c	Sat May 19 21:18:04 2012 +0300
@@ -3,6 +3,7 @@
 #include "lib.h"
 #include "md5.h"
 #include "hash.h"
+#include "str.h"
 #include "network.h"
 #include "istream.h"
 #include "write-full.h"
@@ -580,6 +581,68 @@
 	director_disconnect(ctx);
 }
 
+
+static void director_read_ok_reply(struct director_context *ctx)
+{
+	const char *line;
+	/* any reply line other than "OK" logs an error and sets doveadm_exit_code */
+	line = i_stream_read_next_line(ctx->input);
+	if (line == NULL) {
+		i_error("Director disconnected unexpectedly");
+		doveadm_exit_code = EX_TEMPFAIL;
+	} else if (strcmp(line, "NOTFOUND") == 0) {
+		i_error("Not found");
+		doveadm_exit_code = DOVEADM_EX_NOTFOUND;
+	} else if (strcmp(line, "OK") != 0) {
+		i_error("Failed: %s", line);
+		doveadm_exit_code = EX_TEMPFAIL;
+	}
+}
+
+static void cmd_director_ring_add(int argc, char *argv[])
+{
+	struct director_context *ctx;
+	struct ip_addr ip;
+	string_t *str = t_str_new(64);
+	unsigned int port = 0;
+	/* "director ring add <ip> [<port>]": sends DIRECTOR-ADD, checks the reply */
+	ctx = cmd_director_init(argc, argv, "a:", cmd_director_ring_add);
+	if (argv[optind] == NULL ||
+	    net_addr2ip(argv[optind], &ip) < 0 ||
+	    (argv[optind+1] != NULL && str_to_uint(argv[optind+1], &port) < 0))
+		director_cmd_help(cmd_director_ring_add); /* NOTE(review): presumably doesn't return - confirm */
+	/* the port field is sent only when a non-zero port was given */
+	str_printfa(str, "DIRECTOR-ADD\t%s", net_ip2addr(&ip));
+	if (port != 0)
+		str_printfa(str, "\t%u", port);
+	str_append_c(str, '\n');
+	director_send(ctx, str_c(str));
+	director_read_ok_reply(ctx);
+	director_disconnect(ctx);
+}
+
+static void cmd_director_ring_remove(int argc, char *argv[])
+{
+	struct director_context *ctx;
+	struct ip_addr ip;
+	string_t *str = t_str_new(64);
+	unsigned int port = 0;
+	/* "director ring remove <ip> [<port>]": sends DIRECTOR-REMOVE, checks reply */
+	ctx = cmd_director_init(argc, argv, "a:", cmd_director_ring_remove);
+	if (argv[optind] == NULL ||
+	    net_addr2ip(argv[optind], &ip) < 0 ||
+	    (argv[optind+1] != NULL && str_to_uint(argv[optind+1], &port) < 0))
+		director_cmd_help(cmd_director_ring_remove); /* NOTE(review): presumably doesn't return - confirm */
+	/* the port field is sent only when a non-zero port was given */
+	str_printfa(str, "DIRECTOR-REMOVE\t%s", net_ip2addr(&ip));
+	if (port != 0)
+		str_printfa(str, "\t%u", port);
+	str_append_c(str, '\n');
+	director_send(ctx, str_c(str));
+	director_read_ok_reply(ctx);
+	director_disconnect(ctx);
+}
+
 static void cmd_director_ring_status(int argc, char *argv[])
 {
 	struct director_context *ctx;
@@ -634,6 +697,10 @@
 	  "[-a <director socket path>] <host>|all" },
 	{ cmd_director_dump, "director dump",
 	  "[-a <director socket path>]" },
+	{ cmd_director_ring_add, "director ring add",
+	  "[-a <director socket path>] <ip> [<port>]" },
+	{ cmd_director_ring_remove, "director ring remove",
+	  "[-a <director socket path>] <ip> [<port>]" },
 	{ cmd_director_ring_status, "director ring status",
 	  "[-a <director socket path>]" }
 };