changeset 8559:b0bc4519332f HEAD

Redesigned auth request queuing to auth worker processes. Only a single request is now pending for a worker at a time. If a request was queued for more than 3 seconds, log a warning (but no more often than once in 5 minutes).
author Timo Sirainen <tss@iki.fi>
date Tue, 16 Dec 2008 07:50:44 +0200
parents 94e0fda6802e
children b6a7bc10c19a
files src/auth/auth-worker-server.c
diffstat 1 files changed, 166 insertions(+), 176 deletions(-) [+]
line wrap: on
line diff
--- a/src/auth/auth-worker-server.c	Tue Dec 16 06:08:26 2008 +0200
+++ b/src/auth/auth-worker-server.c	Tue Dec 16 07:50:44 2008 +0200
@@ -1,8 +1,9 @@
 /* Copyright (c) 2005-2008 Dovecot authors, see the included COPYING file */
 
 #include "common.h"
+#include "ioloop.h"
 #include "array.h"
-#include "ioloop.h"
+#include "aqueue.h"
 #include "network.h"
 #include "istream.h"
 #include "ostream.h"
@@ -13,12 +14,15 @@
 #include <stdlib.h>
 #include <unistd.h>
 
-#define AUTH_WORKER_MAX_OUTBUF_SIZE 10240
 #define AUTH_WORKER_LOOKUP_TIMEOUT_SECS 60
 #define AUTH_WORKER_MAX_IDLE_SECS (60*30)
+#define AUTH_WORKER_DELAY_WARN_SECS 3
+#define AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS 300
 
 struct auth_worker_request {
 	unsigned int id;
+	time_t created;
+	const char *data_str;
 	struct auth_request *auth_request;
         auth_worker_callback_t *callback;
 };
@@ -32,9 +36,8 @@
 	struct timeout *to;
 
 	unsigned int id_counter;
-        ARRAY_DEFINE(requests, struct auth_worker_request);
+        struct auth_worker_request *request;
 
-	unsigned int request_count;
 	unsigned int requests_left;
 };
 
@@ -43,22 +46,84 @@
 static unsigned int auth_workers_max;
 static unsigned int auth_workers_max_request_count;
 
+static ARRAY_DEFINE(worker_request_array, struct auth_worker_request *);
+static struct aqueue *worker_request_queue;
+static time_t auth_worker_last_warn;
+
 static char *worker_socket_path;
 
 static void worker_input(struct auth_worker_connection *conn);
-static void auth_worker_destroy(struct auth_worker_connection *conn,
-				const char *reason);
+static void auth_worker_destroy(struct auth_worker_connection **conn,
+				const char *reason, bool restart);
 
 static void auth_worker_idle_timeout(struct auth_worker_connection *conn)
 {
-	i_assert(conn->request_count == 0);
+	i_assert(conn->request == NULL);
 
 	if (idle_count > 1)
-		auth_worker_destroy(conn, NULL);
+		auth_worker_destroy(&conn, NULL, FALSE);
 	else
 		timeout_reset(conn->to);
 }
 
+static void auth_worker_call_timeout(struct auth_worker_connection *conn)
+{
+	i_assert(conn->request != NULL);
+
+	auth_worker_destroy(&conn, "Lookup timed out", TRUE);
+}
+
+static void auth_worker_request_send(struct auth_worker_connection *conn,
+				     struct auth_worker_request *request)
+{
+	struct const_iovec iov[3];
+
+	i_assert(conn->requests_left > 0);
+
+	if (ioloop_time - request->created > AUTH_WORKER_DELAY_WARN_SECS &&
+	    ioloop_time - auth_worker_last_warn >
+	    AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS) {
+		auth_worker_last_warn = ioloop_time;
+		i_warning("auth workers: Auth request was queued for %d "
+			  "seconds, %d left in queue",
+			  (int)(ioloop_time - request->created),
+			  aqueue_count(worker_request_queue));
+	}
+
+	request->id = ++conn->id_counter;
+
+	iov[0].iov_base = t_strdup_printf("%d\t", request->id);
+	iov[0].iov_len = strlen(iov[0].iov_base);
+	iov[1].iov_base = request->data_str;
+	iov[1].iov_len = strlen(request->data_str);
+	iov[2].iov_base = "\n";
+	iov[2].iov_len = 1;
+
+	o_stream_sendv(conn->output, iov, 3);
+
+	conn->request = request;
+	conn->requests_left--;
+
+	timeout_remove(&conn->to);
+	conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000,
+			       auth_worker_call_timeout, conn);
+	idle_count--;
+}
+
+static void auth_worker_request_send_next(struct auth_worker_connection *conn)
+{
+	struct auth_worker_request *request, *const *requestp;
+
+	if (aqueue_count(worker_request_queue) == 0)
+		return;
+
+	requestp = array_idx(&worker_request_array,
+			     aqueue_idx(worker_request_queue, 0));
+	request = *requestp;
+	aqueue_delete_tail(worker_request_queue);
+	auth_worker_request_send(conn, request);
+}
+
 static struct auth_worker_connection *auth_worker_create(void)
 {
 	struct auth_worker_connection *conn;
@@ -81,14 +146,15 @@
 				worker_socket_path);
 		}
 
