changeset 14310:7a26c427fc78

director: Avoid a user getting redirected to different servers near its expiration. Fixes a problem where a user logs in at the same time on director1, which thinks the user is expired, and on director2, which thinks the user expires only in 1 second.
author Timo Sirainen <tss@iki.fi>
date Thu, 08 Mar 2012 16:03:45 +0200
parents d6fda337af15
children 71b64b7b2e63
files src/director/director-connection.c src/director/director-request.c src/director/director.c src/director/director.h src/director/main.c src/director/notify-connection.c src/director/user-directory.c src/director/user-directory.h
diffstat 8 files changed, 264 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/src/director/director-connection.c	Thu Mar 08 10:48:08 2012 +0200
+++ b/src/director/director-connection.c	Thu Mar 08 16:03:45 2012 +0200
@@ -181,20 +181,37 @@
 static bool
 director_user_refresh(struct director_connection *conn,
 		      unsigned int username_hash, struct mail_host *host,
-		      time_t timestamp, struct user **user_r)
+		      time_t timestamp, bool weak, struct user **user_r)
 {
 	struct director *dir = conn->dir;
 	struct user *user;
-	bool ret = FALSE;
+	bool ret = FALSE, unset_weak_user = FALSE;
 
 	user = user_directory_lookup(dir->users, username_hash);
 	if (user == NULL) {
 		*user_r = user_directory_add(dir->users, username_hash,
 					     host, timestamp);
+		(*user_r)->weak = weak;
 		return TRUE;
 	}
 
-	if (user->host != host) {
+	if (user->weak) {
+		if (!weak) {
+			/* removing user's weakness */
+			unset_weak_user = TRUE;
+			user->weak = FALSE;
+			ret = TRUE;
+		} else {
+			/* weak user marked again as weak */
+		}
+	} else if (weak &&
+		   !user_directory_user_is_recently_updated(dir->users, user)) {
+		/* mark the user as weak */
+		user->weak = TRUE;
+		ret = TRUE;
+	} else if (user->host != host) {
+		/* non-weak user received a non-weak update with
+		   conflicting host. this shouldn't happen. */
 		string_t *str = t_str_new(128);
 
 		str_printfa(str, "User hash %u "
@@ -221,16 +238,29 @@
 			/* change the host. we'll also need to remove the user
 			   from the old host's user_count, because we can't
 			   keep track of the user for more than one host */
-			user->host->user_count--;
-			user->host = host;
-			user->host->user_count++;
+		} else {
+			/* keep the host */
+			host = user->host;
 		}
 		ret = TRUE;
 	}
+	if (user->host != host) {
+		user->host->user_count--;
+		user->host = host;
+		user->host->user_count++;
+		ret = TRUE;
+	}
 	if (timestamp == ioloop_time && (time_t)user->timestamp != timestamp) {
 		user_directory_refresh(dir->users, user);
 		ret = TRUE;
 	}
+
+	if (unset_weak_user) {
+		/* user is no longer weak. handle pending requests for
+		   this user if there are any */
+		director_set_state_changed(conn->dir);
+	}
+
 	*user_r = user;
 	return ret;
 }
@@ -243,8 +273,9 @@
 	struct ip_addr ip;
 	struct mail_host *host;
 	struct user *user;
+	bool weak;
 
-	if (str_array_length(args) != 3 ||
+	if (str_array_length(args) < 3 ||
 	    str_to_uint(args[0], &username_hash) < 0 ||
 	    net_addr2ip(args[1], &ip) < 0 ||
 	    str_to_uint(args[2], &timestamp) < 0) {
@@ -252,6 +283,7 @@
 			conn->name);
 		return FALSE;
 	}
+	weak = args[3] != NULL && args[3][0] == 'w';
 
 	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
 	if (host == NULL) {
@@ -260,12 +292,13 @@
 		return FALSE;
 	}
 
-	director_user_refresh(conn, username_hash, host, timestamp, &user);
+	director_user_refresh(conn, username_hash, host, timestamp, weak, &user);
 	return TRUE;
 }
 
 static bool
