Mercurial > dovecot > original-hg > dovecot-1.2
changeset 8559:b0bc4519332f HEAD
Redesigned auth request queuing to auth worker processes.
Only a single request is now pending for a worker at a time. If a request
was queued for more than 3 seconds, log a warning (but no more often than once in
5 minutes).
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Tue, 16 Dec 2008 07:50:44 +0200 |
parents | 94e0fda6802e |
children | b6a7bc10c19a |
files | src/auth/auth-worker-server.c |
diffstat | 1 files changed, 166 insertions(+), 176 deletions(-) [+] |
line wrap: on
line diff
--- a/src/auth/auth-worker-server.c Tue Dec 16 06:08:26 2008 +0200 +++ b/src/auth/auth-worker-server.c Tue Dec 16 07:50:44 2008 +0200 @@ -1,8 +1,9 @@ /* Copyright (c) 2005-2008 Dovecot authors, see the included COPYING file */ #include "common.h" +#include "ioloop.h" #include "array.h" -#include "ioloop.h" +#include "aqueue.h" #include "network.h" #include "istream.h" #include "ostream.h" @@ -13,12 +14,15 @@ #include <stdlib.h> #include <unistd.h> -#define AUTH_WORKER_MAX_OUTBUF_SIZE 10240 #define AUTH_WORKER_LOOKUP_TIMEOUT_SECS 60 #define AUTH_WORKER_MAX_IDLE_SECS (60*30) +#define AUTH_WORKER_DELAY_WARN_SECS 3 +#define AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS 300 struct auth_worker_request { unsigned int id; + time_t created; + const char *data_str; struct auth_request *auth_request; auth_worker_callback_t *callback; }; @@ -32,9 +36,8 @@ struct timeout *to; unsigned int id_counter; - ARRAY_DEFINE(requests, struct auth_worker_request); + struct auth_worker_request *request; - unsigned int request_count; unsigned int requests_left; }; @@ -43,22 +46,84 @@ static unsigned int auth_workers_max; static unsigned int auth_workers_max_request_count; +static ARRAY_DEFINE(worker_request_array, struct auth_worker_request *); +static struct aqueue *worker_request_queue; +static time_t auth_worker_last_warn; + static char *worker_socket_path; static void worker_input(struct auth_worker_connection *conn); -static void auth_worker_destroy(struct auth_worker_connection *conn, - const char *reason); +static void auth_worker_destroy(struct auth_worker_connection **conn, + const char *reason, bool restart); static void auth_worker_idle_timeout(struct auth_worker_connection *conn) { - i_assert(conn->request_count == 0); + i_assert(conn->request == NULL); if (idle_count > 1) - auth_worker_destroy(conn, NULL); + auth_worker_destroy(&conn, NULL, FALSE); else timeout_reset(conn->to); } +static void auth_worker_call_timeout(struct auth_worker_connection *conn) +{ + i_assert(conn->request 
!= NULL); + + auth_worker_destroy(&conn, "Lookup timed out", TRUE); +} + +static void auth_worker_request_send(struct auth_worker_connection *conn, + struct auth_worker_request *request) +{ + struct const_iovec iov[3]; + + i_assert(conn->requests_left > 0); + + if (ioloop_time - request->created > AUTH_WORKER_DELAY_WARN_SECS && + ioloop_time - auth_worker_last_warn > + AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS) { + auth_worker_last_warn = ioloop_time; + i_warning("auth workers: Auth request was queued for %d " + "seconds, %d left in queue", + (int)(ioloop_time - request->created), + aqueue_count(worker_request_queue)); + } + + request->id = ++conn->id_counter; + + iov[0].iov_base = t_strdup_printf("%d\t", request->id); + iov[0].iov_len = strlen(iov[0].iov_base); + iov[1].iov_base = request->data_str; + iov[1].iov_len = strlen(request->data_str); + iov[2].iov_base = "\n"; + iov[2].iov_len = 1; + + o_stream_sendv(conn->output, iov, 3); + + conn->request = request; + conn->requests_left--; + + timeout_remove(&conn->to); + conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000, + auth_worker_call_timeout, conn); + idle_count--; +} + +static void auth_worker_request_send_next(struct auth_worker_connection *conn) +{ + struct auth_worker_request *request, *const *requestp; + + if (aqueue_count(worker_request_queue) == 0) + return; + + requestp = array_idx(&worker_request_array, + aqueue_idx(worker_request_queue, 0)); + request = *requestp; + aqueue_delete_tail(worker_request_queue); + auth_worker_request_send(conn, request); +} + static struct auth_worker_connection *auth_worker_create(void) { struct auth_worker_connection *conn; @@ -81,14 +146,15 @@ worker_socket_path); } - if (try == 5) { - i_fatal("net_connect_unix(%s) " - "failed after %d tries: %m", - worker_socket_path, try); + if (try == 50) { + i_error("net_connect_unix(%s) " + "failed after %d secs: %m", + worker_socket_path, try/10); + return NULL; } - /* not created yet? 
try again */ - sleep(1); + /* wait and try again */ + usleep(100000); } conn = i_new(struct auth_worker_connection, 1); @@ -97,7 +163,6 @@ FALSE); conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE); conn->io = io_add(fd, IO_READ, worker_input, conn); - i_array_init(&conn->requests, 16); conn->requests_left = auth_workers_max_request_count; conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000, auth_worker_idle_timeout, conn); @@ -108,13 +173,14 @@ return conn; } -static void auth_worker_destroy(struct auth_worker_connection *conn, - const char *reason) +static void auth_worker_destroy(struct auth_worker_connection **_conn, + const char *reason, bool restart) { + struct auth_worker_connection *conn = *_conn; struct auth_worker_connection **connp; - struct auth_worker_request *requests; unsigned int i, count; - const char *reply; + + *_conn = NULL; connp = array_get_modifiable(&connections, &count); for (i = 0; i < count; i++) { @@ -124,28 +190,19 @@ } } - if (conn->request_count == 0) + if (conn->request == NULL) idle_count--; - /* abort all pending requests */ - reply = t_strdup_printf("FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE); + if (conn->request != NULL) T_BEGIN { + struct auth_request *auth_request = conn->request->auth_request; - requests = array_get_modifiable(&conn->requests, &count); - for (i = 0; i < count; i++) { - if (requests[i].id != 0) { - auth_request_log_error(requests[i].auth_request, - "worker-server", - "Aborted: %s", reason); - T_BEGIN { - requests[i].callback(requests[i].auth_request, - reply); - } T_END; - auth_request_unref(&requests[i].auth_request); - } - } + auth_request_log_error(auth_request, "worker-server", + "Aborted: %s", reason); + conn->request->callback(auth_request, t_strdup_printf( + "FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE)); + auth_request_unref(&conn->request->auth_request); + } T_END; - - array_free(&conn->requests); io_remove(&conn->io); i_stream_destroy(&conn->input); o_stream_destroy(&conn->output); @@ -154,78 
+211,47 @@ if (close(conn->fd) < 0) i_error("close(auth worker) failed: %m"); i_free(conn); -} -static struct auth_worker_request * -auth_worker_request_lookup(struct auth_worker_connection *conn, - unsigned int id) -{ - struct auth_worker_request *requests; - unsigned int i, count; - - requests = array_get_modifiable(&conn->requests, &count); - for (i = 0; i < count; i++) { - if (requests[i].id == id) - return &requests[i]; + if (idle_count == 0 && restart) { + conn = auth_worker_create(); + if (conn != NULL) + auth_worker_request_send_next(conn); } - return NULL; } static struct auth_worker_connection *auth_worker_find_free(void) { - struct auth_worker_connection **conn, *best; + struct auth_worker_connection **conns; unsigned int i, count; - size_t outbuf_size, best_size; - conn = array_get_modifiable(&connections, &count); - if (idle_count > 0) { - /* there exists at least one idle connection, use it */ - for (i = 0; i < count; i++) { - if (conn[i]->request_count == 0) - return conn[i]; - } - i_unreached(); + if (idle_count == 0) + return NULL; + + conns = array_get_modifiable(&connections, &count); + for (i = 0; i < count; i++) { + if (conns[i]->request == NULL) + return conns[i]; } - - /* first the connection with least data in output buffer */ - best = NULL; - best_size = (size_t)-1; - for (i = 0; i < count; i++) { - outbuf_size = o_stream_get_buffer_used_size(conn[i]->output); - if (outbuf_size < best_size && conn[i]->requests_left > 0) { - best = conn[i]; - best_size = outbuf_size; - } - } - - return best; + i_unreached(); + return NULL; } -static void auth_worker_handle_request(struct auth_worker_connection *conn, +static void auth_worker_request_handle(struct auth_worker_connection *conn, struct auth_worker_request *request, const char *line) { + conn->request = NULL; + timeout_remove(&conn->to); + conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000, + auth_worker_idle_timeout, conn); + idle_count++; + request->callback(request->auth_request, line); 
auth_request_unref(&request->auth_request); - - /* mark the record empty so it can be used for future requests */ - memset(request, 0, sizeof(*request)); - - /* update counters */ - conn->request_count--; - if (conn->request_count > 0) - timeout_reset(conn->to); - else { - timeout_remove(&conn->to); - conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000, - auth_worker_idle_timeout, conn); - idle_count++; - } } static void worker_input(struct auth_worker_connection *conn) { - struct auth_worker_request *request; const char *line, *id_str; unsigned int id; @@ -234,13 +260,14 @@ return; case -1: /* disconnected */ - auth_worker_destroy(conn, "Worker process died unexpectedly"); + auth_worker_destroy(&conn, "Worker process died unexpectedly", + TRUE); return; case -2: /* buffer full */ i_error("BUG: Auth worker sent us more than %d bytes", (int)AUTH_WORKER_MAX_LINE_LENGTH); - auth_worker_destroy(conn, "Worker is buggy"); + auth_worker_destroy(&conn, "Worker is buggy", TRUE); return; } @@ -250,37 +277,28 @@ if (line == NULL) continue; - T_BEGIN { - id = (unsigned int)strtoul(t_strcut(id_str, '\t'), - NULL, 10); - request = auth_worker_request_lookup(conn, id); - } T_END; - - if (request != NULL) - auth_worker_handle_request(conn, request, line + 1); + id = (unsigned int)strtoul(t_strcut(id_str, '\t'), + NULL, 10); + if (conn->request != NULL && id == conn->request->id) { + auth_worker_request_handle(conn, conn->request, + line + 1); + } else { + if (conn->request != NULL) { + i_error("BUG: Worker sent reply with id %u, " + "expected %u", id, conn->request->id); + } else { + i_error("BUG: Worker sent reply with id %u, " + "none was expected", id); + } + auth_worker_destroy(&conn, "Worker is buggy", TRUE); + return; + } } - if (conn->requests_left == 0 && conn->request_count == 0) { - auth_worker_destroy(conn, "Max requests limit"); - if (idle_count == 0) - auth_worker_create(); - } -} - -static struct auth_worker_request * -auth_worker_request_get(struct 
auth_worker_connection *conn) -{ - struct auth_worker_request *request; - - request = auth_worker_request_lookup(conn, 0); - return request != NULL ? request : array_append_space(&conn->requests); -} - -static void auth_worker_call_timeout(struct auth_worker_connection *conn) -{ - i_assert(conn->request_count > 0); - - auth_worker_destroy(conn, "Lookup timed out"); + if (conn->requests_left == 0) + auth_worker_destroy(&conn, "Max requests limit", TRUE); + else + auth_worker_request_send_next(conn); } void auth_worker_call(struct auth_request *auth_request, @@ -289,66 +307,32 @@ { struct auth_worker_connection *conn; struct auth_worker_request *request; - const char *reply, *data_str; - struct const_iovec iov[3]; - conn = auth_worker_find_free(); - if (conn == NULL) { - /* no connections currently. can happen if all have been - idle for last 10 minutes. create a new one. */ - conn = auth_worker_create(); - if (conn == NULL) { - auth_request_log_error(auth_request, "worker-server", - "Couldn't create new auth worker"); - reply = t_strdup_printf("FAIL\t%d", - PASSDB_RESULT_INTERNAL_FAILURE); - callback(auth_request, reply); - return; - } - } - - i_assert(conn->requests_left > 0); - - data_str = auth_stream_reply_export(data); - iov[0].iov_base = t_strdup_printf("%d\t", ++conn->id_counter); - iov[0].iov_len = strlen(iov[0].iov_base); - iov[1].iov_base = data_str; - iov[1].iov_len = strlen(data_str); - iov[2].iov_base = "\n"; - iov[2].iov_len = 1; - - if (o_stream_get_buffer_used_size(conn->output) + - iov[0].iov_len + iov[1].iov_len + 1 > AUTH_WORKER_MAX_OUTBUF_SIZE) { - auth_request_log_error(auth_request, "worker-server", - "All auth workers are busy"); - reply = t_strdup_printf("FAIL\t%d", - PASSDB_RESULT_INTERNAL_FAILURE); - callback(auth_request, reply); - return; - } - - /* find an empty request */ - request = auth_worker_request_get(conn); - request->id = conn->id_counter; + request = p_new(auth_request->pool, struct auth_worker_request, 1); + request->created = 
ioloop_time; + request->data_str = p_strdup(auth_request->pool, + auth_stream_reply_export(data)); request->auth_request = auth_request; request->callback = callback; auth_request_ref(auth_request); - o_stream_sendv(conn->output, iov, 3); - - if (idle_count == 0) { - /* this request was queued, we need new workers */ - auth_worker_create(); + if (aqueue_count(worker_request_queue) > 0) { + /* requests are already being queued, no chance of + finding/creating a worker */ + conn = NULL; + } else { + conn = auth_worker_find_free(); + if (conn == NULL) { + /* no free connections, create a new one */ + conn = auth_worker_create(); + } } - - if (conn->request_count == 0) { - timeout_remove(&conn->to); - conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000, - auth_worker_call_timeout, conn); - idle_count--; + if (conn != NULL) + auth_worker_request_send(conn, request); + else { + /* reached the limit, queue the request */ + aqueue_append(worker_request_queue, &request); } - conn->request_count++; - conn->requests_left--; } void auth_worker_server_init(void) @@ -377,22 +361,28 @@ if (auth_workers_max_request_count == 0) auth_workers_max_request_count = (unsigned int)-1; + i_array_init(&worker_request_array, 128); + worker_request_queue = aqueue_init(&worker_request_array.arr); + i_array_init(&connections, 16); - auth_worker_create(); + (void)auth_worker_create(); } void auth_worker_server_deinit(void) { - struct auth_worker_connection **connp; + struct auth_worker_connection **connp, *conn; if (!array_is_created(&connections)) return; while (array_count(&connections) > 0) { connp = array_idx_modifiable(&connections, 0); - auth_worker_destroy(*connp, "Shutting down"); + conn = *connp; + auth_worker_destroy(&conn, "Shutting down", FALSE); } array_free(&connections); + aqueue_deinit(&worker_request_queue); + array_free(&worker_request_array); i_free(worker_socket_path); }