Mercurial > dovecot > core-2.2
changeset 11629:a07aa85f68c9 HEAD
director: Lots of fixes. It should be pretty stable now.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Thu, 24 Jun 2010 20:29:27 +0100 |
parents | 7885030184ab |
children | c9a62b0d9d36 |
files | src/director/auth-connection.c src/director/auth-connection.h src/director/director-connection.c src/director/director-connection.h src/director/director-host.h src/director/director-request.c src/director/director-test.c src/director/director-test.sh src/director/director.c src/director/director.h src/director/doveadm-connection.c src/director/main.c |
diffstat | 12 files changed, 563 insertions(+), 147 deletions(-) [+] |
line wrap: on
line diff
--- a/src/director/auth-connection.c Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/auth-connection.c Thu Jun 24 20:29:27 2010 +0100 @@ -27,6 +27,8 @@ static struct auth_connection *auth_connections; +static void auth_connection_disconnected(struct auth_connection **conn); + static void auth_connection_input(struct auth_connection *conn) { char *line; @@ -36,13 +38,13 @@ return; case -1: /* disconnected */ - auth_connection_deinit(&conn); + auth_connection_disconnected(&conn); return; case -2: /* buffer full */ i_error("BUG: Auth server sent us more than %d bytes", (int)AUTH_CLIENT_MAX_LINE_LENGTH); - auth_connection_deinit(&conn); + auth_connection_disconnected(&conn); return; } @@ -103,12 +105,20 @@ if (close(conn->fd) < 0) i_error("close(auth connection) failed: %m"); - conn->callback(NULL, conn->context); } i_free(conn->path); i_free(conn); } +static void auth_connection_disconnected(struct auth_connection **_conn) +{ + struct auth_connection *conn = *_conn; + + *_conn = NULL; + /* notify callback. it should deinit this connection */ + conn->callback(NULL, conn->context); +} + void auth_connection_send(struct auth_connection *conn, const void *data, size_t size) { @@ -122,6 +132,6 @@ while (auth_connections != NULL) { struct auth_connection *conn = auth_connections; - auth_connection_deinit(&conn); + auth_connection_disconnected(&conn); } }
--- a/src/director/auth-connection.h Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/auth-connection.h Thu Jun 24 20:29:27 2010 +0100 @@ -2,7 +2,7 @@ #define AUTH_CONNECTION_H /* Called for each input line. This is also called with line=NULL if - connection gets disonnected. */ + connection gets disconnected. */ typedef void auth_input_callback(const char *line, void *context); struct auth_connection *auth_connection_init(const char *path);
--- a/src/director/director-connection.c Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/director-connection.c Thu Jun 24 20:29:27 2010 +0100 @@ -7,6 +7,7 @@ #include "istream.h" #include "ostream.h" #include "str.h" +#include "llist.h" #include "master-service.h" #include "mail-host.h" #include "director.h" @@ -25,14 +26,20 @@ #define MAX_INBUF_SIZE 1024 #define MAX_OUTBUF_SIZE (1024*1024*10) #define OUTBUF_FLUSH_THRESHOLD (1024*128) +/* Max idling time while connecting/handshaking before disconnecting */ +#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (2*1000) /* How long to wait for PONG after PING request */ #define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (2*1000) /* How long to wait to send PING when connection is idle */ #define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000) +/* How long to wait before sending PING while waiting for SYNC reply */ +#define DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS 1000 struct director_connection { + struct director_connection *prev, *next; + struct director *dir; - const char *name; + char *name; /* for incoming connections the director host isn't known until ME-line is received */ @@ -54,9 +61,11 @@ unsigned int ignore_host_events:1; unsigned int handshake_sending_hosts:1; unsigned int ping_waiting:1; + unsigned int sync_ping:1; }; static void director_connection_ping(struct director_connection *conn); +static void director_connection_disconnected(struct director_connection **conn); static bool director_args_parse_ip_port(struct director_connection *conn, @@ -102,7 +111,12 @@ if (!conn->in) return TRUE; + i_free(conn->name); + conn->name = i_strdup_printf("%s/left", host->name); conn->host = host; + /* make sure we don't keep old sequence values across restarts */ + host->last_seq = 0; + connect_str = t_strdup_printf("CONNECT\t%s\t%u\n", net_ip2addr(&host->ip), host->port); /* make sure this is the correct incoming connection */ @@ -135,6 +149,8 @@ one, but before that tell it to connect to the new one. that message might not reach it, so also send the same message to right side. */ + i_warning("Replacing director connection %s with %s", + dir->left->host->name, host->name); director_connection_send(dir->left, connect_str); (void)o_stream_flush(dir->left->output); director_connection_deinit(&dir->left); @@ -227,6 +243,33 @@ return TRUE; } +static bool +director_cmd_user(struct director_connection *conn, const char *const *args) +{ + unsigned int username_hash; + struct ip_addr ip; + struct mail_host *host; + struct user *user; + + if (str_array_length(args) != 2 || + str_to_uint(args[0], &username_hash) < 0 || + net_addr2ip(args[1], &ip) < 0) { + i_error("director(%s): Invalid USER args", conn->name); + return FALSE; + } + + host = mail_host_lookup(conn->dir->mail_hosts, &ip); + if (host == NULL) { + /* we probably just removed this host. */ + return TRUE; + } + + if (director_user_refresh(conn->dir, username_hash, + host, ioloop_time, &user)) + director_update_user(conn->dir, conn->host, user); + return TRUE; +} + static bool director_cmd_director(struct director_connection *conn, const char *const *args) { @@ -269,7 +312,7 @@ hosts = mail_hosts_get(conn->dir->mail_hosts); while (array_count(hosts) > 0) { hostp = array_idx(hosts, 0); - director_remove_host(conn->dir, conn->host, *hostp); + director_remove_host(conn->dir, NULL, NULL, *hostp); } } else if (!remote_ring_completed && conn->dir->ring_handshaked) { /* ignore whatever remote sends */ @@ -279,8 +322,46 @@ return TRUE; } +static int +director_cmd_is_seen(struct director_connection *conn, + const char *const **_args, + struct director_host **host_r) +{ + const char *const *args = *_args; + struct ip_addr ip; + unsigned int port, seq; + struct director_host *host; + + if (str_array_length(args) < 3 || + net_addr2ip(args[0], &ip) < 0 || + str_to_uint(args[1], &port) < 0 || + str_to_uint(args[2], &seq) < 0) { + i_error("director(%s): Command is missing parameters", + conn->name); + return -1; + } + *_args = args + 3; + + host = director_host_lookup(conn->dir, &ip, port); + if (host == NULL) { + /* director is already gone, but we can't be sure if this + command was sent everywhere. re-send it as if it was from + ourself. */ + *host_r = NULL; + } else { + if (seq <= host->last_seq) { + /* already seen this */ + return 1; + } + *host_r = host; + host->last_seq = seq; + } + return 0; +} + static bool -director_cmd_host(struct director_connection *conn, const char *const *args) +director_cmd_host_int(struct director_connection *conn, const char *const *args, + struct director_host *dir_host) { struct mail_host *host; struct ip_addr ip; @@ -311,17 +392,40 @@ if (update) { mail_host_set_vhost_count(conn->dir->mail_hosts, host, vhost_count); - director_update_host(conn->dir, conn->host, host); + director_update_host(conn->dir, conn->host, dir_host, host); } return TRUE; } static bool +director_cmd_host_handshake(struct director_connection *conn, + const char *const *args) +{ + return director_cmd_host_int(conn, args, NULL); +} + +static bool +director_cmd_host(struct director_connection *conn, const char *const *args) +{ + struct director_host *dir_host; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + return director_cmd_host_int(conn, args, dir_host); +} + +static bool director_cmd_host_remove(struct director_connection *conn, const char *const *args) { + struct director_host *dir_host; struct mail_host *host; struct ip_addr ip; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; if (str_array_length(args) != 1 || net_addr2ip(args[0], &ip) < 0) { @@ -331,7 +435,7 @@ host = mail_host_lookup(conn->dir->mail_hosts, &ip); if (host != NULL) - director_remove_host(conn->dir, conn->host, host); + director_remove_host(conn->dir, conn->host, dir_host, host); return TRUE; } @@ -339,18 +443,25 @@ director_cmd_host_flush(struct director_connection *conn, const char *const *args) { + struct director_host *dir_host; struct mail_host *host; struct ip_addr ip; + unsigned int seq; + int ret; - if (str_array_length(args) != 1 || - net_addr2ip(args[0], &ip) < 0) { + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + + if (str_array_length(args) != 2 || + net_addr2ip(args[0], &ip) < 0 || + str_to_uint(args[1], &seq) < 0) { i_error("director(%s): Invalid HOST-FLUSH args", conn->name); return FALSE; } host = mail_host_lookup(conn->dir->mail_hosts, &ip); if (host != NULL) - director_flush_host(conn->dir, conn->host, host); + director_flush_host(conn->dir, conn->host, dir_host, host); return TRUE; } @@ -380,16 +491,16 @@ dir->left->handshake_received && dir->right->handshake_received) { /* we're connected to both directors. see if the ring is finished by sending a SYNC. if we get it back, it's done. */ - dir->sync_seq = ++dir->self_host->last_seq; + dir->sync_seq++; director_connection_send(dir->right, t_strdup_printf("SYNC\t%s\t%u\t%u\n", net_ip2addr(&dir->self_ip), dir->self_port, dir->sync_seq)); } - if (conn->to_ping == NULL) { - conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS, - director_connection_ping, conn); - } + if (conn->to_ping != NULL) + timeout_remove(&conn->to_ping); + conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS, + director_connection_ping, conn); } static bool @@ -444,7 +555,10 @@ if (strcmp(cmd, "HOST") == 0) { /* allow hosts from all connections always, this could be an host update */ - return director_cmd_host(conn, args); + if (conn->handshake_sending_hosts) + return director_cmd_host_handshake(conn, args); + else + return director_cmd_host(conn, args); } if (conn->handshake_sending_hosts && strcmp(cmd, "HOST-HAND-END") == 0) { @@ -456,47 +570,27 @@ conn->me_received) return director_cmd_host_hand_start(conn, args); - /* only incoming connections get a USER list */ - if (conn->in && strcmp(cmd, "USER") == 0 && conn->me_received) - return director_handshake_cmd_user(conn, args); + /* only incoming connections get a full USER list, but outgoing + connections can also receive USER updates during handshake and + it wouldn't be safe to ignore them. */ + if (strcmp(cmd, "USER") == 0 && conn->me_received) { + if (conn->in) + return director_handshake_cmd_user(conn, args); + else + return director_cmd_user(conn, args); + } /* both get DONE */ if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received && !conn->handshake_sending_hosts) { director_handshake_cmd_done(conn); return TRUE; } - i_error("director(%s): Invalid handshake command: %s", - conn->name, cmd); + i_error("director(%s): Invalid handshake command: %s " + "(in=%d me_received=%d)", conn->name, cmd, + conn->in, conn->me_received); return FALSE; } -static bool -director_cmd_user(struct director_connection *conn, const char *const *args) -{ - unsigned int username_hash; - struct ip_addr ip; - struct mail_host *host; - struct user *user; - - if (str_array_length(args) != 2 || - str_to_uint(args[0], &username_hash) < 0 || - net_addr2ip(args[1], &ip) < 0) { - i_error("director(%s): Invalid USER args", conn->name); - return FALSE; - } - - host = mail_host_lookup(conn->dir->mail_hosts, &ip); - if (host == NULL) { - /* we probably just removed this host. */ - return TRUE; - } - - if (director_user_refresh(conn->dir, username_hash, - host, ioloop_time, &user)) - director_update_user(conn->dir, conn->host, user); - return TRUE; -} - static bool director_connection_sync(struct director_connection *conn, const char *const *args, const char *line) { @@ -523,24 +617,21 @@ /* stale SYNC event */ return TRUE; } + if (!dir->ring_handshaked) { /* the ring is handshaked */ director_set_ring_handshaked(dir); - return TRUE; - } - - if (dir->ring_synced) { + } else if (dir->ring_synced) { i_error("Received SYNC from %s (seq=%u) " "while already synced", conn->name, seq); return TRUE; + } else { + if (dir->debug) { + i_debug("Ring is synced (%s sent seq=%u)", + conn->name, seq); + } + director_set_ring_synced(dir); } - - if (dir->debug) { - i_debug("Ring is synced (%s sent seq=%u)", - conn->name, seq); - } - dir->ring_synced = TRUE; - director_set_state_changed(dir); return TRUE; } @@ -597,10 +688,7 @@ } } - /* connect here, disconnect old one */ - if (dir->right != NULL) - director_connection_deinit(&dir->right); - + /* connect here */ (void)director_connect_host(dir, host); return TRUE; } @@ -629,6 +717,15 @@ i_error("director(%s): Received empty line", conn->name); return FALSE; } + + /* ping/pong is always handled */ + if (strcmp(cmd, "PING") == 0) { + director_connection_send(conn, "PONG\n"); + return TRUE; + } + if (strcmp(cmd, "PONG") == 0) + return director_cmd_pong(conn); + if (!conn->handshake_received) { if (!director_connection_handle_handshake(conn, cmd, args)) { /* invalid commands during handshake, @@ -655,12 +752,6 @@ if (strcmp(cmd, "CONNECT") == 0) return director_cmd_connect(conn, args); - if (strcmp(cmd, "PING") == 0) { - director_connection_send(conn, "PONG\n"); - return TRUE; - } - if (strcmp(cmd, "PONG") == 0) - return director_cmd_pong(conn); i_error("director(%s): Unknown command (in this state): %s", conn->name, cmd); return FALSE; @@ -668,6 +759,7 @@ static void director_connection_input(struct director_connection *conn) { + struct director *dir = conn->dir; char *line; bool ret; @@ -678,26 +770,31 @@ return; case -1: /* disconnected */ - director_connection_deinit(&conn); + i_error("Director %s disconnected%s", conn->name, + conn->handshake_received ? "" : + " before handshake finished"); + director_connection_disconnected(&conn); return; case -2: /* buffer full */ - i_error("BUG: Director sent us more than %d bytes", - MAX_INBUF_SIZE); - director_connection_deinit(&conn); + i_error("BUG: Director %s sent us more than %d bytes", + conn->name, MAX_INBUF_SIZE); + director_connection_disconnected(&conn); return; } + director_sync_freeze(dir); while ((line = i_stream_next_line(conn->input)) != NULL) { T_BEGIN { ret = director_connection_handle_line(conn, line); } T_END; if (!ret) { - director_connection_deinit(&conn); + director_connection_disconnected(&conn); break; } } + director_sync_thaw(dir); } static void director_connection_send_directors(struct director_connection *conn, @@ -773,6 +870,18 @@ return o_stream_flush(conn->output); } +static void +director_connection_init_timeout(struct director_connection *conn) +{ + if (conn->host != NULL) + conn->host->last_failed = ioloop_time; + if (!conn->connected) + i_error("director(%s): Connect timed out", conn->name); + else + i_error("director(%s): Handshaking timed out", conn->name); + director_connection_disconnected(&conn); +} + static struct director_connection * director_connection_init_common(struct director *dir, int fd) { @@ -785,6 +894,9 @@ conn->output = o_stream_create_fd(conn->fd, MAX_OUTBUF_SIZE, FALSE); o_stream_set_flush_callback(conn->output, director_connection_output, conn); + conn->to_ping = timeout_add(DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS, + director_connection_init_timeout, conn); + DLLIST_PREPEND(&dir->connections, conn); return conn; } @@ -798,14 +910,15 @@ } struct director_connection * -director_connection_init_in(struct director *dir, int fd) +director_connection_init_in(struct director *dir, int fd, + const struct ip_addr *ip) { struct director_connection *conn; conn = director_connection_init_common(dir, fd); conn->in = TRUE; conn->connected = TRUE; - conn->name = "<incoming>"; + conn->name = i_strdup_printf("%s/in", net_ip2addr(ip)); conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn); director_connection_send_handshake(conn); @@ -822,14 +935,26 @@ conn->host->last_failed = ioloop_time; i_error("director(%s): connect() failed: %s", conn->name, strerror(err)); - director_connection_deinit(&conn); - - /* try connecting to next server */ - director_connect(dir); + director_connection_disconnected(&conn); return; } conn->connected = TRUE; + if (dir->right != NULL) { + /* see if we should disconnect or keep the existing + connection. */ + if (director_host_cmp_to_self(conn->host, dir->right->host, + dir->self_host) <= 0) { + /* the old connection is the correct one */ + director_connection_deinit(&conn); + return; + } + director_connection_deinit(&dir->right); + } + dir->right = conn; + i_free(conn->name); + conn->name = i_strdup_printf("%s/right", conn->host->name); + io_remove(&conn->io); director_connection_send_handshake(conn); @@ -847,10 +972,15 @@ { struct director_connection *conn; + /* make sure we don't keep old sequence values across restarts */ + host->last_seq = 0; + conn = director_connection_init_common(dir, fd); - conn->name = host->name; + conn->name = i_strdup_printf("%s/out", host->name); conn->host = host; - conn->io = io_add(conn->fd, IO_WRITE, + /* use IO_READ instead of IO_WRITE, so that we don't assign + dir->right until remote has actually sent some data */ + conn->io = io_add(conn->fd, IO_READ, director_connection_connected, conn); return conn; } @@ -858,19 +988,18 @@ void director_connection_deinit(struct director_connection **_conn) { struct director_connection *conn = *_conn; + struct director *dir = conn->dir; *_conn = NULL; - if (conn->dir->debug && conn->host != NULL) { - i_debug("Director %s:%u disconnected", - net_ip2addr(&conn->host->ip), conn->host->port); - } + DLLIST_REMOVE(&dir->connections, conn); + if (dir->left == conn) + dir->left = NULL; + if (dir->right == conn) + dir->right = NULL; - if (conn->dir->left == conn) - conn->dir->left = NULL; - if (conn->dir->right == conn) - conn->dir->right = NULL; - + if (conn->user_iter != NULL) + user_directory_iter_deinit(&conn->user_iter); if (conn->to != NULL) timeout_remove(&conn->to); if (conn->to_ping != NULL) @@ -884,12 +1013,29 @@ if (conn->in) master_service_client_connection_destroyed(master_service); + i_free(conn->name); i_free(conn); + + if (dir->left == NULL || dir->right == NULL) { + /* we aren't synced until we're again connected to a ring */ + dir->sync_seq++; + dir->ring_synced = FALSE; + } +} + +void director_connection_disconnected(struct director_connection **_conn) +{ + struct director_connection *conn = *_conn; + struct director *dir = conn->dir; + + director_connection_deinit(_conn); + if (dir->right == NULL) + director_connect(dir); } static void director_connection_timeout(struct director_connection *conn) { - director_connection_deinit(&conn); + director_connection_disconnected(&conn); } void director_connection_send(struct director_connection *conn, @@ -925,15 +1071,17 @@ static void director_connection_ping_timeout(struct director_connection *conn) { i_error("director(%s): Ping timed out, disconnecting", conn->name); - director_connection_deinit(&conn); + director_connection_disconnected(&conn); } static void director_connection_ping(struct director_connection *conn) { + conn->sync_ping = FALSE; if (conn->ping_waiting) return; - timeout_remove(&conn->to_ping); + if (conn->to_ping != NULL) + timeout_remove(&conn->to_ping); conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS, director_connection_ping_timeout, conn); director_connection_send(conn, "PING\n"); @@ -944,3 +1092,46 @@ { return conn->name; } + +struct director_host * +director_connection_get_host(struct director_connection *conn) +{ + return conn->host; +} + +struct director_connection * +director_connection_find_outgoing(struct director *dir, + struct director_host *host) +{ + struct director_connection *conn; + + for (conn = dir->connections; conn != NULL; conn = conn->next) { + if (conn->host == host && !conn->in) + return conn; + } + return NULL; +} + +void director_connection_cork(struct director_connection *conn) +{ + o_stream_cork(conn->output); +} + +void director_connection_uncork(struct director_connection *conn) +{ + o_stream_uncork(conn->output); +} + +void director_connection_wait_sync(struct director_connection *conn) +{ + /* switch to faster ping timeout. avoid reseting the timeout if it's + already fast. */ + if (conn->ping_waiting || conn->sync_ping) + return; + + if (conn->to_ping != NULL) + timeout_remove(&conn->to_ping); + conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS, + director_connection_ping, conn); + conn->sync_ping = TRUE; +}
--- a/src/director/director-connection.h Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/director-connection.h Thu Jun 24 20:29:27 2010 +0100 @@ -5,7 +5,8 @@ struct director; struct director_connection * -director_connection_init_in(struct director *dir, int fd); +director_connection_init_in(struct director *dir, int fd, + const struct ip_addr *ip); struct director_connection * director_connection_init_out(struct director *dir, int fd, struct director_host *host); @@ -13,10 +14,19 @@ void director_connection_send(struct director_connection *conn, const char *data); +void director_connection_wait_sync(struct director_connection *conn); void director_connection_send_except(struct director_connection *conn, struct director_host *skip_host, const char *data); const char *director_connection_get_name(struct director_connection *conn); +struct director_host * +director_connection_get_host(struct director_connection *conn); +struct director_connection * +director_connection_find_outgoing(struct director *dir, + struct director_host *host); + +void director_connection_cork(struct director_connection *conn); +void director_connection_uncork(struct director_connection *conn); #endif
--- a/src/director/director-host.h Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/director-host.h Thu Jun 24 20:29:27 2010 +0100 @@ -11,15 +11,14 @@ /* name contains "ip:port" */ char *name; - - /* each command between directors contains an increasing sequence. - if director A gets conflicting information about director B, it can - trust the one that has the highest sequence. */ + /* change commands each have originating host and originating sequence. + we'll keep track of the highest sequence we've seen from the host. + if we find a lower sequence, we've already handled the command and + it can be ignored (or: it must be ignored to avoid potential command + loops) */ unsigned int last_seq; - /* Last time host was detected to be down/broken */ time_t last_failed; - /* we are this director */ unsigned int self:1; };
--- a/src/director/director-request.c Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/director-request.c Thu Jun 24 20:29:27 2010 +0100 @@ -9,6 +9,7 @@ #include "director-request.h" #define DIRECTOR_REQUEST_TIMEOUT_SECS 30 +#define RING_NOCONN_WARNING_DELAY_MSECS (2*1000) struct director_request { struct director *dir; @@ -66,6 +67,28 @@ array_append(&dir->pending_requests, &request, 1); } +static void ring_noconn_warning(struct director *dir) +{ + if (!dir->ring_handshaked) { + i_warning("Delaying all requests " + "until all directors have connected"); + } else { + i_warning("Delaying new user requests until ring is synced"); + } + dir->ring_handshake_warning_sent = TRUE; + timeout_remove(&dir->to_handshake_warning); +} + +static void ring_log_delayed_warning(struct director *dir) +{ + if (dir->ring_handshake_warning_sent || + dir->to_handshake_warning != NULL) + return; + + dir->to_handshake_warning = timeout_add(RING_NOCONN_WARNING_DELAY_MSECS, + ring_noconn_warning, dir); +} + bool director_request_continue(struct director_request *request) { struct director *dir = request->dir; @@ -74,11 +97,7 @@ if (!dir->ring_handshaked) { /* delay requests until ring handshaking is complete */ - if (!dir->ring_handshake_warning_sent) { - i_warning("Delaying requests until all " - "directors have connected"); - dir->ring_handshake_warning_sent = TRUE; - } + ring_log_delayed_warning(dir); return FALSE; } @@ -88,8 +107,7 @@ else { if (!dir->ring_synced) { /* delay adding new users until ring is again synced */ - if (dir->debug) - i_debug("Delaying request until ring is synced"); + ring_log_delayed_warning(dir); return FALSE; } host = mail_host_get_by_hash(dir->mail_hosts,
--- a/src/director/director-test.c Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/director-test.c Thu Jun 24 20:29:27 2010 +0100 @@ -36,6 +36,7 @@ #define USER_TIMEOUT_MSECS (1000*60) #define ADMIN_RANDOM_TIMEOUT_MSECS 500 #define DIRECTOR_CONN_MAX_DELAY_MSECS 100 +#define DIRECTOR_DISCONNECT_TIMEOUT_SECS 10 struct host { int refcount; @@ -83,6 +84,7 @@ struct io *io; struct istream *input; struct timeout *to_random; + bool pending_command; }; static struct imap_client *imap_clients; @@ -91,6 +93,7 @@ static struct hash_table *hosts; static ARRAY_DEFINE(hosts_array, struct host *); static struct admin_connection *admin; +static struct timeout *to_disconnect; static void imap_client_destroy(struct imap_client **client); static void director_connection_destroy(struct director_connection **conn); @@ -330,6 +333,13 @@ director_connection_create(int in_fd, const struct ip_addr *local_ip) { struct director_connection *conn; + int out_fd; + + out_fd = net_connect_ip(local_ip, DIRECTOR_OUT_PORT, NULL); + if (out_fd == -1) { + (void)close(in_fd); + return; + } conn = i_new(struct director_connection, 1); conn->in_fd = in_fd; @@ -338,7 +348,7 @@ conn->in_io = io_add(conn->in_fd, IO_READ, director_connection_in_input, conn); - conn->out_fd = net_connect_ip(local_ip, DIRECTOR_OUT_PORT, NULL); + conn->out_fd = out_fd; conn->out_input = i_stream_create_fd(conn->out_fd, (size_t)-1, FALSE); conn->out_output = o_stream_create_fd(conn->out_fd, (size_t)-1, FALSE); conn->out_io = io_add(conn->out_fd, IO_READ, @@ -404,6 +414,7 @@ while ((line = i_stream_read_next_line(conn->input)) != NULL) { if (strcmp(line, "OK") != 0) i_error("director-doveadm: Unexpected input: %s", line); + conn->pending_command = FALSE; } if (conn->input->stream_errno != 0 || conn->input->eof) i_fatal("director-doveadm: Connection lost"); @@ -414,6 +425,9 @@ struct host *const *hosts; unsigned int i, count; + if (conn->pending_command) + return; + hosts = array_get(&hosts_array, &count); i = rand() % count; @@ -421,6 +435,7 @@ admin_send(conn, t_strdup_printf("HOST-SET\t%s\t%u\n", net_ip2addr(&hosts[i]->ip), hosts[i]->vhost_count)); + conn->pending_command = TRUE; } static struct admin_connection *admin_connect(const char *path) @@ -458,7 +473,8 @@ struct admin_connection *conn = *_conn; *_conn = NULL; - //timeout_remove(&conn->to_random); + if (conn->to_random != NULL) + timeout_remove(&conn->to_random); i_stream_destroy(&conn->input); io_remove(&conn->io); net_disconnect(conn->fd); @@ -492,6 +508,23 @@ net_set_nonblock(admin->fd, TRUE); } +static void +director_connection_disconnect_timeout(void *context ATTR_UNUSED) +{ + struct director_connection *conn; + unsigned int i, count = 0; + + for (conn = director_connections; conn != NULL; conn = conn->next) + count++; + + if (count != 0) { + i = 0; count = rand() % count; + for (conn = director_connections; i < count; conn = conn->next) + i++; + director_connection_destroy(&conn); + } +} + static void main_init(const char *admin_path) { users = hash_table_create(default_pool, default_pool, 0, @@ -504,6 +537,10 @@ admin = admin_connect(admin_path); admin_send(admin, "HOST-LIST\n"); admin_read_hosts(admin); + + to_disconnect = + timeout_add(1000*(1 + rand()%DIRECTOR_DISCONNECT_TIMEOUT_SECS), + director_connection_disconnect_timeout, NULL); } static void main_deinit(void) @@ -516,6 +553,7 @@ imap_client_destroy(&client); } + timeout_remove(&to_disconnect); while (director_connections != NULL) { struct director_connection *conn = director_connections; director_connection_destroy(&conn);
--- a/src/director/director-test.sh Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/director-test.sh Thu Jun 24 20:29:27 2010 +0100 @@ -15,11 +15,11 @@ listen = 127.0.1.$i base_dir = /var/run/dovecot$i -!include dovecot-director-common.conf +!include dovecot-director-common.conf.inc EOF done -cat > dovecot-director-common.conf <<EOF +cat > dovecot-director-common.conf.inc <<EOF log_path = /var/log/dovecot.log info_log_path = /var/log/dovecot-access.log director_servers =$dirs @@ -83,12 +83,7 @@ echo echo "Start up dovecot instances:" echo -echo "dovecot -c dovecot-test.conf" -i=0 -while [ $i != $director_count ]; do - i=`expr $i + 1` - echo "dovecot -c dovecot-director$i.conf" -done +echo 'for conf in dovecot*.conf; do dovecot -c $conf; done' echo echo "Start testing:" echo
--- a/src/director/director.c Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/director.c Thu Jun 24 20:29:27 2010 +0100 @@ -11,6 +11,7 @@ #include "director.h" #define DIRECTOR_RECONNECT_RETRY_SECS 60 +#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000) static bool director_is_self_ip_set(struct director *dir) { @@ -79,7 +80,8 @@ unsigned int port; int fd; - i_assert(dir->right == NULL); + if (director_connection_find_outgoing(dir, host) != NULL) + return 0; if (dir->debug) { i_debug("Connecting to %s:%u", @@ -93,10 +95,24 @@ return -1; } - dir->right = director_connection_init_out(dir, fd, host); + director_connection_init_out(dir, fd, host); return 0; } +static struct director_host * +director_get_preferred_right_host(struct director *dir) +{ + struct director_host *const *hosts; + unsigned int count, self_idx; + + hosts = array_get(&dir->dir_hosts, &count); + if (count == 1) + return NULL; + + self_idx = director_find_self_idx(dir); + return hosts[(self_idx + 1) % count]; +} + void director_connect(struct director *dir) { struct director_host *const *hosts; @@ -122,28 +138,98 @@ } if (i == count) { /* we're the only one */ - director_set_ring_handshaked(dir); + if (dir->left != NULL) { + /* since we couldn't connect to it, + it must have failed recently */ + director_connection_deinit(&dir->left); + } + if (!dir->ring_handshaked) + director_set_ring_handshaked(dir); + else + director_set_ring_synced(dir); } } void director_set_ring_handshaked(struct director *dir) { + i_assert(!dir->ring_handshaked); + + if (dir->to_handshake_warning != NULL) + timeout_remove(&dir->to_handshake_warning); if (dir->ring_handshake_warning_sent) { i_warning("Directors have been connected, " - "continuing delayed connections"); + "continuing delayed requests"); dir->ring_handshake_warning_sent = FALSE; } if (dir->debug) i_debug("Director ring handshaked"); dir->ring_handshaked = TRUE; + director_set_ring_synced(dir); +} + +static void director_reconnect_timeout(struct director *dir) +{ + struct director_host *cur_host, *preferred_host = + director_get_preferred_right_host(dir); + + cur_host = dir->right == NULL ? NULL : + director_connection_get_host(dir->right); + + if (cur_host != preferred_host) + (void)director_connect_host(dir, preferred_host); + else { + /* the connection hasn't finished sync yet. + keep this timeout for now. */ + } +} + +void director_set_ring_synced(struct director *dir) +{ + struct director_host *host; + + i_assert(!dir->ring_synced); + i_assert((dir->left != NULL && dir->right != NULL) || + (dir->left == NULL && dir->right == NULL)); + + if (dir->to_handshake_warning != NULL) + timeout_remove(&dir->to_handshake_warning); + if (dir->ring_handshake_warning_sent) { + i_warning("Ring is synced, continuing delayed requests"); + dir->ring_handshake_warning_sent = FALSE; + } + + host = dir->right == NULL ? NULL : + director_connection_get_host(dir->right); + if (host != director_get_preferred_right_host(dir)) { + /* try to reconnect to preferred host later */ + if (dir->to_reconnect == NULL) { + dir->to_reconnect = + timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS, + director_reconnect_timeout, dir); + } + } else { + if (dir->to_reconnect != NULL) + timeout_remove(&dir->to_reconnect); + } + dir->ring_synced = TRUE; director_set_state_changed(dir); } static void director_sync(struct director *dir) { - /* we're synced again, once we receive this SYNC back */ + if (dir->sync_frozen) { + dir->sync_pending = TRUE; + return; + } + if (dir->right == NULL) { + i_assert(!dir->ring_synced || + (dir->left == NULL && dir->right == NULL)); + return; + } + + /* we're synced again when we receive this SYNC back */ dir->sync_seq++; dir->ring_synced = FALSE; @@ -153,36 +239,94 @@ director_connection_get_name(dir->right)); } + if (dir->left != NULL) + director_connection_wait_sync(dir->left); + director_connection_wait_sync(dir->right); director_connection_send(dir->right, t_strdup_printf( "SYNC\t%s\t%u\t%u\n", net_ip2addr(&dir->self_ip), dir->self_port, dir->sync_seq)); } +void director_sync_freeze(struct director *dir) +{ + i_assert(!dir->sync_frozen); + i_assert(!dir->sync_pending); + + if (dir->left != NULL) + director_connection_cork(dir->left); + if (dir->right != NULL) + director_connection_cork(dir->right); + dir->sync_frozen = TRUE; +} + +void director_sync_thaw(struct director *dir) +{ + i_assert(dir->sync_frozen); + + dir->sync_frozen = FALSE; + if (dir->sync_pending) { + dir->sync_pending = FALSE; + director_sync(dir); + } + if (dir->left != NULL) + director_connection_uncork(dir->left); + if (dir->right != NULL) + director_connection_uncork(dir->right); +} + void director_update_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, struct mail_host *host) { + /* update state in case this is the first mail host being added */ director_set_state_changed(dir); + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + director_update_send(dir, src, t_strdup_printf( - "HOST\t%s\t%u\n", net_ip2addr(&host->ip), host->vhost_count)); + "HOST\t%s\t%u\t%u\t%s\t%u\n", + net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq, + net_ip2addr(&host->ip), host->vhost_count)); director_sync(dir); } void director_remove_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, struct mail_host *host) { - director_update_send(dir, src, t_strdup_printf( - "HOST-REMOVE\t%s\n", net_ip2addr(&host->ip))); + if (src != NULL) { + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + + director_update_send(dir, src, t_strdup_printf( + "HOST-REMOVE\t%s\t%u\t%u\t%s\n", + net_ip2addr(&orig_src->ip), orig_src->port, + orig_src->last_seq, net_ip2addr(&host->ip))); + } + user_directory_remove_host(dir->users, host); mail_host_remove(dir->mail_hosts, host); director_sync(dir); } void director_flush_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, struct mail_host *host) { + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + director_update_send(dir, src, t_strdup_printf( - "HOST-FLUSH\t%s\n", net_ip2addr(&host->ip))); + "HOST-FLUSH\t%s\t%u\t%u\t%s\n", + net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq, + net_ip2addr(&host->ip))); user_directory_remove_host(dir->users, host); director_sync(dir); } @@ -245,6 +389,10 @@ user_directory_deinit(&dir->users); mail_hosts_deinit(&dir->mail_hosts); mail_hosts_deinit(&dir->orig_config_hosts); + if (dir->to_reconnect != NULL) + timeout_remove(&dir->to_reconnect); + if (dir->to_handshake_warning != NULL) + timeout_remove(&dir->to_handshake_warning); if (dir->to_request != NULL) timeout_remove(&dir->to_request); array_foreach(&dir->dir_hosts, hostp)
--- a/src/director/director.h Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/director.h Thu Jun 24 20:29:27 2010 +0100 @@ -10,16 +10,6 @@ typedef void director_state_change_callback_t(struct director *dir); -struct director_host_change { - /* originating director for this change. keep ip/port here separately, - because by the time its sync comes, the director itself may have - already been removed. */ - struct ip_addr ip; - unsigned int port; - /* highest change sequence from this director */ - unsigned int seq; -}; - struct director { const struct director_settings *set; @@ -31,6 +21,9 @@ struct director_host *self_host; struct director_connection *left, *right; + /* all director connections */ + struct director_connection *connections; + struct timeout *to_reconnect; /* current mail hosts */ struct mail_host_list *mail_hosts; @@ -43,6 +36,7 @@ /* these requests are waiting for directors to be in synced */ ARRAY_DEFINE(pending_requests, struct director_request *); struct timeout *to_request; + struct timeout *to_handshake_warning; director_state_change_callback_t *state_change_callback; @@ -56,6 +50,8 @@ unsigned int ring_handshaked:1; unsigned int ring_handshake_warning_sent:1; unsigned int ring_synced:1; + unsigned int sync_frozen:1; + unsigned int sync_pending:1; unsigned int debug:1; }; @@ -73,17 +69,24 @@ void director_connect(struct director *dir); void director_set_ring_handshaked(struct director *dir); +void director_set_ring_synced(struct director *dir); void director_set_state_changed(struct director *dir); void director_update_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, struct mail_host *host); void director_remove_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, struct mail_host *host); void director_flush_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, struct mail_host *host); void director_update_user(struct director *dir, struct director_host *src, struct user *user); +void director_sync_freeze(struct director *dir); +void director_sync_thaw(struct director *dir); + /* Send data to all directors using both left and right connections (unless they're the same). */ void director_update_send(struct director *dir, struct director_host *src,
--- a/src/director/doveadm-connection.c Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/doveadm-connection.c Thu Jun 24 20:29:27 2010 +0100 @@ -91,7 +91,7 @@ host = mail_host_add_ip(dir->mail_hosts, &ip); if (vhost_count != -1U) mail_host_set_vhost_count(dir->mail_hosts, host, vhost_count); - director_update_host(dir, dir->self_host, host); + director_update_host(dir, dir->self_host, NULL, host); o_stream_send(conn->output, "OK\n", 3); return TRUE; @@ -111,7 +111,8 @@ if (host == NULL) o_stream_send_str(conn->output, "NOTFOUND\n"); else { - director_remove_host(conn->dir, conn->dir->self_host, host); + director_remove_host(conn->dir, conn->dir->self_host, + NULL, host); o_stream_send(conn->output, "OK\n", 3); } return TRUE; @@ -122,8 +123,10 @@ { struct mail_host *const *hostp; - array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp) - director_flush_host(conn->dir, conn->dir->self_host, *hostp); + array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp) { + director_flush_host(conn->dir, conn->dir->self_host, + NULL, *hostp); + } o_stream_send(conn->output, "OK\n", 3); } @@ -146,7 +149,8 @@ if (host == NULL) o_stream_send_str(conn->output, "NOTFOUND\n"); else { - director_flush_host(conn->dir, conn->dir->self_host, host); + director_flush_host(conn->dir, conn->dir->self_host, + NULL, host); o_stream_send(conn->output, "OK\n", 3); } return TRUE;
--- a/src/director/main.c Thu Jun 24 16:27:20 2010 +0100 +++ b/src/director/main.c Thu Jun 24 20:29:27 2010 +0100 @@ -34,7 +34,7 @@ return -1; } - director_connection_init_in(director, fd); + director_connection_init_in(director, fd, ip); return 0; } @@ -166,7 +166,7 @@ &director_setting_parser_info, NULL }; - unsigned int test_port; + unsigned int test_port = 0; const char *error; bool debug = FALSE; int c;