-director_cmd_user(struct director_connection *conn, const char *const *args)
+director_cmd_user(struct director_connection *conn,
+		  const char *const *args)
 {
 	unsigned int username_hash;
 	struct ip_addr ip;
@@ -286,8 +319,10 @@
 	}
 
 	if (director_user_refresh(conn, username_hash,
-				  host, ioloop_time, &user))
+				  host, ioloop_time, FALSE, &user)) {
+		i_assert(!user->weak);
 		director_update_user(conn->dir, conn->host, user);
+	}
 	return TRUE;
 }
 
@@ -383,6 +418,54 @@
 }
 
 static bool
+director_cmd_user_weak(struct director_connection *conn,
+		       const char *const *args)
+{
+	struct director_host *dir_host;
+	struct ip_addr ip;
+	unsigned int username_hash;
+	struct mail_host *host;
+	struct user *user;
+	struct director_host *src_host = conn->host;
+	bool weak = TRUE;
+	int ret;
+
+	if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) < 0)
+		return FALSE;
+
+	if (str_array_length(args) != 2 ||
+	    str_to_uint(args[0], &username_hash) < 0 ||
+	    net_addr2ip(args[1], &ip) < 0) {
+		i_error("director(%s): Invalid USER-WEAK args", conn->name);
+		return FALSE;
+	}
+
+	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+	if (host == NULL) {
+		/* we probably just removed this host. */
+		return TRUE;
+	}
+
+	if (ret > 0) {
+		/* The entire ring has seen this USER-WEAK.
+		   make it non-weak now. */
+		weak = FALSE;
+		src_host = conn->dir->self_host;
+	}
+
+	if (director_user_refresh(conn, username_hash,
+				  host, ioloop_time, weak, &user)) {
+		if (!user->weak)
+			director_update_user(conn->dir, src_host, user);
+		else {
+			director_update_user_weak(conn->dir, src_host,
+						  dir_host, user);
+		}
+	}
+	return TRUE;
+}
+
+static bool
 director_cmd_host_int(struct director_connection *conn, const char *const *args,
 		      struct director_host *dir_host)
 {
@@ -666,11 +749,15 @@
 	/* only incoming connections get a full USER list, but outgoing
 	   connections can also receive USER updates during handshake and
 	   it wouldn't be safe to ignore them. */
-	if (strcmp(cmd, "USER") == 0 && conn->me_received) {
-		if (conn->in)
-			return director_handshake_cmd_user(conn, args);
-		else
+	if (!conn->me_received) {
+		/* no USER updates until ME */
+	} else if (conn->in && strcmp(cmd, "USER") == 0) {
+		return director_handshake_cmd_user(conn, args);
+	} else if (!conn->in) {
+		if (strcmp(cmd, "USER") == 0)
 			return director_cmd_user(conn, args);
+		if (strcmp(cmd, "USER-WEAK") == 0)
+			return director_cmd_user_weak(conn, args);
 	}
 	/* both get DONE */
 	if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
@@ -702,7 +789,7 @@
 			return;
 		}
 
-		dir->synced_minor_version = minor_version;
+		dir->ring_min_version = minor_version;
 		if (!dir->ring_handshaked) {
 			/* the ring is handshaked */
 			director_set_ring_handshaked(dir);
@@ -852,6 +939,8 @@
 
 	if (strcmp(cmd, "USER") == 0)
 		return director_cmd_user(conn, args);
+	if (strcmp(cmd, "USER-WEAK") == 0)
+		return director_cmd_user_weak(conn, args);
 	if (strcmp(cmd, "HOST") == 0)
 		return director_cmd_host(conn, args);
 	if (strcmp(cmd, "HOST-REMOVE") == 0)
@@ -951,20 +1040,17 @@
 
 	o_stream_cork(conn->output);
 	while ((user = user_directory_iter_next(conn->user_iter)) != NULL) {
-		if (!user_directory_user_has_connections(conn->dir->users,
-							 user)) {
-			/* user is already expired */
-			continue;
-		}
+		T_BEGIN {
+			string_t *str = t_str_new(128);
 
-		T_BEGIN {
-			const char *line;
-
-			line = t_strdup_printf("USER\t%u\t%s\t%u\n",
-					       user->username_hash,
-					       net_ip2addr(&user->host->ip),
-					       user->timestamp);
-			director_connection_send(conn, line);
+			str_printfa(str, "USER\t%u\t%s\t%u",
+				    user->username_hash,
+				    net_ip2addr(&user->host->ip),
+				    user->timestamp);
+			if (user->weak)
+				str_append(str, "\tw");
+			str_append_c(str, '\n');
+			director_connection_send(conn, str_c(str));
 		} T_END;
 
 		if (o_stream_get_buffer_used_size(conn->output) >= OUTBUF_FLUSH_THRESHOLD) {
--- a/src/director/director-request.c	Thu Mar 08 10:48:08 2012 +0200
+++ b/src/director/director-request.c	Thu Mar 08 16:03:45 2012 +0200
@@ -117,6 +117,74 @@
 						ring_noconn_warning, dir);
 }
 
+static bool director_request_existing(struct director *dir, struct user *user)
+{
+	struct mail_host *host;
+
+	if (user->kill_state != USER_KILL_STATE_NONE) {
+		/* delay processing this user's connections until
+		   its existing connections have been killed */
+		return FALSE;
+	}
+	if (user->weak) {
+		/* wait for user to become non-weak */
+		return FALSE;
+	}
+	if (!user_directory_user_is_near_expiring(dir->users, user))
+		return TRUE;
+
+	/* user is close to being expired. another director may have
+	   already expired it. */
+	host = mail_host_get_by_hash(dir->mail_hosts, user->username_hash);
+	if (!dir->ring_synced) {
+		/* try again later once ring is synced */
+		return FALSE;
+	}
+	if (user->host == host) {
+		/* doesn't matter, other directors would
+		   assign the user the same way regardless */
+		return TRUE;
+	}
+
+	/* We have to worry about two separate timepoints in here:
+
+	   a) some directors think the user isn't expiring, and
+	   others think the user is near expiring
+
+	   b) some directors think the user is near expiring, and
+	   others think the user has already expired
+
+	   What we don't have to worry about is:
+
+	   !c) some directors think the user isn't expiring, and
+	   others think the user has already expired
+
+	   If !c) happens, the user might get redirected to different backends.
+	   We'll use a large enough timeout between a) and b) states, so that
+	   !c) should never happen.
+
+	   So what we'll do here is:
+
+	   1. Send a USER-WEAK notification to all directors with the new host.
+	   2. Each director receiving USER-WEAK refreshes the user's timestamp
+	   and host, but marks the user as being weak.
+	   3. Once USER-WEAK has reached all directors, a real USER update is
+	   sent, which removes the weak-flag.
+	   4. If a director ever receives a USER update for a weak user, the
+	   USER update overrides the host and removes the weak-flag.
+	   5. Director doesn't let any weak user log in, until the weak-flag
+	   gets removed.
+	*/
+	if (dir->ring_min_version < DIRECTOR_VERSION_WEAK_USERS) {
+		/* weak users not supported by ring currently */
+		return TRUE;
+	} else {
+		user->weak = TRUE;
+		director_update_user_weak(dir, dir->self_host, NULL, user);
+		return FALSE;
+	}
+}
+
 bool director_request_continue(struct director_request *request)
 {
 	struct director *dir = request->dir;
@@ -131,11 +199,8 @@
 
 	user = user_directory_lookup(dir->users, request->username_hash);
 	if (user != NULL) {
-		if (user->kill_state != USER_KILL_STATE_NONE) {
-			/* delay processing this user's connections until
-			   its existing connections have been killed */
+		if (!director_request_existing(dir, user))
 			return FALSE;
-		}
 		user_directory_refresh(dir->users, user);
 	} else {
 		if (!dir->ring_synced) {
@@ -153,6 +218,7 @@
 					  host, ioloop_time);
 	}
 
+	i_assert(!user->weak);
 	director_update_user(dir, dir->self_host, user);
 	T_BEGIN {
 		request->callback(&user->host->ip, NULL, request->context);
--- a/src/director/director.c	Thu Mar 08 10:48:08 2012 +0200
+++ b/src/director/director.c	Thu Mar 08 16:03:45 2012 +0200
@@ -155,7 +155,7 @@
 			   it must have failed recently */
 			director_connection_deinit(&dir->left);
 		}
-		dir->synced_minor_version = DIRECTOR_VERSION_MINOR;
+		dir->ring_min_version = DIRECTOR_VERSION_MINOR;
 		if (!dir->ring_handshaked)
 			director_set_ring_handshaked(dir);
 		else
@@ -400,9 +400,29 @@
 void director_update_user(struct director *dir, struct director_host *src,
 			  struct user *user)
 {
+	i_assert(src != NULL);
+
+	i_assert(!user->weak);
+	director_update_send(dir, src, t_strdup_printf("USER\t%u\t%s\n",
+		user->username_hash, net_ip2addr(&user->host->ip)));
+}
+
+void director_update_user_weak(struct director *dir, struct director_host *src,
+			       struct director_host *orig_src,
+			       struct user *user)
+{
+	i_assert(src != NULL);
+	i_assert(user->weak);
+
+	if (orig_src == NULL) {
+		orig_src = dir->self_host;
+		orig_src->last_seq++;
+	}
+
 	director_update_send(dir, src, t_strdup_printf(
-		"USER\t%u\t%s\n", user->username_hash,
-		net_ip2addr(&user->host->ip)));
+		"USER-WEAK\t%s\t%u\t%u\t%u\t%s\n",
+		net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+		user->username_hash, net_ip2addr(&user->host->ip)));
 }
 
 struct director_user_kill_finish_ctx {
@@ -643,6 +663,7 @@
 	dir->mail_hosts = mail_hosts_init();
 
 	dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH);
+	dir->ring_min_version = DIRECTOR_VERSION_MINOR;
 	return dir;
 }
 
--- a/src/director/director.h	Thu Mar 08 10:48:08 2012 +0200
+++ b/src/director/director.h	Thu Mar 08 16:03:45 2012 +0200
@@ -6,7 +6,10 @@
 
 #define DIRECTOR_VERSION_NAME "director"
 #define DIRECTOR_VERSION_MAJOR 1
-#define DIRECTOR_VERSION_MINOR 0
+#define DIRECTOR_VERSION_MINOR 1
+
+/* weak users supported in protocol v1.1+ */
+#define DIRECTOR_VERSION_WEAK_USERS 1
 
 struct director;
 struct mail_host;
@@ -51,7 +54,7 @@
 	struct ipc_client *ipc_proxy;
 	unsigned int sync_seq;
 	/* the lowest minor version supported by the ring */
-	unsigned int synced_minor_version;
+	unsigned int ring_min_version;
 	time_t ring_last_sync_time;
 
 	/* director ring handshaking is complete.
@@ -97,6 +100,9 @@
 			 struct mail_host *host);
 void director_update_user(struct director *dir, struct director_host *src,
 			  struct user *user);
+void director_update_user_weak(struct director *dir, struct director_host *src,
+			       struct director_host *orig_src,
+			       struct user *user);
 void director_move_user(struct director *dir, struct director_host *src,
 			struct director_host *orig_src,
 			unsigned int username_hash, struct mail_host *host);
@@ -105,6 +111,7 @@
 				     struct director_host *src,
 				     struct director_host *orig_src,
 				     unsigned int username_hash);
+void director_user_weak(struct director *dir, struct user *user);
 
 void director_sync_freeze(struct director *dir);
 void director_sync_thaw(struct director *dir);
--- a/src/director/main.c	Thu Mar 08 10:48:08 2012 +0200
+++ b/src/director/main.c	Thu Mar 08 16:03:45 2012 +0200
@@ -125,7 +125,8 @@
 	array_foreach(&dir->pending_requests, requestp) {
 		ret = director_request_continue(*requestp);
 		if (!ret) {
-			/* request for a user being killed */
+			/* a) request for a user being killed
+			   b) user is weak */
 			array_append(&new_requests, requestp, 1);
 		}
 	}
