Mercurial > dovecot > core-2.2
changeset 13084:0faaceb2f83c
Added "indexer" service, which uses worker processes to perform queued mailbox indexing.
Only a single worker process will index the same user at the same time. This
avoids lock waits, especially when doing full text search indexing with
backends that require locking.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Mon, 27 Jun 2011 23:02:40 +0300 |
parents | e07d2e37053d |
children | 683751c6357e |
files | .hgignore configure.in src/Makefile.am src/indexer/Makefile.am src/indexer/indexer-client.c src/indexer/indexer-client.h src/indexer/indexer-queue.c src/indexer/indexer-queue.h src/indexer/indexer-settings.c src/indexer/indexer-worker-settings.c src/indexer/indexer-worker.c src/indexer/indexer.c src/indexer/indexer.h src/indexer/master-connection.c src/indexer/master-connection.h src/indexer/worker-connection.c src/indexer/worker-connection.h src/indexer/worker-pool.c src/indexer/worker-pool.h |
diffstat | 19 files changed, 1465 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Mon Jun 27 22:58:49 2011 +0300 +++ b/.hgignore Mon Jun 27 23:02:40 2011 +0300 @@ -69,6 +69,8 @@ src/dsync/dsync src/imap-login/imap-login src/imap/imap +src/indexer/indexer +src/indexer/indexer-worker src/ipc/ipc src/lib/unicodemap.c src/lib/UnicodeData.txt
--- a/configure.in Mon Jun 27 22:58:49 2011 +0300 +++ b/configure.in Mon Jun 27 23:02:40 2011 +0300 @@ -2704,6 +2704,7 @@ src/dict/Makefile src/director/Makefile src/dns/Makefile +src/indexer/Makefile src/ipc/Makefile src/imap/Makefile src/imap-login/Makefile
--- a/src/Makefile.am Mon Jun 27 22:58:49 2011 +0300 +++ b/src/Makefile.am Mon Jun 27 23:02:40 2011 +0300 @@ -25,6 +25,7 @@ auth \ dict \ dns \ + indexer \ ipc \ master \ login-common \
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/Makefile.am Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,37 @@ +pkglibexecdir = $(libexecdir)/dovecot + +pkglibexec_PROGRAMS = indexer indexer-worker + +AM_CPPFLAGS = \ + -I$(top_srcdir)/src/lib \ + -I$(top_srcdir)/src/lib-master \ + -I$(top_srcdir)/src/lib-settings \ + -I$(top_srcdir)/src/lib-mail \ + -I$(top_srcdir)/src/lib-storage \ + -DPKG_RUNDIR=\""$(rundir)"\" + +indexer_LDADD = $(LIBDOVECOT) $(MODULE_LIBS) +indexer_DEPENDENCIES = $(LIBDOVECOT_DEPS) +indexer_SOURCES = \ + indexer.c \ + indexer-client.c \ + indexer-queue.c \ + indexer-settings.c \ + worker-connection.c \ + worker-pool.c + +indexer_worker_LDADD = $(LIBDOVECOT_STORAGE) $(LIBDOVECOT) $(MODULE_LIBS) +indexer_worker_DEPENDENCIES = $(LIBDOVECOT_STORAGE) $(LIBDOVECOT_DEPS) +indexer_worker_SOURCES = \ + indexer-worker.c \ + indexer-worker-settings.c \ + master-connection.c + +noinst_HEADERS = \ + indexer.h \ + indexer-client.h \ + indexer-queue.h \ + master-connection.h \ + worker-connection.h \ + worker-pool.h +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer-client.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,221 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "llist.h" +#include "istream.h" +#include "ostream.h" +#include "strescape.h" +#include "master-service.h" +#include "indexer-queue.h" +#include "indexer-client.h" + +#include <stdlib.h> +#include <unistd.h> + +#define MAX_INBUF_SIZE (1024*64) + +#define INDEXER_CLIENT_PROTOCOL_MAJOR_VERSION 1 +#define INDEXER_CLIENT_PROTOCOL_MINOR_VERSION 0 + +struct indexer_client { + struct indexer_client *prev, *next; + + int refcount; + struct indexer_queue *queue; + + int fd; + struct istream *input; + struct ostream *output; + struct io *io; + + unsigned int version_received:1; + unsigned int handshaked:1; + unsigned int destroyed:1; +}; + +struct indexer_client_request { + struct indexer_client *client; + unsigned int tag; +}; + +struct indexer_client *clients = NULL; + +static void indexer_client_destroy(struct indexer_client *client); +static void indexer_client_ref(struct indexer_client *client); +static void indexer_client_unref(struct indexer_client *client); + +static const char *const* +indexer_client_next_line(struct indexer_client *client) +{ + const char *line; + char **args; + unsigned int i; + + line = i_stream_next_line(client->input); + if (line == NULL) + return NULL; + + args = p_strsplit(pool_datastack_create(), line, "\t"); + for (i = 0; args[i] != NULL; i++) + args[i] = str_tabunescape(args[i]); + return (void *)args; +} + +static int +indexer_client_request_queue(struct indexer_client *client, bool append, + const char *const *args, const char **error_r) +{ + struct indexer_client_request *ctx = NULL; + unsigned int tag; + + if (str_array_length(args) != 3) { + *error_r = "Wrong parameter count"; + return -1; + } + if (str_to_uint(args[0], &tag) < 0) { + *error_r = "Invalid tag"; + return -1; + } + + if (tag != 0) { + ctx = i_new(struct indexer_client_request, 1); + ctx->client = client; + ctx->tag = tag; + indexer_client_ref(client); + } + + indexer_queue_append(client->queue, append, args[1], args[2], ctx); + o_stream_send_str(client->output, "OK\n"); + return 0; +} + +static int +indexer_client_request(struct indexer_client *client, + const char *const *args, const char **error_r) +{ + const char *cmd = args[0]; + + args++; + + if (strcmp(cmd, "APPEND") == 0) + return indexer_client_request_queue(client, TRUE, args, error_r); + else if (strcmp(cmd, "PREPEND") == 0) + return indexer_client_request_queue(client, FALSE, args, error_r); + else { + *error_r = t_strconcat("Unknown command: ", cmd, NULL); + return -1; + } +} + +static void indexer_client_input(void *context) +{ + struct indexer_client *client = context; + const char *line, *const *args, *error; + + switch (i_stream_read(client->input)) { + case -2: + i_error("BUG: Client connection sent too much data"); + indexer_client_destroy(client); + return; + case -1: + indexer_client_destroy(client); + return; + } + + if (!client->version_received) { + if ((line = i_stream_next_line(client->input)) == NULL) + return; + + if (!version_string_verify(line, "indexer", + INDEXER_CLIENT_PROTOCOL_MAJOR_VERSION)) { + i_error("Client not compatible with this server " + "(mixed old and new binaries?)"); + indexer_client_destroy(client); + return; + } + client->version_received = TRUE; + } + + while ((args = indexer_client_next_line(client)) != NULL) { + if (args[0] != NULL) { + if (indexer_client_request(client, args, &error) < 0) { + i_error("Client input error: %s", error); + indexer_client_destroy(client); + break; + } + } + } +} + +void indexer_client_status_callback(int percentage, void *context) +{ + struct indexer_client_request *ctx = context; + + T_BEGIN { + o_stream_send_str(ctx->client->output, + t_strdup_printf("%u\t%d\n", ctx->tag, percentage)); + } T_END; + if (percentage < 0 || percentage == 100) { + indexer_client_unref(ctx->client); + i_free(ctx); + } +} + +struct indexer_client * +indexer_client_create(int fd, struct indexer_queue *queue) +{ + struct indexer_client *client; + + client = i_new(struct indexer_client, 1); + client->refcount = 1; + client->queue = queue; + client->fd = fd; + client->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE); + client->output = o_stream_create_fd(fd, (size_t)-1, FALSE); + client->io = io_add(fd, IO_READ, indexer_client_input, client); + DLLIST_PREPEND(&clients, client); + return client; +} + +static void indexer_client_destroy(struct indexer_client *client) +{ + if (client->destroyed) + return; + client->destroyed = TRUE; + + DLLIST_REMOVE(&clients, client); + + io_remove(&client->io); + i_stream_close(client->input); + o_stream_close(client->output); + if (close(client->fd) < 0) + i_error("close(client) failed: %m"); + client->fd = -1; + indexer_client_unref(client); + + master_service_client_connection_destroyed(master_service); +} + +static void indexer_client_ref(struct indexer_client *client) +{ + i_assert(client->refcount > 0); + + client->refcount++; +} + +static void indexer_client_unref(struct indexer_client *client) +{ + i_assert(client->refcount > 0); + + if (--client->refcount > 0) + return; + i_stream_destroy(&client->input); + o_stream_destroy(&client->output); + i_free(client); +} + +void indexer_clients_destroy_all(void) +{ + while (clients != NULL) + indexer_client_destroy(clients); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer-client.h Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,11 @@ +#ifndef INDEXER_CLIENT_H +#define INDEXER_CLIENT_H + +struct indexer_queue; + +struct indexer_client * +indexer_client_create(int fd, struct indexer_queue *queue); +void indexer_client_status_callback(int percentage, void *context); +void indexer_clients_destroy_all(void); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer-queue.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,190 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "llist.h" +#include "hash.h" +#include "indexer-queue.h" + +struct indexer_queue { + indexer_status_callback_t *callback; + void (*listen_callback)(struct indexer_queue *); + + /* username+mailbox -> indexer_request */ + struct hash_table *requests; + struct indexer_request *head, *tail; +}; + +static unsigned int indexer_request_hash(const void *p) +{ + const struct indexer_request *request = p; + + return str_hash(request->username) ^ str_hash(request->mailbox); +} + +static int indexer_request_cmp(const void *p1, const void *p2) +{ + const struct indexer_request *r1 = p1, *r2 = p2; + + return strcmp(r1->username, r2->username) == 0 && + strcmp(r1->mailbox, r2->mailbox) == 0 ? 0 : 1; +} + +struct indexer_queue * +indexer_queue_init(indexer_status_callback_t *callback) +{ + struct indexer_queue *queue; + + queue = i_new(struct indexer_queue, 1); + queue->callback = callback; + queue->requests = hash_table_create(default_pool, default_pool, 0, + indexer_request_hash, + indexer_request_cmp); + return queue; +} + +void indexer_queue_deinit(struct indexer_queue **_queue) +{ + struct indexer_queue *queue = *_queue; + + *_queue = NULL; + + i_assert(indexer_queue_is_empty(queue)); + + hash_table_destroy(&queue->requests); + i_free(queue); +} + +void indexer_queue_set_listen_callback(struct indexer_queue *queue, + void (*callback)(struct indexer_queue *)) +{ + queue->listen_callback = callback; +} + +static struct indexer_request * +indexer_queue_lookup(struct indexer_queue *queue, + const char *username, const char *mailbox) +{ + struct indexer_request lookup_request; + + lookup_request.username = (void *)username; + lookup_request.mailbox = (void *)mailbox; + return hash_table_lookup(queue->requests, &lookup_request); +} + +static void request_add_context(struct indexer_request *request, void *context) +{ + unsigned int count = 0; + + if (context == NULL) + return; + + if (request->contexts == NULL) { + request->contexts = i_new(void *, 2); + } else { + for (; request->contexts[count] != NULL; count++) ; + + request->contexts = + i_realloc(request->contexts, + sizeof(*request->contexts) * (count + 1), + sizeof(*request->contexts) * (count + 2)); + } + request->contexts[count] = context; +} + +void indexer_queue_append(struct indexer_queue *queue, bool append, + const char *username, const char *mailbox, + void *context) +{ + struct indexer_request *request; + + request = indexer_queue_lookup(queue, username, mailbox); + if (request == NULL) { + request = i_new(struct indexer_request, 1); + request->username = i_strdup(username); + request->mailbox = i_strdup(mailbox); + request_add_context(request, context); + hash_table_insert(queue->requests, request, request); + } else { + request_add_context(request, context); + if (append) { + /* keep the request in its old position */ + return; + } + /* move request to beginning of the queue */ + DLLIST2_REMOVE(&queue->head, &queue->tail, request); + } + + if (append) + DLLIST2_APPEND(&queue->head, &queue->tail, request); + else + DLLIST2_PREPEND(&queue->head, &queue->tail, request); + + if (queue->listen_callback != NULL) + queue->listen_callback(queue); +} + +struct indexer_request *indexer_queue_request_peek(struct indexer_queue *queue) +{ + return queue->head; +} + +void indexer_queue_request_remove(struct indexer_queue *queue) +{ + struct indexer_request *request = queue->head; + + i_assert(request != NULL); + + DLLIST2_REMOVE(&queue->head, &queue->tail, request); + hash_table_remove(queue->requests, request); +} + +static void indexer_queue_request_status_int(struct indexer_queue *queue, + struct indexer_request *request, + int percentage) +{ + unsigned int i; + + if (request->contexts != NULL) { + for (i = 0; request->contexts[i] != NULL; i++) + queue->callback(percentage, request->contexts[i]); + i_free(request->contexts); + } +} + +void indexer_queue_request_status(struct indexer_queue *queue, + struct indexer_request *request, + int percentage) +{ + i_assert(percentage >= 0 && percentage < 100); + + indexer_queue_request_status_int(queue, request, percentage); +} + +void indexer_queue_request_finish(struct indexer_queue *queue, + struct indexer_request **_request, + bool success) +{ + struct indexer_request *request = *_request; + + *_request = NULL; + + indexer_queue_request_status_int(queue, request, success ? 100 : -1); + i_free(request->username); + i_free(request->mailbox); + i_free(request); +} + +void indexer_queue_cancel_all(struct indexer_queue *queue) +{ + struct indexer_request *request; + + while ((request = indexer_queue_request_peek(queue)) != NULL) { + indexer_queue_request_remove(queue); + indexer_queue_request_finish(queue, &request, -1); + } +} + +bool indexer_queue_is_empty(struct indexer_queue *queue) +{ + return queue->head == NULL; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer-queue.h Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,42 @@ +#ifndef INDEXER_QUEUE_H +#define INDEXER_QUEUE_H + +#include "indexer.h" + +struct indexer_request { + struct indexer_request *prev, *next; + + char *username; + char *mailbox; + void **contexts; +}; + +struct indexer_queue *indexer_queue_init(indexer_status_callback_t *callback); +void indexer_queue_deinit(struct indexer_queue **queue); + +/* The callback is called whenever a new request is added to the queue. */ +void indexer_queue_set_listen_callback(struct indexer_queue *queue, + void (*callback)(struct indexer_queue *)); + +void indexer_queue_append(struct indexer_queue *queue, bool append, + const char *username, const char *mailbox, + void *context); +void indexer_queue_cancel_all(struct indexer_queue *queue); + +bool indexer_queue_is_empty(struct indexer_queue *queue); + +/* Return the next request from the queue, without removing it. */ +struct indexer_request *indexer_queue_request_peek(struct indexer_queue *queue); +/* Remove the next request from the queue. You must call + indexer_queue_request_finish() to free its memory. */ +void indexer_queue_request_remove(struct indexer_queue *queue); +/* Give a status update about how far the indexing is going on. */ +void indexer_queue_request_status(struct indexer_queue *queue, + struct indexer_request *request, + int percentage); +/* Finish the request and free its memory. */ +void indexer_queue_request_finish(struct indexer_queue *queue, + struct indexer_request **request, + bool success); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer-settings.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,48 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "buffer.h" +#include "settings-parser.h" +#include "service-settings.h" + +#include <stddef.h> + +extern const struct setting_parser_info service_setting_parser_info; + +/* <settings checks> */ +static struct file_listener_settings indexer_unix_listeners_array[] = { + { "indexer", 0666, "", "" } +}; +static struct file_listener_settings *indexer_unix_listeners[] = { + &indexer_unix_listeners_array[0] +}; +static buffer_t indexer_unix_listeners_buf = { + indexer_unix_listeners, sizeof(indexer_unix_listeners), { 0, } +}; +/* </settings checks> */ + +struct service_settings indexer_service_settings = { + .name = "indexer", + .protocol = "", + .type = "", + .executable = "indexer", + .user = "$default_internal_user", + .group = "", + .privileged_group = "", + .extra_groups = "", + .chroot = "", + + .drop_priv_before_exec = FALSE, + + .process_min_avail = 0, + .process_limit = 1, + .client_limit = 0, + .service_count = 0, + .idle_kill = 0, + .vsz_limit = (uoff_t)-1, + + .unix_listeners = { { &indexer_unix_listeners_buf, + sizeof(indexer_unix_listeners[0]) } }, + .fifo_listeners = ARRAY_INIT, + .inet_listeners = ARRAY_INIT +};
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer-worker-settings.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,46 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "buffer.h" +#include "settings-parser.h" +#include "service-settings.h" + +#include <stddef.h> + +/* <settings checks> */ +static struct file_listener_settings indexer_worker_unix_listeners_array[] = { + { "indexer-worker", 0666, "", "" } +}; +static struct file_listener_settings *indexer_worker_unix_listeners[] = { + &indexer_worker_unix_listeners_array[0] +}; +static buffer_t indexer_worker_unix_listeners_buf = { + indexer_worker_unix_listeners, sizeof(indexer_worker_unix_listeners), { 0, } +}; +/* </settings checks> */ + +struct service_settings indexer_worker_service_settings = { + .name = "indexer-worker", + .protocol = "", + .type = "", + .executable = "indexer-worker", + .user = "", + .group = "", + .privileged_group = "", + .extra_groups = "", + .chroot = "", + + .drop_priv_before_exec = FALSE, + + .process_min_avail = 0, + .process_limit = 10, + .client_limit = 1, + .service_count = 0, + .idle_kill = 0, + .vsz_limit = (uoff_t)-1, + + .unix_listeners = { { &indexer_worker_unix_listeners_buf, + sizeof(indexer_worker_unix_listeners[0]) } }, + .fifo_listeners = ARRAY_INIT, + .inet_listeners = ARRAY_INIT +};
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer-worker.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,74 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "restrict-access.h" +#include "mail-storage-service.h" +#include "mail-storage-settings.h" +#include "master-service.h" +#include "master-service-settings.h" +#include "master-connection.h" + +static struct master_connection *master_conn; +static struct mail_storage_service_ctx *storage_service; + +static void client_connected(struct master_service_connection *conn) +{ + if (master_conn != NULL) { + i_error("indexer-worker must be configured with client_limit=1"); + return; + } + + master_service_client_connection_accept(conn); + master_conn = master_connection_create(conn->fd, storage_service); +} + +static void drop_privileges(void) +{ + struct restrict_access_settings set; + const char *error; + + /* by default we don't drop any privileges, but keep running as root. */ + restrict_access_get_env(&set); + if (set.uid != 0) { + /* open config connection before dropping privileges */ + struct master_service_settings_input input; + struct master_service_settings_output output; + + memset(&input, 0, sizeof(input)); + input.module = "mail"; + input.service = "indexer-worker"; + (void)master_service_settings_read(master_service, + &input, &output, &error); + } + restrict_access_by_env(NULL, FALSE); +} + +int main(int argc, char *argv[]) +{ + enum mail_storage_service_flags storage_service_flags = + MAIL_STORAGE_SERVICE_FLAG_DISALLOW_ROOT | + MAIL_STORAGE_SERVICE_FLAG_USERDB_LOOKUP | + MAIL_STORAGE_SERVICE_FLAG_TEMP_PRIV_DROP | + MAIL_STORAGE_SERVICE_FLAG_NO_IDLE_TIMEOUT; + + master_service = master_service_init("indexer-worker", 0, + &argc, &argv, NULL); + if (master_getopt(master_service) > 0) + return FATAL_DEFAULT; + + drop_privileges(); + master_service_init_log(master_service, "indexer-worker: "); + master_service_init_finish(master_service); + + storage_service = mail_storage_service_init(master_service, NULL, + storage_service_flags); + restrict_access_allow_coredumps(TRUE); + + master_service_run(master_service, client_connected); + + if (master_conn != NULL) + master_connection_destroy(&master_conn); + mail_storage_service_deinit(&storage_service); + master_service_deinit(&master_service); + return 0; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,131 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "restrict-access.h" +#include "master-service.h" +#include "indexer-client.h" +#include "indexer-queue.h" +#include "worker-pool.h" +#include "worker-connection.h" + +struct worker_request { + struct worker_connection *conn; + struct indexer_request *request; +}; + +static struct indexer_queue *queue; +static struct worker_pool *worker_pool; + +static bool idle_die(void) +{ + return indexer_queue_is_empty(queue); +} + +static void client_connected(struct master_service_connection *conn) +{ + master_service_client_connection_accept(conn); + (void)indexer_client_create(conn->fd, queue); +} + +static void worker_send_request(struct worker_connection *conn, + struct indexer_request *request) +{ + struct worker_request *wrequest; + + wrequest = i_new(struct worker_request, 1); + wrequest->conn = conn; + wrequest->request = request; + + worker_connection_request(conn, request->username, + request->mailbox, wrequest); +} + +static void queue_handle_existing_user_requests(struct indexer_queue *queue) +{ + struct worker_connection *conn; + struct indexer_request *request; + + while ((request = indexer_queue_request_peek(queue)) != NULL) { + conn = worker_pool_find_username_connection(worker_pool, + request->username); + if (conn == NULL) + break; + + indexer_queue_request_remove(queue); + /* there is already a worker handling this user. + it must be the one doing the indexing. */ + worker_send_request(conn, request); + } +} + +static void queue_try_send_more(struct indexer_queue *queue) +{ + struct worker_connection *conn; + struct indexer_request *request; + + queue_handle_existing_user_requests(queue); + + request = indexer_queue_request_peek(queue); + if (request == NULL) + return; + + /* okay, we have a request for a new user. */ + if (!worker_pool_get_connection(worker_pool, &conn)) + return; + + indexer_queue_request_remove(queue); + worker_send_request(conn, request); +} + +static void queue_listen_callback(struct indexer_queue *queue) +{ + queue_try_send_more(queue); +} + +static void worker_status_callback(int percentage, void *context) +{ + struct worker_request *request = context; + + if (percentage >= 0 && percentage < 100) { + indexer_queue_request_status(queue, request->request, + percentage); + return; + } + + indexer_queue_request_finish(queue, &request->request, + percentage == 100); + worker_pool_release_connection(worker_pool, request->conn); + i_free(request); + + /* if this was the last request for the connection, we can send more + through it */ + queue_try_send_more(queue); +} + +int main(int argc, char *argv[]) +{ + master_service = master_service_init("indexer", 0, &argc, &argv, NULL); + if (master_getopt(master_service) > 0) + return FATAL_DEFAULT; + + master_service_init_log(master_service, "indexer: "); + restrict_access_by_env(NULL, FALSE); + restrict_access_allow_coredumps(TRUE); + master_service_set_idle_die_callback(master_service, idle_die); + + master_service_init_finish(master_service); + worker_pool = worker_pool_init("indexer-worker", + worker_status_callback); + queue = indexer_queue_init(indexer_client_status_callback); + indexer_queue_set_listen_callback(queue, queue_listen_callback); + + master_service_run(master_service, client_connected); + + indexer_queue_cancel_all(queue); + indexer_clients_destroy_all(); + indexer_queue_deinit(&queue); + worker_pool_deinit(&worker_pool); + + master_service_deinit(&master_service); + return 0; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer.h Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,7 @@ +#ifndef INDEXER_H +#define INDEXER_H + +/* percentage: -1 = failed, 0..99 = indexing in progress, 100 = done */ +typedef void indexer_status_callback_t(int percentage, void *context); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/master-connection.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,169 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "istream.h" +#include "write-full.h" +#include "strescape.h" +#include "master-service.h" +#include "mail-namespace.h" +#include "mail-storage.h" +#include "mail-storage-service.h" +#include "master-connection.h" + +#include <unistd.h> + +#define INDEXER_PROTOCOL_MAJOR_VERSION 1 +#define INDEXER_PROTOCOL_MINOR_VERSION 0 + +#define INDEXER_WORKER_HANDSHAKE "VERSION\tindexer-worker-master\t1\t0\n%u\n" +#define INDEXER_MASTER_NAME "indexer-master-worker" + +struct master_connection { + struct mail_storage_service_ctx *storage_service; + + int fd; + struct io *io; + struct istream *input; + struct ostream *output; + + unsigned int version_received:1; +}; + +static int index_mailbox(struct mail_user *user, const char *mailbox) +{ + struct mail_namespace *ns; + struct mailbox *box; + const char *errstr; + enum mail_error error; + int ret = 0; + + ns = mail_namespace_find(user->namespaces, mailbox); + if (ns == NULL) { + i_error("Namespace not found for mailbox %s: ", mailbox); + return -1; + } + + /* FIXME: the current lib-storage API doesn't allow sending + "n% competed" notifications */ + box = mailbox_alloc(ns->list, mailbox, MAILBOX_FLAG_KEEP_RECENT); + if (mailbox_sync(box, MAILBOX_SYNC_FLAG_FULL_READ | + MAILBOX_SYNC_FLAG_PRECACHE) < 0) { + errstr = mail_storage_get_last_error(mailbox_get_storage(box), + &error); + if (error != MAIL_ERROR_NOTFOUND) { + i_error("Syncing mailbox %s failed: %s", + mailbox, errstr); + } else if (user->mail_debug) { + i_debug("Syncing mailbox %s failed: %s", + mailbox, errstr); + } + ret = -1; + } + mailbox_free(&box); + return ret; +} + +static int +master_connection_input_line(struct master_connection *conn, const char *line) +{ + const char *const *args = t_strsplit_tabescaped(line); + struct mail_storage_service_input input; + struct mail_storage_service_user *service_user; + struct mail_user *user; + const char *str, *error; + int ret; + + /* <username> <mailbox> */ + if (str_array_length(args) != 2) { + i_error("Invalid input from master: %s", line); + return -1; + } + + memset(&input, 0, sizeof(input)); + input.module = "mail"; + input.service = "indexer-worker"; + input.username = args[0]; + + if (mail_storage_service_lookup_next(conn->storage_service, &input, + &service_user, &user, &error) <= 0) { + i_error("User %s lookup failed: %s", args[0], error); + ret = -1; + } else { + ret = index_mailbox(user, args[1]); + mail_user_unref(&user); + mail_storage_service_user_free(&service_user); + } + + str = ret < 0 ? "-1\n" : "100\n"; + return write_full(conn->fd, str, strlen(str)); +} + +static void master_connection_input(struct master_connection *conn) +{ + const char *line; + int ret; + + if (i_stream_read(conn->input) < 0) { + master_service_stop(master_service); + return; + } + + if (!conn->version_received) { + if ((line = i_stream_next_line(conn->input)) == NULL) + return; + + if (!version_string_verify(line, INDEXER_MASTER_NAME, + INDEXER_PROTOCOL_MAJOR_VERSION)) { + i_error("Indexer master not compatible with this master " + "(mixed old and new binaries?)"); + master_service_stop(master_service); + return; + } + conn->version_received = TRUE; + } + + while ((line = i_stream_next_line(conn->input)) != NULL) { + T_BEGIN { + ret = master_connection_input_line(conn, line); + } T_END; + if (ret < 0) { + master_service_stop(master_service); + break; + } + } +} + +struct master_connection * +master_connection_create(int fd, struct mail_storage_service_ctx *storage_service) +{ + struct master_connection *conn; + const char *handshake; + + conn = i_new(struct master_connection, 1); + conn->storage_service = storage_service; + conn->fd = fd; + conn->io = io_add(conn->fd, IO_READ, master_connection_input, conn); + conn->input = i_stream_create_fd(conn->fd, (size_t)-1, FALSE); + + handshake = t_strdup_printf(INDEXER_WORKER_HANDSHAKE, + master_service_get_process_limit(master_service)); + (void)write_full(conn->fd, handshake, strlen(handshake)); + return conn; +} + +void master_connection_destroy(struct master_connection **_conn) +{ + struct master_connection *conn = *_conn; + + *_conn = NULL; + + io_remove(&conn->io); + i_stream_destroy(&conn->input); + + if (close(conn->fd) < 0) + i_error("close(master conn) failed: %m"); + i_free(conn); + + master_service_client_connection_destroyed(master_service); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/master-connection.h Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,8 @@ +#ifndef MASTER_CONNECTION_H +#define MASTER_CONNECTION_H + +struct master_connection * +master_connection_create(int fd, struct mail_storage_service_ctx *storage_service); +void master_connection_destroy(struct master_connection **conn); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/worker-connection.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,227 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "aqueue.h" +#include "ioloop.h" +#include "istream.h" +#include "ostream.h" +#include "str.h" +#include "strescape.h" +#include "master-service.h" +#include "worker-connection.h" + +#include <unistd.h> + +#define INDEXER_PROTOCOL_MAJOR_VERSION 1 +#define INDEXER_PROTOCOL_MINOR_VERSION 0 + +#define INDEXER_MASTER_HANDSHAKE "VERSION\tindexer-master-worker\t1\t0\n" +#define INDEXER_WORKER_NAME "indexer-worker-master" + +struct worker_connection { + char *socket_path; + indexer_status_callback_t *callback; + + int fd; + struct io *io; + struct istream *input; + struct ostream *output; + + char *request_username; + ARRAY_DEFINE(request_contexts, void *); + struct aqueue *request_queue; + + unsigned int process_limit; + unsigned int version_received:1; +}; + +struct worker_connection * +worker_connection_create(const char *socket_path, + indexer_status_callback_t *callback) +{ + struct worker_connection *conn; + + conn = i_new(struct worker_connection, 1); + conn->socket_path = i_strdup(socket_path); + conn->callback = callback; + conn->fd = -1; + i_array_init(&conn->request_contexts, 32); + conn->request_queue = aqueue_init(&conn->request_contexts.arr); + return conn; +} + +static void worker_connection_disconnect(struct worker_connection *conn) +{ + unsigned int i, count = aqueue_count(conn->request_queue); + + if (conn->fd != -1) { + io_remove(&conn->io); + i_stream_destroy(&conn->input); + o_stream_destroy(&conn->output); + + if (close(conn->fd) < 0) + i_error("close(%s) failed: %m", conn->socket_path); + conn->fd = -1; + } + + /* cancel any pending requests */ + for (i = 0; i < count; i++) { + void *const *contextp = + array_idx(&conn->request_contexts, + aqueue_idx(conn->request_queue, i)); + conn->callback(-1, *contextp); + } + aqueue_clear(conn->request_queue); +} + +void worker_connection_destroy(struct worker_connection **_conn) +{ + struct worker_connection *conn = *_conn; + + *_conn = NULL; + + worker_connection_disconnect(conn); + + aqueue_deinit(&conn->request_queue); + array_free(&conn->request_contexts); + i_free(conn->socket_path); + i_free(conn); +} + +static int +worker_connection_input_line(struct worker_connection *conn, const char *line) +{ + void *const *contextp, *context; + int percentage; + + if (aqueue_count(conn->request_queue) == 0) { + i_error("Input from worker without pending requests: %s", line); + return -1; + } + + if (str_to_int(line, &percentage) < 0 || + percentage < -1 || percentage > 100) { + i_error("Invalid input from worker: %s", line); + return -1; + } + + contextp = array_idx(&conn->request_contexts, + aqueue_idx(conn->request_queue, 0)); + context = *contextp; + aqueue_delete_tail(conn->request_queue); + + if (aqueue_count(conn->request_queue) == 0) + i_free_and_null(conn->request_username); + + conn->callback(percentage, context); + return 0; +} + +static void worker_connection_input(struct worker_connection *conn) +{ + const char *line; + + if (i_stream_read(conn->input) < 0) { + worker_connection_disconnect(conn); + return; + } + + if (!conn->version_received) { + if ((line = i_stream_next_line(conn->input)) == NULL) + return; + + if (!version_string_verify(line, INDEXER_WORKER_NAME, + INDEXER_PROTOCOL_MAJOR_VERSION)) { + i_error("Indexer worker not compatible with this master " + "(mixed old and new binaries?)"); + worker_connection_disconnect(conn); + return; + } + conn->version_received = TRUE; + } + if (conn->process_limit == 0) { + if ((line = i_stream_next_line(conn->input)) == NULL) + return; + if (str_to_uint(line, &conn->process_limit) < 0 || + conn->process_limit == 0) { + i_error("Indexer worker sent invalid handshake: %s", + line); + worker_connection_disconnect(conn); + return; + } + } + + while ((line = i_stream_next_line(conn->input)) != NULL) { + if (worker_connection_input_line(conn, line) < 0) { + worker_connection_disconnect(conn); + break; + } + } +} + +int worker_connection_connect(struct worker_connection *conn) +{ + i_assert(conn->fd == -1); + + conn->fd = net_connect_unix(conn->socket_path); + if (conn->fd == -1) { + i_error("connect(%s) failed: %m", conn->socket_path); + return -1; + } + conn->io = io_add(conn->fd, IO_READ, worker_connection_input, conn); + conn->input = i_stream_create_fd(conn->fd, (size_t)-1, FALSE); + conn->output = o_stream_create_fd(conn->fd, (size_t)-1, FALSE); + o_stream_send_str(conn->output, INDEXER_MASTER_HANDSHAKE); + return 0; +} + +bool worker_connection_is_connected(struct worker_connection *conn) +{ + return conn->fd != -1; +} + +bool worker_connection_get_process_limit(struct worker_connection *conn, + unsigned int *limit_r) +{ + if (conn->process_limit == 0) + return FALSE; + + *limit_r = conn->process_limit; + return TRUE; +} + +void worker_connection_request(struct worker_connection *conn, + const char *username, const char *mailbox, + void *context) +{ + i_assert(worker_connection_is_connected(conn)); + i_assert(context != NULL); + + if (conn->request_username == NULL) + conn->request_username = i_strdup(username); + else + i_assert(strcmp(conn->request_username, username) == 0); + + aqueue_append(conn->request_queue, &context); + + T_BEGIN { + string_t *str = t_str_new(128); + + str_tabescape_write(str, username); + str_append_c(str, '\t'); + str_tabescape_write(str, mailbox); + str_append_c(str, '\n'); + o_stream_send(conn->output, str_data(str), str_len(str)); + } T_END; +} + +bool worker_connection_is_busy(struct worker_connection *conn) +{ + return aqueue_count(conn->request_queue) > 0; +} + +const char *worker_connection_get_username(struct worker_connection *conn) +{ + return conn->request_username; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/worker-connection.h Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,33 @@ +#ifndef WORKER_CONNECTION_H +#define WORKER_CONNECTION_H + +#include "indexer.h" + +struct worker_connection * +worker_connection_create(const char *socket_path, + indexer_status_callback_t *callback); +void worker_connection_destroy(struct worker_connection **conn); + +int worker_connection_connect(struct worker_connection *conn); +/* Returns TRUE if worker is connected to (not necessarily handshaked yet) */ +bool worker_connection_is_connected(struct worker_connection *conn); + +/* After initial handshake the worker process tells how many of its kind + can be at maximum. This returns the value, of FALSE if handshake isn't + finished yet. */ +bool worker_connection_get_process_limit(struct worker_connection *conn, + unsigned int *limit_r); + +/* Send a new indexing request for username+mailbox. The status callback is + called as necessary with the given context. Requests can be queued, but + only for the same username. */ +void worker_connection_request(struct worker_connection *conn, + const char *username, const char *mailbox, + void *context); +/* Returns TRUE if a request is being handled. */ +bool worker_connection_is_busy(struct worker_connection *conn); +/* Returns username of the currently pending requests, + or NULL if there are none. */ +const char *worker_connection_get_username(struct worker_connection *conn); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/worker-pool.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,196 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "llist.h" +#include "worker-connection.h" +#include "worker-pool.h" + +#define MAX_WORKER_IDLE_SECS (60*5) + +struct worker_connection_list { + struct worker_connection_list *prev, *next; + + struct worker_connection *conn; + time_t last_use; +}; + +struct worker_pool { + char *socket_path; + indexer_status_callback_t *callback; + + unsigned int connection_count; + struct worker_connection_list *busy_list, *idle_list; +}; + +static void +worker_connection_idle_list_free(struct worker_pool *pool, + struct worker_connection_list *list); + +struct worker_pool * +worker_pool_init(const char *socket_path, indexer_status_callback_t *callback) +{ + struct worker_pool *pool; + + pool = i_new(struct worker_pool, 1); + pool->socket_path = i_strdup(socket_path); + pool->callback = callback; + return pool; +} + +void worker_pool_deinit(struct worker_pool **_pool) +{ + struct worker_pool *pool = *_pool; + + *_pool = NULL; + + while (pool->busy_list != NULL) { + struct worker_connection_list *list = pool->busy_list; + + DLLIST_REMOVE(&pool->busy_list, list); + worker_connection_idle_list_free(pool, list); + } + + while (pool->idle_list != NULL) { + struct worker_connection_list *list = pool->idle_list; + + worker_connection_idle_list_free(pool, list); + } + + i_free(pool->socket_path); + i_free(pool); +} + +static int worker_pool_add_connection(struct worker_pool *pool) +{ + struct worker_connection *conn; + struct worker_connection_list *list; + + conn = worker_connection_create(pool->socket_path, pool->callback); + if (worker_connection_connect(conn) < 0) { + worker_connection_destroy(&conn); + return -1; + } + + i_assert(pool->idle_list == NULL); + + list = i_new(struct worker_connection_list, 1); + list->conn = conn; + list->last_use = ioloop_time; + pool->idle_list = list; + pool->connection_count++; + return 0; +} + +static void +worker_connection_idle_list_free(struct worker_pool *pool, + struct worker_connection_list *list) +{ + DLLIST_REMOVE(&pool->idle_list, list); + worker_connection_destroy(&list->conn); + i_free(list); + + i_assert(pool->connection_count > 0); + pool->connection_count--; +} + +static unsigned int worker_pool_find_max_connections(struct worker_pool *pool) +{ + struct worker_connection_list *list; + unsigned int limit; + + i_assert(pool->idle_list == NULL); + + if (pool->busy_list == NULL) + return 1; + + for (list = pool->busy_list; list != NULL; list = list->next) { + if (worker_connection_get_process_limit(list->conn, &limit)) + return limit; + } + /* we have at least one connection that has already been created, + but without having handshaked yet. wait until it's finished. */ + return 0; +} + +bool worker_pool_get_connection(struct worker_pool *pool, + struct worker_connection **conn_r) +{ + struct worker_connection_list *list; + unsigned int max_connections; + + while (pool->idle_list != NULL && + !worker_connection_is_connected(pool->idle_list->conn)) + worker_connection_idle_list_free(pool, pool->idle_list); + + if (pool->idle_list == NULL) { + max_connections = worker_pool_find_max_connections(pool); + if (pool->connection_count >= max_connections) + return FALSE; + if (worker_pool_add_connection(pool) < 0) + return FALSE; + i_assert(pool->idle_list != NULL); + } + list = pool->idle_list; + DLLIST_REMOVE(&pool->idle_list, list); + DLLIST_PREPEND(&pool->busy_list, list); + + *conn_r = list->conn; + return TRUE; +} + +static void worker_pool_kill_idle_connections(struct worker_pool *pool) +{ + struct worker_connection_list *list, *next; + time_t kill_timestamp; + + kill_timestamp = ioloop_time - MAX_WORKER_IDLE_SECS; + for (list = pool->idle_list; list != NULL; list = next) { + next = list->next; + if (list->last_use < kill_timestamp) + worker_connection_idle_list_free(pool, list); + } +} + +void worker_pool_release_connection(struct worker_pool *pool, + struct worker_connection *conn) +{ + struct worker_connection_list *list; + + if (worker_connection_is_busy(conn)) { + /* not finished with all queued requests yet */ + return; + } + + for (list = pool->busy_list; list != NULL; list = list->next) { + if (list->conn == conn) + break; + } + i_assert(list != NULL); + + DLLIST_REMOVE(&pool->busy_list, list); + + if (!worker_connection_is_connected(conn)) + worker_connection_idle_list_free(pool, list); + else { + DLLIST_PREPEND(&pool->idle_list, list); + list->last_use = ioloop_time; + + worker_pool_kill_idle_connections(pool); + } +} + +struct worker_connection * +worker_pool_find_username_connection(struct worker_pool *pool, + const char *username) +{ + struct worker_connection_list *list; + const char *worker_user; + + for (list = pool->busy_list; list != NULL; list = list->next) { + worker_user = worker_connection_get_username(list->conn); + if (worker_user != NULL && strcmp(worker_user, username) == 0) + return list->conn; + } + return NULL; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/worker-pool.h Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,21 @@ +#ifndef WORKER_POOL_H +#define WORKER_POOL_H + +#include "indexer.h" + +struct worker_connection; + +struct worker_pool * +worker_pool_init(const char *socket_path, indexer_status_callback_t *callback); +void worker_pool_deinit(struct worker_pool **pool); + +bool worker_pool_get_connection(struct worker_pool *pool, + struct worker_connection **conn_r); +void worker_pool_release_connection(struct worker_pool *pool, + struct worker_connection *conn); + +struct worker_connection * +worker_pool_find_username_connection(struct worker_pool *pool, + const char *username); + +#endif