Mercurial > dovecot > original-hg > dovecot-1.2
changeset 3168:62f8366cb89c HEAD
Forgot to add for blocking passdb/userdb workers..
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Wed, 02 Mar 2005 22:46:25 +0200 |
parents | 97f53e0cce63 |
children | ec17099a6490 |
files | src/auth/auth-worker-client.c src/auth/auth-worker-client.h src/auth/auth-worker-server.c src/auth/auth-worker-server.h src/auth/passdb-blocking.c src/auth/passdb-blocking.h src/auth/userdb-blocking.c src/auth/userdb-blocking.h |
diffstat | 8 files changed, 885 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/auth/auth-worker-client.c Wed Mar 02 22:46:25 2005 +0200 @@ -0,0 +1,329 @@ +/* Copyright (C) 2005 Timo Sirainen */ + +#include "common.h" +#include "ioloop.h" +#include "network.h" +#include "istream.h" +#include "ostream.h" +#include "str.h" +#include "auth-request.h" +#include "auth-worker-client.h" + +#include <stdlib.h> + +#define OUTBUF_THROTTLE_SIZE (1024*10) + +struct auth_worker_client { + int refcount; + + struct auth *auth; + int fd; + struct io *io; + struct istream *input; + struct ostream *output; +}; + +static void auth_worker_client_unref(struct auth_worker_client *client); + +static void +auth_worker_client_check_throttle(struct auth_worker_client *client) +{ + if (o_stream_get_buffer_used_size(client->output) >= + OUTBUF_THROTTLE_SIZE) { + /* stop reading new requests until client has read the pending + replies. */ + if (client->io != NULL) { + io_remove(client->io); + client->io = NULL; + } + } +} + +static struct auth_request * +worker_auth_request_new(struct auth_worker_client *client, unsigned int id, + const char *args) +{ + struct auth_request *auth_request; + const char *key, *value, *const *tmp; + pool_t pool; + + pool = pool_alloconly_create("auth_request", 256); + auth_request = p_new(pool, struct auth_request, 1); + auth_request->pool = pool; + + auth_request->refcount = 1; + auth_request->created = ioloop_time; + auth_request->auth = client->auth; + + client->refcount++; + auth_request->context = client; + auth_request->id = id; + + t_push(); + for (tmp = t_strsplit(args, "\t"); *tmp != NULL; tmp++) { + value = strchr(*tmp, '='); + if (value == NULL) + continue; + + key = t_strdup_until(*tmp, value); + value++; + + if (strcmp(key, "user") == 0) + auth_request->user = p_strdup(pool, value); + else if (strcmp(key, "service") == 0) + auth_request->service = p_strdup(pool, value); + else if (strcmp(key, "lip") == 0) + net_addr2ip(value, &auth_request->local_ip); + else if (strcmp(key, "rip") == 0) + net_addr2ip(value, &auth_request->remote_ip); + } + t_pop(); + + return auth_request; +} + +static void verify_plain_callback(enum passdb_result result, + struct auth_request *request) +{ + struct auth_worker_client *client = request->context; + string_t *str; + + str = t_str_new(64); + str_printfa(str, "%u\t", request->id); + + if (result != PASSDB_RESULT_OK) + str_printfa(str, "FAIL\t%d", result); + else { + str_append(str, "OK\t"); + if (request->passdb_password != NULL) + str_append(str, request->passdb_password); + str_append_c(str, '\t'); + if (request->extra_fields != NULL) + str_append_str(str, request->extra_fields); + } + str_append_c(str, '\n'); + + o_stream_send(client->output, str_data(str), str_len(str)); + auth_worker_client_check_throttle(client); + auth_worker_client_unref(client); +} + +static void +auth_worker_handle_passv(struct auth_worker_client *client, + unsigned int id, const char *args) +{ + /* verify plaintext password */ + struct auth_request *auth_request; + const char *password; + + password = t_strcut(args, '\t'); + args = strchr(args, '\t'); + if (args != NULL) args++; + + auth_request = worker_auth_request_new(client, id, args); + auth_request->mech_password = + p_strdup(auth_request->pool, password); + + client->auth->passdb->verify_plain(auth_request, password, + verify_plain_callback); +} + +static void +lookup_credentials_callback(enum passdb_result result, const char *credentials, + struct auth_request *request) +{ + struct auth_worker_client *client = request->context; + string_t *str; + + str = t_str_new(64); + str_printfa(str, "%u\t", request->id); + + if (result != PASSDB_RESULT_OK) + str_printfa(str, "FAIL\t%d", result); + else { + str_append(str, "OK\t"); + str_append(str, credentials); + str_append_c(str, '\t'); + if (request->extra_fields != NULL) + str_append_str(str, request->extra_fields); + } + str_append_c(str, '\n'); + + o_stream_send(client->output, str_data(str), str_len(str)); + auth_worker_client_check_throttle(client); + auth_worker_client_unref(client); +} + +static void +auth_worker_handle_passl(struct auth_worker_client *client, + unsigned int id, const char *args) +{ + /* lookup credentials */ + struct auth_request *auth_request; + const char *credentials_str; + enum passdb_credentials credentials; + + credentials_str = t_strcut(args, '\t'); + args = strchr(args, '\t'); + if (args != NULL) args++; + + credentials = atoi(credentials_str); + + auth_request = worker_auth_request_new(client, id, args); + client->auth->passdb->lookup_credentials(auth_request, credentials, + lookup_credentials_callback); +} + +static void +lookup_user_callback(const char *result, struct auth_request *auth_request) +{ + struct auth_worker_client *client = auth_request->context; + string_t *str; + + str = t_str_new(64); + str_printfa(str, "%u\t", auth_request->id); + str_append(str, result); + str_append_c(str, '\n'); + + o_stream_send(client->output, str_data(str), str_len(str)); + auth_worker_client_check_throttle(client); + auth_worker_client_unref(client); +} + +static void +auth_worker_handle_user(struct auth_worker_client *client, + unsigned int id, const char *args) +{ + /* lookup user */ + struct auth_request *auth_request; + + auth_request = worker_auth_request_new(client, id, args); + client->auth->userdb->lookup(auth_request, lookup_user_callback); +} + +static int +auth_worker_handle_line(struct auth_worker_client *client, const char *line) +{ + const char *p; + unsigned int id; + + p = strchr(line, '\t'); + if (p == NULL) + return FALSE; + + id = (unsigned int)strtoul(t_strdup_until(line, p), NULL, 10); + line = p + 1; + + if (strncmp(line, "PASSV\t", 6) == 0) + auth_worker_handle_passv(client, id, line + 6); + else if (strncmp(line, "PASSL\t", 6) == 0) + auth_worker_handle_passl(client, id, line + 6); + else if (strncmp(line, "USER\t", 5) == 0) + auth_worker_handle_user(client, id, line + 5); + + return TRUE; +} + +static void auth_worker_input(void *context) +{ + struct auth_worker_client *client = context; + char *line; + int ret; + + switch (i_stream_read(client->input)) { + case 0: + return; + case -1: + /* disconnected */ + auth_worker_client_destroy(client); + return; + case -2: + /* buffer full */ + i_error("BUG: Auth worker server sent us more than %d bytes", + (int)AUTH_WORKER_MAX_LINE_LENGTH); + auth_worker_client_destroy(client); + return; + } + + client->refcount++; + while ((line = i_stream_next_line(client->input)) != NULL) { + t_push(); + ret = auth_worker_handle_line(client, line); + t_pop(); + + if (!ret) { + auth_worker_client_destroy(client); + break; + } + } + auth_worker_client_unref(client); +} + +static int auth_worker_output(void *context) +{ + struct auth_worker_client *client = context; + + if (o_stream_flush(client->output) < 0) { + auth_worker_client_destroy(client); + return 1; + } + + if (o_stream_get_buffer_used_size(client->output) <= + OUTBUF_THROTTLE_SIZE/3 && client->io == NULL) { + /* allow input again */ + client->io = io_add(client->fd, IO_READ, + auth_worker_input, client); + } + return 1; +} + +struct auth_worker_client * +auth_worker_client_create(struct auth *auth, int fd) +{ + struct auth_worker_client *client; + + client = i_new(struct auth_worker_client, 1); + client->refcount = 1; + + client->auth = auth; + client->fd = fd; + client->input = + i_stream_create_file(fd, default_pool, + AUTH_WORKER_MAX_LINE_LENGTH, FALSE); + client->output = + o_stream_create_file(fd, default_pool, (size_t)-1, FALSE); + o_stream_set_flush_callback(client->output, auth_worker_output, client); + client->io = io_add(fd, IO_READ, auth_worker_input, client); + + return client; +} + +void auth_worker_client_destroy(struct auth_worker_client *client) +{ + if (client->fd == -1) + return; + + i_stream_close(client->input); + o_stream_close(client->output); + + if (client->io != NULL) { + io_remove(client->io); + client->io = NULL; + } + + net_disconnect(client->fd); + client->fd = -1; + + io_loop_stop(ioloop); + auth_worker_client_unref(client); +} + +static void auth_worker_client_unref(struct auth_worker_client *client) +{ + if (--client->refcount > 0) + return; + + i_stream_unref(client->input); + o_stream_unref(client->output); + i_free(client); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/auth/auth-worker-client.h Wed Mar 02 22:46:25 2005 +0200 @@ -0,0 +1,9 @@ +#ifndef __AUTH_WORKER_CLIENT_H +#define __AUTH_WORKER_CLIENT_H + +#define AUTH_WORKER_MAX_LINE_LENGTH 1024 + +struct auth_worker_client *auth_worker_client_create(struct auth *auth, int fd); +void auth_worker_client_destroy(struct auth_worker_client *client); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/auth/auth-worker-server.c Wed Mar 02 22:46:25 2005 +0200 @@ -0,0 +1,364 @@ +/* Copyright (C) 2005 Timo Sirainen */ + +#include "common.h" +#include "buffer.h" +#include "ioloop.h" +#include "network.h" +#include "istream.h" +#include "ostream.h" +#include "auth-request.h" +#include "auth-worker-client.h" +#include "auth-worker-server.h" + +#include <stdlib.h> + +#define AUTH_WORKER_MAX_OUTBUF_SIZE 10240 +#define AUTH_WORKER_MAX_IDLE_TIME (60*30) + +struct auth_worker_request { + unsigned int id; + struct auth_request *auth_request; + auth_worker_callback_t *callback; +}; + +struct auth_worker_connection { + int fd; + + struct io *io; + struct istream *input; + struct ostream *output; + + unsigned int id_counter; + buffer_t *requests; /* struct auth_worker_request[] */ + + time_t last_used; + unsigned int request_count; +}; + +static buffer_t *connections = NULL; +static unsigned int idle_count; +static unsigned int auth_workers_max; + +static char *worker_socket_path; +static struct timeout *to; + +static void worker_input(void *context); + +static struct auth_worker_connection *auth_worker_create(void) +{ + struct auth_worker_connection *conn; + int fd; + + if (connections->used / sizeof(conn) >= auth_workers_max) + return NULL; + + fd = net_connect_unix(worker_socket_path); + if (fd < 0) { + if (errno != EAGAIN) { + i_fatal("net_connect_unix(%s) failed: %m", + worker_socket_path); + } + /* busy */ + return NULL; + } + + conn = i_new(struct auth_worker_connection, 1); + conn->fd = fd; + conn->input = i_stream_create_file(fd, default_pool, + AUTH_WORKER_MAX_LINE_LENGTH, FALSE); + conn->output = + o_stream_create_file(fd, default_pool, (size_t)-1, FALSE); + conn->io = io_add(fd, IO_READ, worker_input, conn); + conn->requests = buffer_create_dynamic(default_pool, 128); + + idle_count++; + + buffer_append(connections, &conn, sizeof(conn)); + return conn; +} + +static void auth_worker_destroy(struct auth_worker_connection *conn) +{ + struct auth_worker_connection **connp; + struct auth_worker_request *request; + size_t i, size; + const char *reply; + + connp = buffer_get_modifyable_data(connections, &size); + size /= sizeof(*connp); + + for (i = 0; i < size; i++) { + if (connp[i] == conn) { + buffer_delete(connections, i * sizeof(*connp), + sizeof(*connp)); + break; + } + } + + if (conn->request_count == 0) + idle_count--; + + /* abort all pending requests */ + request = buffer_get_modifyable_data(conn->requests, &size); + size /= sizeof(*request); + + reply = t_strdup_printf("FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE); + for (i = 0; i < size; i++) { + if (request[i].id != 0) { + request[i].callback(request[i].auth_request, reply); + auth_request_unref(request[i].auth_request); + } + } + + buffer_free(conn->requests); + io_remove(conn->io); + i_stream_unref(conn->input); + o_stream_unref(conn->output); + i_free(conn); +} + +static struct auth_worker_request * +auth_worker_request_lookup(struct auth_worker_connection *conn, + unsigned int id) +{ + struct auth_worker_request *request; + size_t i, size; + + request = buffer_get_modifyable_data(conn->requests, &size); + size /= sizeof(*request); + + for (i = 0; i < size; i++) { + if (request[i].id == id) + return &request[i]; + } + + return NULL; +} + +static struct auth_worker_connection *auth_worker_find_free(void) +{ + struct auth_worker_connection **conn, *best; + size_t i, size, outbuf_size, best_size; + + conn = buffer_get_modifyable_data(connections, &size); + size /= sizeof(*conn); + + if (idle_count > 0) { + /* there exists at least one idle connection, use it */ + for (i = 0; i < size; i++) { + if (conn[i]->request_count == 0) + return conn[i]; + } + i_unreached(); + } + + /* first the connection with least data in output buffer */ + best = NULL; + best_size = (size_t)-1; + for (i = 0; i < size; i++) { + outbuf_size = o_stream_get_buffer_used_size(conn[i]->output); + if (outbuf_size < best_size) { + best = conn[i]; + best_size = outbuf_size; + } + } + + return best; +} + +static void auth_worker_handle_request(struct auth_worker_connection *conn, + struct auth_worker_request *request, + const char *line) +{ + request->callback(request->auth_request, line); + auth_request_unref(request->auth_request); + + /* mark the record empty so it can be used for future requests */ + memset(request, 0, sizeof(*request)); + + /* update counters */ + conn->request_count--; + if (conn->request_count == 0) + idle_count++; +} + +static void worker_input(void *context) +{ + struct auth_worker_connection *conn = context; + struct auth_worker_request *request; + const char *line, *id_str; + unsigned int id; + + switch (i_stream_read(conn->input)) { + case 0: + return; + case -1: + /* disconnected */ + auth_worker_destroy(conn); + return; + case -2: + /* buffer full */ + i_error("BUG: Auth worker sent us more than %d bytes", + (int)AUTH_WORKER_MAX_LINE_LENGTH); + auth_worker_destroy(conn); + return; + } + + while ((line = i_stream_next_line(conn->input)) != NULL) { + id_str = line; + line = strchr(line, '\t'); + if (line == NULL) + continue; + + t_push(); + id = (unsigned int)strtoul(t_strcut(id_str, '\t'), NULL, 10); + request = auth_worker_request_lookup(conn, id); + t_pop(); + + if (request != NULL) + auth_worker_handle_request(conn, request, line + 1); + } +} + +static struct auth_worker_request * +auth_worker_request_get(struct auth_worker_connection *conn) +{ + struct auth_worker_request *request; + size_t i, size; + + request = buffer_get_modifyable_data(conn->requests, &size); + size /= sizeof(*request); + + for (i = 0; i < size; i++) { + if (request[i].id == 0) + return &request[i]; + } + + return buffer_append_space_unsafe(conn->requests, sizeof(*request)); +} + +void auth_worker_call(struct auth_request *auth_request, const char *data, + auth_worker_callback_t *callback) +{ + struct auth_worker_connection *conn; + struct auth_worker_request *request; + const char *reply; + struct const_iovec iov[3]; + + conn = auth_worker_find_free(); + if (conn == NULL) { + /* no connections currently. shouldn't happen unless they + all just crashed.. */ + auth_request_log_error(auth_request, "worker-server", + "All auth workers have died"); + reply = t_strdup_printf("FAIL\t%d", + PASSDB_RESULT_INTERNAL_FAILURE); + callback(auth_request, reply); + + auth_worker_create(); + return; + } + + iov[0].iov_base = t_strdup_printf("%d\t", ++conn->id_counter); + iov[0].iov_len = strlen(iov[0].iov_base); + iov[1].iov_base = data; + iov[1].iov_len = strlen(data); + iov[2].iov_base = "\n"; + iov[2].iov_len = 1; + + if (o_stream_get_buffer_used_size(conn->output) + + iov[0].iov_len + iov[1].iov_len + 1 > AUTH_WORKER_MAX_OUTBUF_SIZE) { + auth_request_log_error(auth_request, "worker-server", + "All auth workers are busy"); + reply = t_strdup_printf("FAIL\t%d", + PASSDB_RESULT_INTERNAL_FAILURE); + callback(auth_request, reply); + return; + } + + /* find an empty request */ + request = auth_worker_request_get(conn); + request->id = conn->id_counter; + request->auth_request = auth_request; + request->callback = callback; + auth_request_ref(auth_request); + + o_stream_sendv(conn->output, iov, 3); + + if (idle_count == 0) { + /* this request was queued, we need new workers */ + auth_worker_create(); + } + + conn->last_used = ioloop_time; + if (conn->request_count++ == 0) + idle_count--; +} + +static void auth_worker_timeout(void *context __attr_unused__) +{ + struct auth_worker_connection **conn; + size_t i, size; + + conn = buffer_get_modifyable_data(connections, &size); + size /= sizeof(*conn); + + if (idle_count <= 1) + return; + + /* remove connections which we haven't used for a long time. + this works because auth_worker_find_free() always returns the + first free connection. */ + for (i = 0; i < size; i++) { + if (conn[i]->last_used + + AUTH_WORKER_MAX_IDLE_TIME < ioloop_time) { + /* remove just one. easier.. */ + auth_worker_destroy(conn[i]); + break; + } + } +} + +void auth_worker_server_init(void) +{ + const char *env; + + if (connections != NULL) { + /* already initialized */ + return; + } + + env = getenv("AUTH_WORKER_PATH"); + if (env == NULL) + i_fatal("AUTH_WORKER_PATH environment not set"); + worker_socket_path = i_strdup(env); + + env = getenv("AUTH_WORKER_MAX_COUNT"); + if (env == NULL) + i_fatal("AUTH_WORKER_MAX_COUNT environment not set"); + auth_workers_max = atoi(env); + + connections = buffer_create_dynamic(default_pool, + sizeof(struct auth_worker_connection) * 16); + to = timeout_add(1000 * 60, auth_worker_timeout, NULL); + + auth_worker_create(); +} + +void auth_worker_server_deinit(void) +{ + struct auth_worker_connection **connp; + + if (connections == NULL) + return; + + while (connections->used > 0) { + connp = buffer_get_modifyable_data(connections, NULL); + auth_worker_destroy(*connp); + } + buffer_free(connections); + connections = NULL; + + timeout_remove(to); + i_free(worker_socket_path); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/auth/auth-worker-server.h Wed Mar 02 22:46:25 2005 +0200 @@ -0,0 +1,15 @@ +#ifndef __AUTH_WORKER_SERVER_H +#define __AUTH_WORKER_SERVER_H + +struct auth_request; + +typedef void auth_worker_callback_t(struct auth_request *request, + const char *reply); + +void auth_worker_call(struct auth_request *auth_request, const char *data, + auth_worker_callback_t *callback); + +void auth_worker_server_init(void); +void auth_worker_server_deinit(void); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/auth/passdb-blocking.c Wed Mar 02 22:46:25 2005 +0200 @@ -0,0 +1,126 @@ +/* Copyright (C) 2005 Timo Sirainen */ + +#include "common.h" +#include "str.h" +#include "auth-worker-server.h" +#include "password-scheme.h" +#include "passdb.h" +#include "passdb-blocking.h" + +#include <stdlib.h> + +static enum passdb_result +check_failure(struct auth_request *request, const char **reply) +{ + /* OK / FAIL */ + if (strncmp(*reply, "OK\t", 3) == 0) { + *reply += 3; + return PASSDB_RESULT_OK; + } + + /* FAIL \t result */ + if (strncmp(*reply, "FAIL\t", 5) != 0) { + auth_request_log_error(request, "blocking", + "Received unknown reply from worker: %s", *reply); + return PASSDB_RESULT_INTERNAL_FAILURE; + } else { + return atoi(*reply + 5); + } +} + +static int get_pass_reply(struct auth_request *request, const char *reply, + const char **password_r, const char **scheme_r) +{ + const char *p; + + p = strchr(reply, '\t'); + if (p == NULL) { + *password_r = NULL; + *scheme_r = NULL; + return 0; + } + + *password_r = t_strdup_until(reply, p); + reply = p + 1; + + if (**password_r == '\0') { + *password_r = NULL; + *scheme_r = NULL; + } else { + request->passdb_password = + p_strdup(request->pool, *password_r); + + *scheme_r = password_get_scheme(password_r); + if (*scheme_r == NULL) { + auth_request_log_error(request, "blocking", + "Received reply from worker without " + "password scheme"); + return -1; + } + } + + if (*reply != '\0') { + i_assert(request->extra_fields == NULL); + + request->extra_fields = str_new(request->pool, 128); + str_append(request->extra_fields, reply); + } + return 0; +} + +static void +verify_plain_callback(struct auth_request *request, const char *reply) +{ + enum passdb_result result; + const char *password, *scheme; + + result = check_failure(request, &reply); + if (result >= 0) { + if (get_pass_reply(request, reply, &password, &scheme) < 0) + result = PASSDB_RESULT_INTERNAL_FAILURE; + } + + auth_request_verify_plain_callback(result, request); +} + +void passdb_blocking_verify_plain(struct auth_request *request) +{ + string_t *str; + + str = t_str_new(64); + str_append(str, "PASSV\t"); + str_append(str, request->mech_password); + str_append_c(str, '\t'); + auth_request_export(request, str); + + auth_worker_call(request, str_c(str), verify_plain_callback); +} + +static void +lookup_credentials_callback(struct auth_request *request, const char *reply) +{ + enum passdb_result result; + const char *password, *scheme; + + result = check_failure(request, &reply); + if (result >= 0) { + if (get_pass_reply(request, reply, &password, &scheme) < 0) + result = PASSDB_RESULT_INTERNAL_FAILURE; + } + + passdb_handle_credentials(result, request->credentials, + password, scheme, + auth_request_lookup_credentials_callback, + request); +} + +void passdb_blocking_lookup_credentials(struct auth_request *request) +{ + string_t *str; + + str = t_str_new(64); + str_printfa(str, "PASSL\t%d\t", request->credentials); + auth_request_export(request, str); + + auth_worker_call(request, str_c(str), lookup_credentials_callback); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/auth/passdb-blocking.h Wed Mar 02 22:46:25 2005 +0200 @@ -0,0 +1,7 @@ +#ifndef __PASSDB_BLOCKING_H +#define __PASSDB_BLOCKING_H + +void passdb_blocking_verify_plain(struct auth_request *request); +void passdb_blocking_lookup_credentials(struct auth_request *request); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/auth/userdb-blocking.c Wed Mar 02 22:46:25 2005 +0200 @@ -0,0 +1,28 @@ +/* Copyright (C) 2005 Timo Sirainen */ + +#include "common.h" +#include "str.h" +#include "auth-worker-server.h" +#include "userdb.h" +#include "userdb-blocking.h" + +#include <stdlib.h> + +static void user_callback(struct auth_request *request, const char *reply) +{ + request->private_callback.userdb(reply, request); +} + +void userdb_blocking_lookup(struct auth_request *request, + userdb_callback_t *callback) +{ + string_t *str; + + request->private_callback.userdb = callback; + + str = t_str_new(64); + str_append(str, "USER\t"); + auth_request_export(request, str); + + auth_worker_call(request, str_c(str), user_callback); +}