-		if (try == 5) {
-			i_fatal("net_connect_unix(%s) "
-				"failed after %d tries: %m",
-				worker_socket_path, try);
+		if (try == 50) {
+			i_error("net_connect_unix(%s) "
+				"failed after %d secs: %m",
+				worker_socket_path, try/10);
+			return NULL;
 		}
 
-		/* not created yet? try again */
-		sleep(1);
+		/* wait and try again */
+		usleep(100000);
 	}
 
 	conn = i_new(struct auth_worker_connection, 1);
@@ -97,7 +163,6 @@
 					 FALSE);
 	conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
 	conn->io = io_add(fd, IO_READ, worker_input, conn);
-	i_array_init(&conn->requests, 16);
 	conn->requests_left = auth_workers_max_request_count;
 	conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
 			       auth_worker_idle_timeout, conn);
@@ -108,13 +173,14 @@
 	return conn;
 }
 
-static void auth_worker_destroy(struct auth_worker_connection *conn,
-				const char *reason)
+static void auth_worker_destroy(struct auth_worker_connection **_conn,
+				const char *reason, bool restart)
 {
+	struct auth_worker_connection *conn = *_conn;
 	struct auth_worker_connection **connp;
-	struct auth_worker_request *requests;
 	unsigned int i, count;
-	const char *reply;
+
+	*_conn = NULL;
 
 	connp = array_get_modifiable(&connections, &count);
 	for (i = 0; i < count; i++) {
@@ -124,28 +190,19 @@
 		}
 	}
 
-	if (conn->request_count == 0)
+	if (conn->request == NULL)
 		idle_count--;
 
