Mercurial > dovecot > core-2.2
changeset 14571:42cca8a1d179
director: Implemented ability to remove directors from a running ring.
Also added doveadm command for adding a new director to a running ring.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Sat, 19 May 2012 21:18:04 +0300 |
parents | 40f958c7643b |
children | 8f72002cb394 |
files | src/director/director-connection.c src/director/director-host.c src/director/director-host.h src/director/director.c src/director/director.h src/director/doveadm-connection.c src/director/main.c src/doveadm/doveadm-director.c |
diffstat | 8 files changed, 326 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/src/director/director-connection.c Sat May 19 21:16:42 2012 +0300 +++ b/src/director/director-connection.c Sat May 19 21:18:04 2012 +0300 @@ -383,7 +383,12 @@ elsewhere with CONNECT. however, before disconnecting it verify first that our left side is actually still functional. */ + i_assert(conn->host == NULL); conn->host = director_host_get(dir, &ip, port); + /* the host shouldn't be removed at this point, but if for some + reason it is we don't want to crash */ + conn->host->removed = FALSE; + director_host_ref(conn->host); /* make sure we don't keep old sequence values across restarts */ conn->host->last_seq = 0; @@ -587,6 +592,10 @@ /* ignore updates to ourself */ return TRUE; } + if (host->removed) { + /* ignore re-adds of removed directors */ + return TRUE; + } /* already have this. just reset its last_network_failure timestamp, since it might be up now. */ @@ -598,18 +607,32 @@ } } else { /* save the director and forward it */ - director_host_add(conn->dir, &ip, port); + host = director_host_add(conn->dir, &ip, port); forward = TRUE; } if (forward) { - director_update_send(conn->dir, - director_connection_get_host(conn), - t_strdup_printf("DIRECTOR\t%s\t%u\n", - net_ip2addr(&ip), port)); + director_notify_ring_added(host, + director_connection_get_host(conn)); } return TRUE; } +static bool director_cmd_director_remove(struct director_connection *conn, + const char *const *args) +{ + struct director_host *host; + struct ip_addr ip; + unsigned int port; + + if (!director_args_parse_ip_port(conn, args, &ip, &port)) + return FALSE; + + host = director_host_lookup(conn->dir, &ip, port); + if (host != NULL && !host->removed) + director_ring_remove(host, director_connection_get_host(conn)); + return TRUE; +} + static bool director_cmd_host_hand_start(struct director_connection *conn, const char *const *args) @@ -659,7 +682,7 @@ *_args = args + 3; host = director_host_lookup(conn->dir, &ip, port); - if (host == NULL) { + if (host == NULL || host->removed) { /* director is already gone, but we can't be sure if this command was sent everywhere. re-send it as if it was from ourself. */ @@ -1191,6 +1214,8 @@ return director_cmd_user_killed_everywhere(conn, args); if (strcmp(cmd, "DIRECTOR") == 0) return director_cmd_director(conn, args); + if (strcmp(cmd, "DIRECTOR-REMOVE") == 0) + return director_cmd_director_remove(conn, args); if (strcmp(cmd, "SYNC") == 0) return director_connection_sync(conn, args); if (strcmp(cmd, "CONNECT") == 0) @@ -1279,6 +1304,8 @@ struct director_host *const *hostp; array_foreach(&conn->dir->dir_hosts, hostp) { + if ((*hostp)->removed) + continue; str_printfa(str, "DIRECTOR\t%s\t%u\n", net_ip2addr(&(*hostp)->ip), (*hostp)->port); } @@ -1433,12 +1460,15 @@ { struct director_connection *conn; + i_assert(!host->removed); + /* make sure we don't keep old sequence values across restarts */ host->last_seq = 0; conn = director_connection_init_common(dir, fd); conn->name = i_strdup_printf("%s/out", host->name); conn->host = host; + director_host_ref(host); conn->io = io_add(conn->fd, IO_WRITE, director_connection_connected, conn); return conn; @@ -1471,6 +1501,8 @@ } if (dir->right == conn) dir->right = NULL; + if (conn->host != NULL) + director_host_unref(conn->host); if (conn->user_iter != NULL) user_directory_iter_deinit(&conn->user_iter);
--- a/src/director/director-host.c Sat May 19 21:16:42 2012 +0300 +++ b/src/director/director-host.c Sat May 19 21:18:04 2012 +0300 @@ -29,6 +29,8 @@ struct director_host *host; host = i_new(struct director_host, 1); + host->dir = dir; + host->refcount = 1; host->ip = *ip; host->port = port; host->name = i_strdup_printf("%s:%u", net_ip2addr(ip), port); @@ -41,8 +43,39 @@ return host; } -void director_host_free(struct director_host *host) +void director_host_free(struct director_host **_host) +{ + struct director_host *host = *_host; + + i_assert(host->refcount == 1); + + *_host = NULL; + director_host_unref(host); +} + +void director_host_ref(struct director_host *host) { + i_assert(host->refcount > 0); + host->refcount++; +} + +void director_host_unref(struct director_host *host) +{ + struct director_host *const *hosts; + unsigned int i, count; + + i_assert(host->refcount > 0); + + if (--host->refcount > 0) + return; + + hosts = array_get(&host->dir->dir_hosts, &count); + for (i = 0; i < count; i++) { + if (hosts[i] == host) { + array_delete(&host->dir->dir_hosts, i, 1); + break; + } + } i_free(host->name); i_free(host); }
--- a/src/director/director-host.h Sat May 19 21:16:42 2012 +0300 +++ b/src/director/director-host.h Sat May 19 21:18:04 2012 +0300 @@ -6,6 +6,9 @@ struct director; struct director_host { + struct director *dir; + int refcount; + struct ip_addr ip; unsigned int port; @@ -22,12 +25,16 @@ time_t last_protocol_failure; /* we are this director */ unsigned int self:1; + unsigned int removed:1; }; struct director_host * director_host_add(struct director *dir, const struct ip_addr *ip, unsigned int port); -void director_host_free(struct director_host *host); +void director_host_free(struct director_host **host); + +void director_host_ref(struct director_host *host); +void director_host_unref(struct director_host *host); struct director_host * director_host_get(struct director *dir, const struct ip_addr *ip,
--- a/src/director/director.c Sat May 19 21:16:42 2012 +0300 +++ b/src/director/director.c Sat May 19 21:18:04 2012 +0300 @@ -20,6 +20,7 @@ #define DIRECTOR_SYNC_TIMEOUT_MSECS (5*1000) #define DIRECTOR_RING_MIN_WAIT_SECS 20 #define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000 +#define DIRECTOR_DELAYED_DIR_REMOVE_MSECS (1000*30) static bool director_is_self_ip_set(struct director *dir) { @@ -127,8 +128,8 @@ static struct director_host * director_get_preferred_right_host(struct director *dir) { - struct director_host *const *hosts; - unsigned int count, self_idx; + struct director_host *const *hosts, *host; + unsigned int i, count, self_idx; hosts = array_get(&dir->dir_hosts, &count); if (count == 1) { @@ -137,7 +138,13 @@ } self_idx = director_find_self_idx(dir); - return hosts[(self_idx + 1) % count]; + for (i = 0; i < count; i++) { + host = hosts[(self_idx + i + 1) % count]; + if (!host->removed) + return host; + } + /* self, with some removed hosts */ + return NULL; } static bool director_wait_for_others(struct director *dir) @@ -177,6 +184,9 @@ for (i = 1; i < count; i++) { unsigned int idx = (self_idx + i) % count; + if (hosts[idx]->removed) + continue; + if (hosts[idx]->last_network_failure + DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) { /* connection failed recently, don't try retrying here */ @@ -408,6 +418,79 @@ director_connection_uncork(*connp); } +void director_notify_ring_added(struct director_host *added_host, + struct director_host *src) +{ + const char *cmd; + + cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n", + net_ip2addr(&added_host->ip), added_host->port); + director_update_send(added_host->dir, src, cmd); +} + +static void director_delayed_dir_remove_timeout(struct director *dir) +{ + struct director_host *const *hosts, *host; + unsigned int i, count; + + timeout_remove(&dir->to_remove_dirs); + + hosts = array_get(&dir->dir_hosts, &count); + for (i = 0; i < count; ) { + if (hosts[i]->removed) { + host = hosts[i]; + director_host_free(&host); + hosts = array_get(&dir->dir_hosts, &count); + } else { + i++; + } + } +} + +void director_ring_remove(struct director_host *removed_host, + struct director_host *src) +{ + struct director *dir = removed_host->dir; + struct director_connection *const *conns, *conn; + unsigned int i, count; + const char *cmd; + + if (removed_host->self) { + /* others will just disconnect us */ + return; + } + + /* mark the host as removed and fully remove it later. this delay is + needed, because the removal may trigger director reconnections, + which may send the director back and we don't want to re-add it */ + removed_host->removed = TRUE; + if (dir->to_remove_dirs == NULL) { + dir->to_remove_dirs = + timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS, + director_delayed_dir_remove_timeout, dir); + } + + /* disconnect any connections to the host */ + conns = array_get(&dir->connections, &count); + for (i = 0; i < count; ) { + conn = conns[i]; + if (director_connection_get_host(conn) != removed_host) + i++; + else { + director_connection_deinit(&conn); + conns = array_get(&dir->connections, &count); + } + } + if (dir->right == NULL) + director_connect(dir); + + cmd = t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n", + net_ip2addr(&removed_host->ip), + removed_host->port); + director_update_send_version(dir, src, + DIRECTOR_VERSION_RING_REMOVE, cmd); +} + void director_update_host(struct director *dir, struct director_host *src, struct director_host *orig_src, struct mail_host *host) @@ -704,12 +787,20 @@ void director_update_send(struct director *dir, struct director_host *src, const char *cmd) { + director_update_send_version(dir, src, 0, cmd); +} + +void director_update_send_version(struct director *dir, + struct director_host *src, + unsigned int min_version, const char *cmd) +{ struct director_connection *const *connp; i_assert(src != NULL); array_foreach(&dir->connections, connp) { - if (director_connection_get_host(*connp) != src) + if (director_connection_get_host(*connp) != src && + director_connection_get_minor_version(*connp) >= min_version) director_connection_send(*connp, cmd); } } @@ -741,7 +832,7 @@ void director_deinit(struct director **_dir) { struct director *dir = *_dir; - struct director_host *const *hostp; + struct director_host *const *hostp, *host; struct director_connection *conn, *const *connp; *_dir = NULL; @@ -765,8 +856,13 @@ timeout_remove(&dir->to_request); if (dir->to_sync != NULL) timeout_remove(&dir->to_sync); - array_foreach(&dir->dir_hosts, hostp) - director_host_free(*hostp); + if (dir->to_remove_dirs != NULL) + timeout_remove(&dir->to_remove_dirs); + while (array_count(&dir->dir_hosts) > 0) { + hostp = array_idx(&dir->dir_hosts, 0); + host = *hostp; + director_host_free(&host); + } array_free(&dir->pending_requests); array_free(&dir->dir_hosts); array_free(&dir->connections);
--- a/src/director/director.h Sat May 19 21:16:42 2012 +0300 +++ b/src/director/director.h Sat May 19 21:18:04 2012 +0300 @@ -6,10 +6,12 @@ #define DIRECTOR_VERSION_NAME "director" #define DIRECTOR_VERSION_MAJOR 1 -#define DIRECTOR_VERSION_MINOR 1 +#define DIRECTOR_VERSION_MINOR 2 /* weak users supported in protocol v1.1+ */ #define DIRECTOR_VERSION_WEAK_USERS 1 +/* director removes supported in v1.2+ */ +#define DIRECTOR_VERSION_RING_REMOVE 2 /* Minimum time between even attempting to communicate with a director that failed due to a protocol error. */ @@ -58,6 +60,7 @@ /* director hosts are sorted by IP (and port) */ ARRAY_DEFINE(dir_hosts, struct director_host *); + struct timeout *to_remove_dirs; struct ipc_client *ipc_proxy; unsigned int sync_seq; @@ -99,6 +102,11 @@ uint32_t seq, unsigned int minor_version); bool director_resend_sync(struct director *dir); +void director_notify_ring_added(struct director_host *added_host, + struct director_host *src); +void director_ring_remove(struct director_host *removed_host, + struct director_host *src); + void director_update_host(struct director *dir, struct director_host *src, struct director_host *orig_src, struct mail_host *host); @@ -129,7 +137,10 @@ /* Send data to all directors using both left and right connections (unless they're the same). */ void director_update_send(struct director *dir, struct director_host *src, - const char *data); + const char *cmd); +void director_update_send_version(struct director *dir, + struct director_host *src, + unsigned int min_version, const char *cmd); int director_connect_host(struct director *dir, struct director_host *host);
--- a/src/director/doveadm-connection.c Sat May 19 21:16:42 2012 +0300 +++ b/src/director/doveadm-connection.c Sat May 19 21:18:04 2012 +0300 @@ -109,7 +109,9 @@ right = dir->right != NULL && director_connection_get_host(dir->right) == host; - if (dir->self_host == host) + if (host->removed) + type = "removed"; + else if (dir->self_host == host) type = "self"; else if (left) type = right ? "l+r" : "left"; @@ -129,6 +131,58 @@ } static bool +doveadm_cmd_director_add(struct doveadm_connection *conn, const char *line) +{ + const char *const *args; + struct director_host *host; + struct ip_addr ip; + unsigned int port = conn->dir->self_port; + + args = t_strsplit_tab(line); + if (args[0] == NULL || + net_addr2ip(line, &ip) < 0 || + (args[1] != NULL && str_to_uint(args[1], &port) < 0)) { + i_error("doveadm sent invalid DIRECTOR-ADD parameters"); + return FALSE; + } + + if (director_host_lookup(conn->dir, &ip, port) == NULL) { + host = director_host_add(conn->dir, &ip, port); + director_notify_ring_added(host, conn->dir->self_host); + } + o_stream_send(conn->output, "OK\n", 3); + return TRUE; +} + +static bool +doveadm_cmd_director_remove(struct doveadm_connection *conn, const char *line) +{ + const char *const *args; + struct director_host *host; + struct ip_addr ip; + unsigned int port = 0; + + args = t_strsplit_tab(line); + if (args[0] == NULL || + net_addr2ip(line, &ip) < 0 || + (args[1] != NULL && str_to_uint(args[1], &port) < 0)) { + i_error("doveadm sent invalid DIRECTOR-REMOVE parameters"); + return FALSE; + } + + host = port != 0 ? + director_host_lookup(conn->dir, &ip, port) : + director_host_lookup_ip(conn->dir, &ip); + if (host == NULL) + o_stream_send_str(conn->output, "NOTFOUND\n"); + else { + director_ring_remove(host, conn->dir->self_host); + o_stream_send(conn->output, "OK\n", 3); + } + return TRUE; +} + +static bool doveadm_cmd_host_set(struct doveadm_connection *conn, const char *line) { struct director *dir = conn->dir; @@ -364,6 +418,10 @@ doveadm_cmd_host_list_removed(conn); else if (strcmp(cmd, "DIRECTOR-LIST") == 0) doveadm_cmd_director_list(conn); + else if (strcmp(cmd, "DIRECTOR-ADD") == 0) + doveadm_cmd_director_add(conn, args); + else if (strcmp(cmd, "DIRECTOR-REMOVE") == 0) + doveadm_cmd_director_remove(conn, args); else if (strcmp(cmd, "HOST-SET") == 0) ret = doveadm_cmd_host_set(conn, args); else if (strcmp(cmd, "HOST-REMOVE") == 0)
--- a/src/director/main.c Sat May 19 21:16:42 2012 +0300 +++ b/src/director/main.c Sat May 19 21:18:04 2012 +0300 @@ -28,7 +28,10 @@ static int director_client_connected(int fd, const struct ip_addr *ip) { - if (director_host_lookup_ip(director, ip) == NULL) { + struct director_host *host; + + host = director_host_lookup_ip(director, ip); + if (host == NULL || host->removed) { i_warning("Connection from %s: Server not listed in " "director_servers, dropping", net_ip2addr(ip)); return -1;
--- a/src/doveadm/doveadm-director.c Sat May 19 21:16:42 2012 +0300 +++ b/src/doveadm/doveadm-director.c Sat May 19 21:18:04 2012 +0300 @@ -3,6 +3,7 @@ #include "lib.h" #include "md5.h" #include "hash.h" +#include "str.h" #include "network.h" #include "istream.h" #include "write-full.h" @@ -580,6 +581,68 @@ director_disconnect(ctx); } + +static void director_read_ok_reply(struct director_context *ctx) +{ + const char *line; + + line = i_stream_read_next_line(ctx->input); + if (line == NULL) { + i_error("Director disconnected unexpectedly"); + doveadm_exit_code = EX_TEMPFAIL; + } else if (strcmp(line, "NOTFOUND") == 0) { + i_error("Not found"); + doveadm_exit_code = DOVEADM_EX_NOTFOUND; + } else if (strcmp(line, "OK") != 0) { + i_error("Failed: %s", line); + doveadm_exit_code = EX_TEMPFAIL; + } +} + +static void cmd_director_ring_add(int argc, char *argv[]) +{ + struct director_context *ctx; + struct ip_addr ip; + string_t *str = t_str_new(64); + unsigned int port = 0; + + ctx = cmd_director_init(argc, argv, "a:", cmd_director_ring_add); + if (argv[optind] == NULL || + net_addr2ip(argv[optind], &ip) < 0 || + (argv[optind+1] != NULL && str_to_uint(argv[optind+1], &port) < 0)) + director_cmd_help(cmd_director_ring_add); + + str_printfa(str, "DIRECTOR-ADD\t%s", net_ip2addr(&ip)); + if (port != 0) + str_printfa(str, "\t%u", port); + str_append_c(str, '\n'); + director_send(ctx, str_c(str)); + director_read_ok_reply(ctx); + director_disconnect(ctx); +} + +static void cmd_director_ring_remove(int argc, char *argv[]) +{ + struct director_context *ctx; + struct ip_addr ip; + string_t *str = t_str_new(64); + unsigned int port = 0; + + ctx = cmd_director_init(argc, argv, "a:", cmd_director_ring_remove); + if (argv[optind] == NULL || + net_addr2ip(argv[optind], &ip) < 0 || + (argv[optind+1] != NULL && str_to_uint(argv[optind+1], &port) < 0)) + director_cmd_help(cmd_director_ring_remove); + + str_printfa(str, "DIRECTOR-REMOVE\t%s", net_ip2addr(&ip)); + if (port != 0) + str_printfa(str, "\t%u", port); + str_append_c(str, '\n'); + director_send(ctx, str_c(str)); + director_read_ok_reply(ctx); + director_disconnect(ctx); +} + static void cmd_director_ring_status(int argc, char *argv[]) { struct director_context *ctx; @@ -634,6 +697,10 @@ "[-a <director socket path>] <host>|all" }, { cmd_director_dump, "director dump", "[-a <director socket path>]" }, + { cmd_director_ring_add, "director ring add", + "[-a <director socket path>] <ip> [<port>]" }, + { cmd_director_ring_remove, "director ring remove", + "[-a <director socket path>] <ip> [<port>]" }, { cmd_director_ring_status, "director ring status", "[-a <director socket path>]" } };