--- a/src/director/notify-connection.c	Thu Mar 08 10:48:08 2012 +0200
+++ b/src/director/notify-connection.c	Thu Mar 08 16:03:45 2012 +0200
@@ -33,6 +33,7 @@
 				i_warning("notify: User %s refreshed too late "
 					  "(%d secs)", line, diff);
 			}
+			user->weak = FALSE;
 			user_directory_refresh(conn->dir->users, user);
 			director_update_user(conn->dir, conn->dir->self_host,
 					     user);
--- a/src/director/user-directory.c	Thu Mar 08 10:48:08 2012 +0200
+++ b/src/director/user-directory.c	Thu Mar 08 16:03:45 2012 +0200
@@ -9,7 +9,10 @@
 #include "mail-host.h"
 #include "user-directory.h"
 
-#define MAX_CLOCK_DRIFT_SECS 2
+/* n% of timeout_secs */
+#define USER_NEAR_EXPIRING_PERCENTAGE 10
+/* but max. of this many secs */
+#define USER_NEAR_EXPIRING_MAX 30
 
 struct user_directory_iter {
 	struct user_directory *dir;
@@ -26,6 +29,9 @@
 
 	char *username_hash_fmt;
 	unsigned int timeout_secs;
+	/* If user's expire time is less than this many seconds away,
+	   don't assume that other directors haven't yet expired it */
+	unsigned int user_near_expiring_secs;
 };
 
 static void user_move_iters(struct user_directory *dir, struct user *user)