-	/* abort all pending requests */
-	reply = t_strdup_printf("FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE);
+	if (conn->request != NULL) T_BEGIN {
+		struct auth_request *auth_request = conn->request->auth_request;
 
-	requests = array_get_modifiable(&conn->requests, &count);
-	for (i = 0; i < count; i++) {
-		if (requests[i].id != 0) {
-			auth_request_log_error(requests[i].auth_request,
-					       "worker-server",
-					       "Aborted: %s", reason);
-			T_BEGIN {
-				requests[i].callback(requests[i].auth_request,
-						     reply);
-			} T_END;
-			auth_request_unref(&requests[i].auth_request);
-		}
-	}
+		auth_request_log_error(auth_request, "worker-server",
+				       "Aborted: %s", reason);
+		conn->request->callback(auth_request, t_strdup_printf(
+				"FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE));
+		auth_request_unref(&conn->request->auth_request);
+	} T_END;
 
-
-	array_free(&conn->requests);
 	io_remove(&conn->io);
 	i_stream_destroy(&conn->input);
 	o_stream_destroy(&conn->output);
@@ -154,78 +211,47 @@
 	if (close(conn->fd) < 0)
 		i_error("close(auth worker) failed: %m");
 	i_free(conn);
-}
 
-static struct auth_worker_request *
-auth_worker_request_lookup(struct auth_worker_connection *conn,
-			   unsigned int id)
-{
-	struct auth_worker_request *requests;
-	unsigned int i, count;
-
-	requests = array_get_modifiable(&conn->requests, &count);
-	for (i = 0; i < count; i++) {
-		if (requests[i].id == id)
-			return &requests[i];
+	if (idle_count == 0 && restart) {
+		conn = auth_worker_create();
+		if (conn != NULL)
+			auth_worker_request_send_next(conn);
 	}
-	return NULL;
 }
 
 static struct auth_worker_connection *auth_worker_find_free(void)
 {
-	struct auth_worker_connection **conn, *best;
+	struct auth_worker_connection **conns;
 	unsigned int i, count;
-	size_t outbuf_size, best_size;
 
-	conn = array_get_modifiable(&connections, &count);
-	if (idle_count > 0) {
-		/* there exists at least one idle connection, use it */
-		for (i = 0; i < count; i++) {
-			if (conn[i]->request_count == 0)
-				return conn[i];
-		}
-		i_unreached();
+	if (idle_count == 0)
+		return NULL;
+
+	conns = array_get_modifiable(&connections, &count);
+	for (i = 0; i < count; i++) {
+		if (conns[i]->request == NULL)
+			return conns[i];
 	}
-
-	/* first the connection with least data in output buffer */
-	best = NULL;
-	best_size = (size_t)-1;
-	for (i = 0; i < count; i++) {
-		outbuf_size = o_stream_get_buffer_used_size(conn[i]->output);
-		if (outbuf_size < best_size && conn[i]->requests_left > 0) {
-			best = conn[i];
-			best_size = outbuf_size;
-		}
-	}
-
-	return best;
+	i_unreached();
+	return NULL;
 }
 
-static void auth_worker_handle_request(struct auth_worker_connection *conn,
+static void auth_worker_request_handle(struct auth_worker_connection *conn,
 				       struct auth_worker_request *request,
 				       const char *line)
 {
+	conn->request = NULL;
+	timeout_remove(&conn->to);
+	conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
+			       auth_worker_idle_timeout, conn);
+	idle_count++;
+
 	request->callback(request->auth_request, line);
 	auth_request_unref(&request->auth_request);
-
-	/* mark the record empty so it can be used for future requests */
-	memset(request, 0, sizeof(*request));
-
-	/* update counters */
-	conn->request_count--;
-	if (conn->request_count > 0)
-		timeout_reset(conn->to);
-	else {
-		timeout_remove(&conn->to);
-		conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000,
-				       auth_worker_idle_timeout, conn);
-		idle_count++;
-	}
 }
 
 static void worker_input(struct auth_worker_connection *conn)
 {
-	struct auth_worker_request *request;
 	const char *line, *id_str;
 	unsigned int id;
 
@@ -234,13 +260,14 @@
 		return;
 	case -1:
 		/* disconnected */
-		auth_worker_destroy(conn, "Worker process died unexpectedly");
+		auth_worker_destroy(&conn, "Worker process died unexpectedly",
+				    TRUE);
 		return;
 	case -2:
 		/* buffer full */
 		i_error("BUG: Auth worker sent us more than %d bytes",
 			(int)AUTH_WORKER_MAX_LINE_LENGTH);
-		auth_worker_destroy(conn, "Worker is buggy");
+		auth_worker_destroy(&conn, "Worker is buggy", TRUE);
 		return;
 	}
 
@@ -250,37 +277,28 @@
 		if (line == NULL)
 			continue;
 
-		T_BEGIN {
-			id = (unsigned int)strtoul(t_strcut(id_str, '\t'),
-						   NULL, 10);
-			request = auth_worker_request_lookup(conn, id);
-		} T_END;
-
-		if (request != NULL)
-			auth_worker_handle_request(conn, request, line + 1);
+		id = (unsigned int)strtoul(t_strcut(id_str, '\t'),
+					   NULL, 10);
+		if (conn->request != NULL && id == conn->request->id) {
+			auth_worker_request_handle(conn, conn->request,
+						   line + 1);
+		} else {
+			if (conn->request != NULL) {
+				i_error("BUG: Worker sent reply with id %u, "
+					"expected %u", id, conn->request->id);
+			} else {
+				i_error("BUG: Worker sent reply with id %u, "
+					"none was expected", id);
+			}
+			auth_worker_destroy(&conn, "Worker is buggy", TRUE);
+			return;
+		}
 	}
 
-	if (conn->requests_left == 0 && conn->request_count == 0) {
-		auth_worker_destroy(conn, "Max requests limit");
-		if (idle_count == 0)
-			auth_worker_create();
-	}
-}
-
-static struct auth_worker_request *
-auth_worker_request_get(struct auth_worker_connection *conn)
-{
-        struct auth_worker_request *request;
-
-	request = auth_worker_request_lookup(conn, 0);
-	return request != NULL ? request : array_append_space(&conn->requests);
-}
-
-static void auth_worker_call_timeout(struct auth_worker_connection *conn)
-{
-	i_assert(conn->request_count > 0);
-
-	auth_worker_destroy(conn, "Lookup timed out");
+	if (conn->requests_left == 0)
+		auth_worker_destroy(&conn, "Max requests limit", TRUE);
+	else
+		auth_worker_request_send_next(conn);
 }
 
 void auth_worker_call(struct auth_request *auth_request,
@@ -289,66 +307,32 @@
 {
 	struct auth_worker_connection *conn;
 	struct auth_worker_request *request;
-	const char *reply, *data_str;
-	struct const_iovec iov[3];
 
-	conn = auth_worker_find_free();
-	if (conn == NULL) {
-		/* no connections currently. can happen if all have been
-		   idle for last 10 minutes. create a new one. */
-		conn = auth_worker_create();
-		if (conn == NULL) {
-			auth_request_log_error(auth_request, "worker-server",
-				"Couldn't create new auth worker");
-			reply = t_strdup_printf("FAIL\t%d",
-						PASSDB_RESULT_INTERNAL_FAILURE);
-			callback(auth_request, reply);
-			return;
-		}
-	}
-
-	i_assert(conn->requests_left > 0);
-
-	data_str = auth_stream_reply_export(data);
-	iov[0].iov_base = t_strdup_printf("%d\t", ++conn->id_counter);
-	iov[0].iov_len = strlen(iov[0].iov_base);
-	iov[1].iov_base = data_str;
-	iov[1].iov_len = strlen(data_str);
-	iov[2].iov_base = "\n";
-	iov[2].iov_len = 1;
-
-	if (o_stream_get_buffer_used_size(conn->output) +
-	    iov[0].iov_len + iov[1].iov_len + 1 > AUTH_WORKER_MAX_OUTBUF_SIZE) {
-		auth_request_log_error(auth_request, "worker-server",
-				       "All auth workers are busy");
-		reply = t_strdup_printf("FAIL\t%d",
-					PASSDB_RESULT_INTERNAL_FAILURE);
-		callback(auth_request, reply);
-		return;
-	}
-
-	/* find an empty request */
-	request = auth_worker_request_get(conn);
-	request->id = conn->id_counter;
+	request = p_new(auth_request->pool, struct auth_worker_request, 1);
+	request->created = ioloop_time;
+	request->data_str = p_strdup(auth_request->pool,
+				     auth_stream_reply_export(data));
 	request->auth_request = auth_request;
 	request->callback = callback;
 	auth_request_ref(auth_request);
 
-	o_stream_sendv(conn->output, iov, 3);
-
-	if (idle_count == 0) {
-		/* this request was queued, we need new workers */
-		auth_worker_create();
+	if (aqueue_count(worker_request_queue) > 0) {
+		/* requests are already being queued, no chance of
+		   finding/creating a worker */
+		conn = NULL;
+	} else {
+		conn = auth_worker_find_free();
+		if (conn == NULL) {
+			/* no free connections, create a new one */
+			conn = auth_worker_create();
+		}
 	}
-
-	if (conn->request_count == 0) {
-		timeout_remove(&conn->to);
-		conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000,
-				       auth_worker_call_timeout, conn);
-		idle_count--;
+	if (conn != NULL)
+		auth_worker_request_send(conn, request);
+	else {
+		/* reached the limit, queue the request */
+		aqueue_append(worker_request_queue, &request);
 	}
-	conn->request_count++;
-	conn->requests_left--;
 }
 
 void auth_worker_server_init(void)
@@ -377,22 +361,28 @@
 	if (auth_workers_max_request_count == 0)
 		auth_workers_max_request_count = (unsigned int)-1;
 
+	i_array_init(&worker_request_array, 128);
+	worker_request_queue = aqueue_init(&worker_request_array.arr);
+
 	i_array_init(&connections, 16);
-	auth_worker_create();
+	(void)auth_worker_create();
 }
 
 void auth_worker_server_deinit(void)
 {
-	struct auth_worker_connection **connp;
+	struct auth_worker_connection **connp, *conn;
 
 	if (!array_is_created(&connections))
 		return;
 
 	while (array_count(&connections) > 0) {
 		connp = array_idx_modifiable(&connections, 0);
-		auth_worker_destroy(*connp, "Shutting down");
+		conn = *connp;
+		auth_worker_destroy(&conn, "Shutting down", FALSE);
 	}
 	array_free(&connections);
 
+	aqueue_deinit(&worker_request_queue);
+	array_free(&worker_request_array);
 	i_free(worker_socket_path);
 }