@@ -50,6 +56,14 @@
 	i_free(user);
 }
 
+static bool user_directory_user_has_connections(struct user_directory *dir,
+						struct user *user)
+{
+	time_t expire_timestamp = user->timestamp + dir->timeout_secs;
+
+	return expire_timestamp >= ioloop_time;
+}
+
 static void user_directory_drop_expired(struct user_directory *dir)
 {
 	while (dir->head != NULL &&
@@ -127,12 +141,20 @@
 	return mail_user_hash(username, dir->username_hash_fmt);
 }
 
-bool user_directory_user_has_connections(struct user_directory *dir,
-					 struct user *user)
+bool user_directory_user_is_recently_updated(struct user_directory *dir,
+					     struct user *user)
 {
-	time_t expire_timestamp = user->timestamp + dir->timeout_secs;
+	return user->timestamp + dir->timeout_secs/2 >= ioloop_time;
+}
 
-	return expire_timestamp - MAX_CLOCK_DRIFT_SECS >= ioloop_time;
+bool user_directory_user_is_near_expiring(struct user_directory *dir,
+					  struct user *user)
+{
+	time_t expire_timestamp;
+
+	expire_timestamp = user->timestamp +
+		(dir->timeout_secs - dir->user_near_expiring_secs);
+	return expire_timestamp < ioloop_time;
 }
 
 struct user_directory *
@@ -142,6 +164,13 @@
 
 	dir = i_new(struct user_directory, 1);
 	dir->timeout_secs = timeout_secs;
+	dir->user_near_expiring_secs =
+		timeout_secs * USER_NEAR_EXPIRING_PERCENTAGE / 100;
+	dir->user_near_expiring_secs =
+		I_MIN(dir->user_near_expiring_secs, USER_NEAR_EXPIRING_MAX);
+	dir->user_near_expiring_secs =
+		I_MAX(dir->user_near_expiring_secs, 1);
+
 	dir->username_hash_fmt = i_strdup(username_hash_fmt);
 	dir->hash = hash_table_create(default_pool, default_pool,
 				      0, NULL, NULL);
@@ -174,6 +203,7 @@
 	iter->dir = dir;
 	iter->pos = dir->head;
 	array_append(&dir->iters, &iter, 1);
+	user_directory_drop_expired(dir);
 	return iter;
 }
 
--- a/src/director/user-directory.h	Thu Mar 08 10:48:08 2012 +0200
+++ b/src/director/user-directory.h	Thu Mar 08 16:03:45 2012 +0200
@@ -38,6 +38,11 @@
 	/* If not USER_KILL_STATE_NONE, don't allow new connections until all
 	   directors have killed the user's connections. */
 	enum user_kill_state kill_state;
+
+	/* TRUE, if the user's timestamp was close to being expired and we're
+	   now doing a ring-wide sync for this user to make sure we don't
+	   assign conflicting hosts to it */
+	unsigned int weak:1;
 };
 
 /* Create a new directory. Users are dropped if their time gets older
@@ -63,9 +68,10 @@
 unsigned int user_directory_get_username_hash(struct user_directory *dir,
 					      const char *username);
 
-/* Returns TRUE if user still potentially has connections. */
-bool user_directory_user_has_connections(struct user_directory *dir,
-					 struct user *user);
+bool user_directory_user_is_recently_updated(struct user_directory *dir,
+					     struct user *user);
+bool user_directory_user_is_near_expiring(struct user_directory *dir,
+					  struct user *user);
 
 struct user_directory_iter *
 user_directory_iter_init(struct user_directory *dir);