changeset 13084:0faaceb2f83c

Added "indexer" service, which uses worker processes to perform queued mailbox indexing. Only a single worker process will index the same user at the same time. This avoids lock waits, especially when doing full text search indexing with backends that require locking.
author Timo Sirainen <tss@iki.fi>
date Mon, 27 Jun 2011 23:02:40 +0300
parents e07d2e37053d
children 683751c6357e
files .hgignore configure.in src/Makefile.am src/indexer/Makefile.am src/indexer/indexer-client.c src/indexer/indexer-client.h src/indexer/indexer-queue.c src/indexer/indexer-queue.h src/indexer/indexer-settings.c src/indexer/indexer-worker-settings.c src/indexer/indexer-worker.c src/indexer/indexer.c src/indexer/indexer.h src/indexer/master-connection.c src/indexer/master-connection.h src/indexer/worker-connection.c src/indexer/worker-connection.h src/indexer/worker-pool.c src/indexer/worker-pool.h
diffstat 19 files changed, 1465 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Mon Jun 27 22:58:49 2011 +0300
+++ b/.hgignore	Mon Jun 27 23:02:40 2011 +0300
@@ -69,6 +69,8 @@
 src/dsync/dsync
 src/imap-login/imap-login
 src/imap/imap
+src/indexer/indexer
+src/indexer/indexer-worker
 src/ipc/ipc
 src/lib/unicodemap.c
 src/lib/UnicodeData.txt
--- a/configure.in	Mon Jun 27 22:58:49 2011 +0300
+++ b/configure.in	Mon Jun 27 23:02:40 2011 +0300
@@ -2704,6 +2704,7 @@
 src/dict/Makefile
 src/director/Makefile
 src/dns/Makefile
+src/indexer/Makefile
 src/ipc/Makefile
 src/imap/Makefile
 src/imap-login/Makefile
--- a/src/Makefile.am	Mon Jun 27 22:58:49 2011 +0300
+++ b/src/Makefile.am	Mon Jun 27 23:02:40 2011 +0300
@@ -25,6 +25,7 @@
 	auth \
 	dict \
 	dns \
+	indexer \
 	ipc \
 	master \
 	login-common \
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/Makefile.am	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,37 @@
+pkglibexecdir = $(libexecdir)/dovecot
+
+pkglibexec_PROGRAMS = indexer indexer-worker
+
+AM_CPPFLAGS = \
+	-I$(top_srcdir)/src/lib \
+	-I$(top_srcdir)/src/lib-master \
+	-I$(top_srcdir)/src/lib-settings \
+	-I$(top_srcdir)/src/lib-mail \
+	-I$(top_srcdir)/src/lib-storage \
+	-DPKG_RUNDIR=\""$(rundir)"\"
+
+indexer_LDADD = $(LIBDOVECOT) $(MODULE_LIBS)
+indexer_DEPENDENCIES = $(LIBDOVECOT_DEPS)
+indexer_SOURCES = \
+	indexer.c \
+	indexer-client.c \
+	indexer-queue.c \
+	indexer-settings.c \
+	worker-connection.c \
+	worker-pool.c
+
+indexer_worker_LDADD = $(LIBDOVECOT_STORAGE) $(LIBDOVECOT) $(MODULE_LIBS)
+indexer_worker_DEPENDENCIES = $(LIBDOVECOT_STORAGE) $(LIBDOVECOT_DEPS)
+indexer_worker_SOURCES = \
+	indexer-worker.c \
+	indexer-worker-settings.c \
+	master-connection.c
+
+noinst_HEADERS = \
+	indexer.h \
+	indexer-client.h \
+	indexer-queue.h \
+	master-connection.h \
+	worker-connection.h \
+	worker-pool.h
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer-client.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,221 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "llist.h"
+#include "istream.h"
+#include "ostream.h"
+#include "strescape.h"
+#include "master-service.h"
+#include "indexer-queue.h"
+#include "indexer-client.h"
+
+#include <stdlib.h>
+#include <unistd.h>
+
+#define MAX_INBUF_SIZE (1024*64)
+
+#define INDEXER_CLIENT_PROTOCOL_MAJOR_VERSION 1
+#define INDEXER_CLIENT_PROTOCOL_MINOR_VERSION 0
+
+struct indexer_client {
+	struct indexer_client *prev, *next;
+
+	int refcount;
+	struct indexer_queue *queue;
+
+	int fd;
+	struct istream *input;
+	struct ostream *output;
+	struct io *io;
+
+	unsigned int version_received:1;
+	unsigned int handshaked:1;
+	unsigned int destroyed:1;
+};
+
+struct indexer_client_request {
+	struct indexer_client *client;
+	unsigned int tag;
+};
+
+struct indexer_client *clients = NULL;
+
+static void indexer_client_destroy(struct indexer_client *client);
+static void indexer_client_ref(struct indexer_client *client);
+static void indexer_client_unref(struct indexer_client *client);
+
+static const char *const*
+indexer_client_next_line(struct indexer_client *client)
+{
+	const char *line;
+	char **args;
+	unsigned int i;
+
+	line = i_stream_next_line(client->input);
+	if (line == NULL)
+		return NULL;
+
+	args = p_strsplit(pool_datastack_create(), line, "\t");
+	for (i = 0; args[i] != NULL; i++)
+		args[i] = str_tabunescape(args[i]);
+	return (void *)args;
+}
+
+static int
+indexer_client_request_queue(struct indexer_client *client, bool append,
+			     const char *const *args, const char **error_r)
+{
+	struct indexer_client_request *ctx = NULL;
+	unsigned int tag;
+
+	if (str_array_length(args) != 3) {
+		*error_r = "Wrong parameter count";
+		return -1;
+	}
+	if (str_to_uint(args[0], &tag) < 0) {
+		*error_r = "Invalid tag";
+		return -1;
+	}
+
+	if (tag != 0) {
+		ctx = i_new(struct indexer_client_request, 1);
+		ctx->client = client;
+		ctx->tag = tag;
+		indexer_client_ref(client);
+	}
+
+	indexer_queue_append(client->queue, append, args[1], args[2], ctx);
+	o_stream_send_str(client->output, "OK\n");
+	return 0;
+}
+
+static int
+indexer_client_request(struct indexer_client *client,
+		       const char *const *args, const char **error_r)
+{
+	const char *cmd = args[0];
+
+	args++;
+
+	if (strcmp(cmd, "APPEND") == 0)
+		return indexer_client_request_queue(client, TRUE, args, error_r);
+	else if (strcmp(cmd, "PREPEND") == 0)
+		return indexer_client_request_queue(client, FALSE, args, error_r);
+	else {
+		*error_r = t_strconcat("Unknown command: ", cmd, NULL);
+		return -1;
+	}
+}
+
+static void indexer_client_input(void *context)
+{
+	struct indexer_client *client = context;
+	const char *line, *const *args, *error;
+
+	switch (i_stream_read(client->input)) {
+	case -2:
+		i_error("BUG: Client connection sent too much data");
+		indexer_client_destroy(client);
+		return;
+	case -1:
+		indexer_client_destroy(client);
+		return;
+	}
+
+	if (!client->version_received) {
+		if ((line = i_stream_next_line(client->input)) == NULL)
+			return;
+
+		if (!version_string_verify(line, "indexer",
+				INDEXER_CLIENT_PROTOCOL_MAJOR_VERSION)) {
+			i_error("Client not compatible with this server "
+				"(mixed old and new binaries?)");
+			indexer_client_destroy(client);
+			return;
+		}
+		client->version_received = TRUE;
+	}
+
+	while ((args = indexer_client_next_line(client)) != NULL) {
+		if (args[0] != NULL) {
+			if (indexer_client_request(client, args, &error) < 0) {
+				i_error("Client input error: %s", error);
+				indexer_client_destroy(client);
+				break;
+			}
+		}
+	}
+}
+
+void indexer_client_status_callback(int percentage, void *context)
+{
+	struct indexer_client_request *ctx = context;
+
+	T_BEGIN {
+		o_stream_send_str(ctx->client->output,
+			t_strdup_printf("%u\t%d\n", ctx->tag, percentage));
+	} T_END;
+	if (percentage < 0 || percentage == 100) {
+		indexer_client_unref(ctx->client);
+		i_free(ctx);
+	}
+}
+
+struct indexer_client *
+indexer_client_create(int fd, struct indexer_queue *queue)
+{
+	struct indexer_client *client;
+
+	client = i_new(struct indexer_client, 1);
+	client->refcount = 1;
+	client->queue = queue;
+	client->fd = fd;
+	client->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE);
+	client->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
+	client->io = io_add(fd, IO_READ, indexer_client_input, client);
+	DLLIST_PREPEND(&clients, client);
+	return client;
+}
+
+static void indexer_client_destroy(struct indexer_client *client)
+{
+	if (client->destroyed)
+		return;
+	client->destroyed = TRUE;
+
+	DLLIST_REMOVE(&clients, client);
+
+	io_remove(&client->io);
+	i_stream_close(client->input);
+	o_stream_close(client->output);
+	if (close(client->fd) < 0)
+		i_error("close(client) failed: %m");
+	client->fd = -1;
+	indexer_client_unref(client);
+
+	master_service_client_connection_destroyed(master_service);
+}
+
+static void indexer_client_ref(struct indexer_client *client)
+{
+	i_assert(client->refcount > 0);
+
+	client->refcount++;
+}
+
+static void indexer_client_unref(struct indexer_client *client)
+{
+	i_assert(client->refcount > 0);
+
+	if (--client->refcount > 0)
+		return;
+	i_stream_destroy(&client->input);
+	o_stream_destroy(&client->output);
+	i_free(client);
+}
+
+void indexer_clients_destroy_all(void)
+{
+	while (clients != NULL)
+		indexer_client_destroy(clients);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer-client.h	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,11 @@
+#ifndef INDEXER_CLIENT_H
+#define INDEXER_CLIENT_H
+
+struct indexer_queue;
+
+struct indexer_client *
+indexer_client_create(int fd, struct indexer_queue *queue);
+void indexer_client_status_callback(int percentage, void *context);
+void indexer_clients_destroy_all(void);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer-queue.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,190 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "llist.h"
+#include "hash.h"
+#include "indexer-queue.h"
+
+struct indexer_queue {
+	indexer_status_callback_t *callback;
+	void (*listen_callback)(struct indexer_queue *);
+
+	/* username+mailbox -> indexer_request */
+	struct hash_table *requests;
+	struct indexer_request *head, *tail;
+};
+
+static unsigned int indexer_request_hash(const void *p)
+{
+	const struct indexer_request *request = p;
+
+	return str_hash(request->username) ^ str_hash(request->mailbox);
+}
+
+static int indexer_request_cmp(const void *p1, const void *p2)
+{
+	const struct indexer_request *r1 = p1, *r2 = p2;
+
+	return strcmp(r1->username, r2->username) == 0 &&
+		strcmp(r1->mailbox, r2->mailbox) == 0 ? 0 : 1;
+}
+
+struct indexer_queue *
+indexer_queue_init(indexer_status_callback_t *callback)
+{
+	struct indexer_queue *queue;
+	
+	queue = i_new(struct indexer_queue, 1);
+	queue->callback = callback;
+	queue->requests = hash_table_create(default_pool, default_pool, 0,
+					    indexer_request_hash,
+					    indexer_request_cmp);
+	return queue;
+}
+
+void indexer_queue_deinit(struct indexer_queue **_queue)
+{
+	struct indexer_queue *queue = *_queue;
+
+	*_queue = NULL;
+
+	i_assert(indexer_queue_is_empty(queue));
+
+	hash_table_destroy(&queue->requests);
+	i_free(queue);
+}
+
+void indexer_queue_set_listen_callback(struct indexer_queue *queue,
+				       void (*callback)(struct indexer_queue *))
+{
+	queue->listen_callback = callback;
+}
+
+static struct indexer_request *
+indexer_queue_lookup(struct indexer_queue *queue,
+		     const char *username, const char *mailbox)
+{
+	struct indexer_request lookup_request;
+
+	lookup_request.username = (void *)username;
+	lookup_request.mailbox = (void *)mailbox;
+	return hash_table_lookup(queue->requests, &lookup_request);
+}
+
+static void request_add_context(struct indexer_request *request, void *context)
+{
+	unsigned int count = 0;
+
+	if (context == NULL)
+		return;
+
+	if (request->contexts == NULL) {
+		request->contexts = i_new(void *, 2);
+	} else {
+		for (; request->contexts[count] != NULL; count++) ;
+
+		request->contexts =
+			i_realloc(request->contexts,
+				  sizeof(*request->contexts) * (count + 1),
+				  sizeof(*request->contexts) * (count + 2));
+	}
+	request->contexts[count] = context;
+}
+
+void indexer_queue_append(struct indexer_queue *queue, bool append,
+			  const char *username, const char *mailbox,
+			  void *context)
+{
+	struct indexer_request *request;
+
+	request = indexer_queue_lookup(queue, username, mailbox);
+	if (request == NULL) {
+		request = i_new(struct indexer_request, 1);
+		request->username = i_strdup(username);
+		request->mailbox = i_strdup(mailbox);
+		request_add_context(request, context);
+		hash_table_insert(queue->requests, request, request);
+	} else {
+		request_add_context(request, context);
+		if (append) {
+			/* keep the request in its old position */
+			return;
+		}
+		/* move request to beginning of the queue */
+		DLLIST2_REMOVE(&queue->head, &queue->tail, request);
+	}
+
+	if (append)
+		DLLIST2_APPEND(&queue->head, &queue->tail, request);
+	else
+		DLLIST2_PREPEND(&queue->head, &queue->tail, request);
+
+	if (queue->listen_callback != NULL)
+		queue->listen_callback(queue);
+}
+
+struct indexer_request *indexer_queue_request_peek(struct indexer_queue *queue)
+{
+	return queue->head;
+}
+
+void indexer_queue_request_remove(struct indexer_queue *queue)
+{
+	struct indexer_request *request = queue->head;
+
+	i_assert(request != NULL);
+
+	DLLIST2_REMOVE(&queue->head, &queue->tail, request);
+	hash_table_remove(queue->requests, request);
+}
+
+static void indexer_queue_request_status_int(struct indexer_queue *queue,
+					     struct indexer_request *request,
+					     int percentage)
+{
+	unsigned int i;
+
+	if (request->contexts != NULL) {
+		for (i = 0; request->contexts[i] != NULL; i++)
+			queue->callback(percentage, request->contexts[i]);
+		i_free(request->contexts);
+	}
+}
+
+void indexer_queue_request_status(struct indexer_queue *queue,
+				  struct indexer_request *request,
+				  int percentage)
+{
+	i_assert(percentage >= 0 && percentage < 100);
+
+	indexer_queue_request_status_int(queue, request, percentage);
+}
+
+void indexer_queue_request_finish(struct indexer_queue *queue,
+				  struct indexer_request **_request,
+				  bool success)
+{
+	struct indexer_request *request = *_request;
+
+	*_request = NULL;
+
+	indexer_queue_request_status_int(queue, request, success ? 100 : -1);
+	i_free(request->username);
+	i_free(request->mailbox);
+	i_free(request);
+}
+
+void indexer_queue_cancel_all(struct indexer_queue *queue)
+{
+	struct indexer_request *request;
+
+	while ((request = indexer_queue_request_peek(queue)) != NULL) {
+		indexer_queue_request_remove(queue);
+		indexer_queue_request_finish(queue, &request, -1);
+	}
+}
+
+bool indexer_queue_is_empty(struct indexer_queue *queue)
+{
+	return queue->head == NULL;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer-queue.h	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,42 @@
+#ifndef INDEXER_QUEUE_H
+#define INDEXER_QUEUE_H
+
+#include "indexer.h"
+
+struct indexer_request {
+	struct indexer_request *prev, *next;
+
+	char *username;
+	char *mailbox;
+	void **contexts;
+};
+
+struct indexer_queue *indexer_queue_init(indexer_status_callback_t *callback);
+void indexer_queue_deinit(struct indexer_queue **queue);
+
+/* The callback is called whenever a new request is added to the queue. */
+void indexer_queue_set_listen_callback(struct indexer_queue *queue,
+				       void (*callback)(struct indexer_queue *));
+	
+void indexer_queue_append(struct indexer_queue *queue, bool append,
+			  const char *username, const char *mailbox,
+			  void *context);
+void indexer_queue_cancel_all(struct indexer_queue *queue);
+
+bool indexer_queue_is_empty(struct indexer_queue *queue);
+
+/* Return the next request from the queue, without removing it. */
+struct indexer_request *indexer_queue_request_peek(struct indexer_queue *queue);
+/* Remove the next request from the queue. You must call
+   indexer_queue_request_finish() to free its memory. */
+void indexer_queue_request_remove(struct indexer_queue *queue);
+/* Give a status update about how far the indexing is going on. */
+void indexer_queue_request_status(struct indexer_queue *queue,
+				  struct indexer_request *request,
+				  int percentage);
+/* Finish the request and free its memory. */
+void indexer_queue_request_finish(struct indexer_queue *queue,
+				  struct indexer_request **request,
+				  bool success);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer-settings.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,48 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "settings-parser.h"
+#include "service-settings.h"
+
+#include <stddef.h>
+
+extern const struct setting_parser_info service_setting_parser_info;
+
+/* <settings checks> */
+static struct file_listener_settings indexer_unix_listeners_array[] = {
+	{ "indexer", 0666, "", "" }
+};
+static struct file_listener_settings *indexer_unix_listeners[] = {
+	&indexer_unix_listeners_array[0]
+};
+static buffer_t indexer_unix_listeners_buf = {
+	indexer_unix_listeners, sizeof(indexer_unix_listeners), { 0, }
+};
+/* </settings checks> */
+
+struct service_settings indexer_service_settings = {
+	.name = "indexer",
+	.protocol = "",
+	.type = "",
+	.executable = "indexer",
+	.user = "$default_internal_user",
+	.group = "",
+	.privileged_group = "",
+	.extra_groups = "",
+	.chroot = "",
+
+	.drop_priv_before_exec = FALSE,
+
+	.process_min_avail = 0,
+	.process_limit = 1,
+	.client_limit = 0,
+	.service_count = 0,
+	.idle_kill = 0,
+	.vsz_limit = (uoff_t)-1,
+
+	.unix_listeners = { { &indexer_unix_listeners_buf,
+			      sizeof(indexer_unix_listeners[0]) } },
+	.fifo_listeners = ARRAY_INIT,
+	.inet_listeners = ARRAY_INIT
+};
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer-worker-settings.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,46 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "settings-parser.h"
+#include "service-settings.h"
+
+#include <stddef.h>
+
+/* <settings checks> */
+static struct file_listener_settings indexer_worker_unix_listeners_array[] = {
+	{ "indexer-worker", 0666, "", "" }
+};
+static struct file_listener_settings *indexer_worker_unix_listeners[] = {
+	&indexer_worker_unix_listeners_array[0]
+};
+static buffer_t indexer_worker_unix_listeners_buf = {
+	indexer_worker_unix_listeners, sizeof(indexer_worker_unix_listeners), { 0, }
+};
+/* </settings checks> */
+
+struct service_settings indexer_worker_service_settings = {
+	.name = "indexer-worker",
+	.protocol = "",
+	.type = "",
+	.executable = "indexer-worker",
+	.user = "",
+	.group = "",
+	.privileged_group = "",
+	.extra_groups = "",
+	.chroot = "",
+
+	.drop_priv_before_exec = FALSE,
+
+	.process_min_avail = 0,
+	.process_limit = 10,
+	.client_limit = 1,
+	.service_count = 0,
+	.idle_kill = 0,
+	.vsz_limit = (uoff_t)-1,
+
+	.unix_listeners = { { &indexer_worker_unix_listeners_buf,
+			      sizeof(indexer_worker_unix_listeners[0]) } },
+	.fifo_listeners = ARRAY_INIT,
+	.inet_listeners = ARRAY_INIT
+};
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer-worker.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,74 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "restrict-access.h"
+#include "mail-storage-service.h"
+#include "mail-storage-settings.h"
+#include "master-service.h"
+#include "master-service-settings.h"
+#include "master-connection.h"
+
+static struct master_connection *master_conn;
+static struct mail_storage_service_ctx *storage_service;
+
+static void client_connected(struct master_service_connection *conn)
+{
+	if (master_conn != NULL) {
+		i_error("indexer-worker must be configured with client_limit=1");
+		return;
+	}
+
+	master_service_client_connection_accept(conn);
+	master_conn = master_connection_create(conn->fd, storage_service);
+}
+
+static void drop_privileges(void)
+{
+	struct restrict_access_settings set;
+	const char *error;
+
+	/* by default we don't drop any privileges, but keep running as root. */
+	restrict_access_get_env(&set);
+	if (set.uid != 0) {
+		/* open config connection before dropping privileges */
+		struct master_service_settings_input input;
+		struct master_service_settings_output output;
+
+		memset(&input, 0, sizeof(input));
+		input.module = "mail";
+		input.service = "indexer-worker";
+		(void)master_service_settings_read(master_service,
+						   &input, &output, &error);
+	}
+	restrict_access_by_env(NULL, FALSE);
+}
+
+int main(int argc, char *argv[])
+{
+	enum mail_storage_service_flags storage_service_flags =
+		MAIL_STORAGE_SERVICE_FLAG_DISALLOW_ROOT |
+		MAIL_STORAGE_SERVICE_FLAG_USERDB_LOOKUP |
+		MAIL_STORAGE_SERVICE_FLAG_TEMP_PRIV_DROP |
+		MAIL_STORAGE_SERVICE_FLAG_NO_IDLE_TIMEOUT;
+
+	master_service = master_service_init("indexer-worker", 0,
+					     &argc, &argv, NULL);
+	if (master_getopt(master_service) > 0)
+		return FATAL_DEFAULT;
+
+	drop_privileges();
+	master_service_init_log(master_service, "indexer-worker: ");
+	master_service_init_finish(master_service);
+
+	storage_service = mail_storage_service_init(master_service, NULL,
+						    storage_service_flags);
+	restrict_access_allow_coredumps(TRUE);
+
+	master_service_run(master_service, client_connected);
+
+	if (master_conn != NULL)
+		master_connection_destroy(&master_conn);
+	mail_storage_service_deinit(&storage_service);
+	master_service_deinit(&master_service);
+        return 0;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,131 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "restrict-access.h"
+#include "master-service.h"
+#include "indexer-client.h"
+#include "indexer-queue.h"
+#include "worker-pool.h"
+#include "worker-connection.h"
+
+struct worker_request {
+	struct worker_connection *conn;
+	struct indexer_request *request;
+};
+
+static struct indexer_queue *queue;
+static struct worker_pool *worker_pool;
+
+static bool idle_die(void)
+{
+	return indexer_queue_is_empty(queue);
+}
+
+static void client_connected(struct master_service_connection *conn)
+{
+	master_service_client_connection_accept(conn);
+	(void)indexer_client_create(conn->fd, queue);
+}
+
+static void worker_send_request(struct worker_connection *conn,
+				struct indexer_request *request)
+{
+	struct worker_request *wrequest;
+
+	wrequest = i_new(struct worker_request, 1);
+	wrequest->conn = conn;
+	wrequest->request = request;
+
+	worker_connection_request(conn, request->username,
+				  request->mailbox, wrequest);
+}
+
+static void queue_handle_existing_user_requests(struct indexer_queue *queue)
+{
+	struct worker_connection *conn;
+	struct indexer_request *request;
+
+	while ((request = indexer_queue_request_peek(queue)) != NULL) {
+		conn = worker_pool_find_username_connection(worker_pool,
+							    request->username);
+		if (conn == NULL)
+			break;
+
+		indexer_queue_request_remove(queue);
+		/* there is already a worker handling this user.
+		   it must be the one doing the indexing. */
+		worker_send_request(conn, request);
+	}
+}
+
+static void queue_try_send_more(struct indexer_queue *queue)
+{
+	struct worker_connection *conn;
+	struct indexer_request *request;
+
+	queue_handle_existing_user_requests(queue);
+
+	request = indexer_queue_request_peek(queue);
+	if (request == NULL)
+		return;
+
+	/* okay, we have a request for a new user. */
+	if (!worker_pool_get_connection(worker_pool, &conn))
+		return;
+
+	indexer_queue_request_remove(queue);
+	worker_send_request(conn, request);
+}
+
+static void queue_listen_callback(struct indexer_queue *queue)
+{
+	queue_try_send_more(queue);
+}
+
+static void worker_status_callback(int percentage, void *context)
+{
+	struct worker_request *request = context;
+
+	if (percentage >= 0 && percentage < 100) {
+		indexer_queue_request_status(queue, request->request,
+					     percentage);
+		return;
+	}
+
+	indexer_queue_request_finish(queue, &request->request,
+				     percentage == 100);
+	worker_pool_release_connection(worker_pool, request->conn);
+	i_free(request);
+
+	/* if this was the last request for the connection, we can send more
+	   through it */
+	queue_try_send_more(queue);
+}
+
+int main(int argc, char *argv[])
+{
+	master_service = master_service_init("indexer", 0, &argc, &argv, NULL);
+	if (master_getopt(master_service) > 0)
+		return FATAL_DEFAULT;
+
+	master_service_init_log(master_service, "indexer: ");
+	restrict_access_by_env(NULL, FALSE);
+	restrict_access_allow_coredumps(TRUE);
+	master_service_set_idle_die_callback(master_service, idle_die);
+
+	master_service_init_finish(master_service);
+	worker_pool = worker_pool_init("indexer-worker",
+				       worker_status_callback);
+	queue = indexer_queue_init(indexer_client_status_callback);
+	indexer_queue_set_listen_callback(queue, queue_listen_callback);
+
+	master_service_run(master_service, client_connected);
+
+	indexer_queue_cancel_all(queue);
+	indexer_clients_destroy_all();
+	indexer_queue_deinit(&queue);
+	worker_pool_deinit(&worker_pool);
+
+	master_service_deinit(&master_service);
+        return 0;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/indexer.h	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,7 @@
+#ifndef INDEXER_H
+#define INDEXER_H
+
+/* percentage: -1 = failed, 0..99 = indexing in progress, 100 = done */
+typedef void indexer_status_callback_t(int percentage, void *context);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/master-connection.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,169 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "istream.h"
+#include "write-full.h"
+#include "strescape.h"
+#include "master-service.h"
+#include "mail-namespace.h"
+#include "mail-storage.h"
+#include "mail-storage-service.h"
+#include "master-connection.h"
+
+#include <unistd.h>
+
+#define INDEXER_PROTOCOL_MAJOR_VERSION 1
+#define INDEXER_PROTOCOL_MINOR_VERSION 0
+
+#define INDEXER_WORKER_HANDSHAKE "VERSION\tindexer-worker-master\t1\t0\n%u\n"
+#define INDEXER_MASTER_NAME "indexer-master-worker"
+
+struct master_connection {
+	struct mail_storage_service_ctx *storage_service;
+
+	int fd;
+	struct io *io;
+	struct istream *input;
+	struct ostream *output;
+
+	unsigned int version_received:1;
+};
+
+static int index_mailbox(struct mail_user *user, const char *mailbox)
+{
+	struct mail_namespace *ns;
+	struct mailbox *box;
+	const char *errstr;
+	enum mail_error error;
+	int ret = 0;
+
+	ns = mail_namespace_find(user->namespaces, mailbox);
+	if (ns == NULL) {
+		i_error("Namespace not found for mailbox %s: ", mailbox);
+		return -1;
+	}
+
+	/* FIXME: the current lib-storage API doesn't allow sending
+	   "n% competed" notifications */
+	box = mailbox_alloc(ns->list, mailbox, MAILBOX_FLAG_KEEP_RECENT);
+	if (mailbox_sync(box, MAILBOX_SYNC_FLAG_FULL_READ |
+			 MAILBOX_SYNC_FLAG_PRECACHE) < 0) {
+		errstr = mail_storage_get_last_error(mailbox_get_storage(box),
+						     &error);
+		if (error != MAIL_ERROR_NOTFOUND) {
+			i_error("Syncing mailbox %s failed: %s",
+				mailbox, errstr);
+		} else if (user->mail_debug) {
+			i_debug("Syncing mailbox %s failed: %s",
+				mailbox, errstr);
+		}
+		ret = -1;
+	}
+	mailbox_free(&box);
+	return ret;
+}
+
+static int
+master_connection_input_line(struct master_connection *conn, const char *line)
+{
+	const char *const *args = t_strsplit_tabescaped(line);
+	struct mail_storage_service_input input;
+	struct mail_storage_service_user *service_user;
+	struct mail_user *user;
+	const char *str, *error;
+	int ret;
+
+	/* <username> <mailbox> */
+	if (str_array_length(args) != 2) {
+		i_error("Invalid input from master: %s", line);
+		return -1;
+	}
+
+	memset(&input, 0, sizeof(input));
+	input.module = "mail";
+	input.service = "indexer-worker";
+	input.username = args[0];
+
+	if (mail_storage_service_lookup_next(conn->storage_service, &input,
+					     &service_user, &user, &error) <= 0) {
+		i_error("User %s lookup failed: %s", args[0], error);
+		ret = -1;
+	} else {
+		ret = index_mailbox(user, args[1]);
+		mail_user_unref(&user);
+		mail_storage_service_user_free(&service_user);
+	}
+
+	str = ret < 0 ? "-1\n" : "100\n";
+	return write_full(conn->fd, str, strlen(str));
+}
+
+static void master_connection_input(struct master_connection *conn)
+{
+	const char *line;
+	int ret;
+
+	if (i_stream_read(conn->input) < 0) {
+		master_service_stop(master_service);
+		return;
+	}
+
+	if (!conn->version_received) {
+		if ((line = i_stream_next_line(conn->input)) == NULL)
+			return;
+
+		if (!version_string_verify(line, INDEXER_MASTER_NAME,
+				INDEXER_PROTOCOL_MAJOR_VERSION)) {
+			i_error("Indexer master not compatible with this master "
+				"(mixed old and new binaries?)");
+			master_service_stop(master_service);
+			return;
+		}
+		conn->version_received = TRUE;
+	}
+
+	while ((line = i_stream_next_line(conn->input)) != NULL) {
+		T_BEGIN {
+			ret = master_connection_input_line(conn, line);
+		} T_END;
+		if (ret < 0) {
+			master_service_stop(master_service);
+			break;
+		}
+	}
+}
+
+struct master_connection *
+master_connection_create(int fd, struct mail_storage_service_ctx *storage_service)
+{
+	struct master_connection *conn;
+	const char *handshake;
+
+	conn = i_new(struct master_connection, 1);
+	conn->storage_service = storage_service;
+	conn->fd = fd;
+	conn->io = io_add(conn->fd, IO_READ, master_connection_input, conn);
+	conn->input = i_stream_create_fd(conn->fd, (size_t)-1, FALSE);
+
+	handshake = t_strdup_printf(INDEXER_WORKER_HANDSHAKE,
+		master_service_get_process_limit(master_service));
+	(void)write_full(conn->fd, handshake, strlen(handshake));
+	return conn;
+}
+
+void master_connection_destroy(struct master_connection **_conn)
+{
+	struct master_connection *conn = *_conn;
+
+	*_conn = NULL;
+
+	io_remove(&conn->io);
+	i_stream_destroy(&conn->input);
+
+	if (close(conn->fd) < 0)
+		i_error("close(master conn) failed: %m");
+	i_free(conn);
+
+	master_service_client_connection_destroyed(master_service);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/master-connection.h	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,8 @@
+#ifndef MASTER_CONNECTION_H
+#define MASTER_CONNECTION_H
+
+struct master_connection *
+master_connection_create(int fd, struct mail_storage_service_ctx *storage_service);
+void master_connection_destroy(struct master_connection **conn);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/worker-connection.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,227 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "aqueue.h"
+#include "ioloop.h"
+#include "istream.h"
+#include "ostream.h"
+#include "str.h"
+#include "strescape.h"
+#include "master-service.h"
+#include "worker-connection.h"
+
+#include <unistd.h>
+
+#define INDEXER_PROTOCOL_MAJOR_VERSION 1
+#define INDEXER_PROTOCOL_MINOR_VERSION 0
+
+#define INDEXER_MASTER_HANDSHAKE "VERSION\tindexer-master-worker\t1\t0\n"
+#define INDEXER_WORKER_NAME "indexer-worker-master"
+
+struct worker_connection {
+	char *socket_path;
+	indexer_status_callback_t *callback;
+
+	int fd;
+	struct io *io;
+	struct istream *input;
+	struct ostream *output;
+
+	char *request_username;
+	ARRAY_DEFINE(request_contexts, void *);
+	struct aqueue *request_queue;
+
+	unsigned int process_limit;
+	unsigned int version_received:1;
+};
+
+struct worker_connection *
+worker_connection_create(const char *socket_path,
+			 indexer_status_callback_t *callback)
+{
+	struct worker_connection *conn;
+
+	conn = i_new(struct worker_connection, 1);
+	conn->socket_path = i_strdup(socket_path);
+	conn->callback = callback;
+	conn->fd = -1;
+	i_array_init(&conn->request_contexts, 32);
+	conn->request_queue = aqueue_init(&conn->request_contexts.arr);
+	return conn;
+}
+
+static void worker_connection_disconnect(struct worker_connection *conn)
+{
+	unsigned int i, count = aqueue_count(conn->request_queue);
+
+	if (conn->fd != -1) {
+		io_remove(&conn->io);
+		i_stream_destroy(&conn->input);
+		o_stream_destroy(&conn->output);
+
+		if (close(conn->fd) < 0)
+			i_error("close(%s) failed: %m", conn->socket_path);
+		conn->fd = -1;
+	}
+
+	/* cancel any pending requests */
+	for (i = 0; i < count; i++) {
+		void *const *contextp =
+			array_idx(&conn->request_contexts,
+				  aqueue_idx(conn->request_queue, i));
+		conn->callback(-1, *contextp);
+	}
+	aqueue_clear(conn->request_queue);
+}
+
+void worker_connection_destroy(struct worker_connection **_conn)
+{
+	struct worker_connection *conn = *_conn;
+
+	*_conn = NULL;
+
+	worker_connection_disconnect(conn);
+
+	aqueue_deinit(&conn->request_queue);
+	array_free(&conn->request_contexts);
+	i_free(conn->socket_path);
+	i_free(conn);
+}
+
+static int
+worker_connection_input_line(struct worker_connection *conn, const char *line)
+{
+	void *const *contextp, *context;
+	int percentage;
+
+	if (aqueue_count(conn->request_queue) == 0) {
+		i_error("Input from worker without pending requests: %s", line);
+		return -1;
+	}
+
+	if (str_to_int(line, &percentage) < 0 ||
+	    percentage < -1 || percentage > 100) {
+		i_error("Invalid input from worker: %s", line);
+		return -1;
+	}
+
+	contextp = array_idx(&conn->request_contexts,
+			     aqueue_idx(conn->request_queue, 0));
+	context = *contextp;
+	aqueue_delete_tail(conn->request_queue);
+
+	if (aqueue_count(conn->request_queue) == 0)
+		i_free_and_null(conn->request_username);
+
+	conn->callback(percentage, context);
+	return 0;
+}
+
+static void worker_connection_input(struct worker_connection *conn)
+{
+	const char *line;
+
+	if (i_stream_read(conn->input) < 0) {
+		worker_connection_disconnect(conn);
+		return;
+	}
+
+	if (!conn->version_received) {
+		if ((line = i_stream_next_line(conn->input)) == NULL)
+			return;
+
+		if (!version_string_verify(line, INDEXER_WORKER_NAME,
+				INDEXER_PROTOCOL_MAJOR_VERSION)) {
+			i_error("Indexer worker not compatible with this master "
+				"(mixed old and new binaries?)");
+			worker_connection_disconnect(conn);
+			return;
+		}
+		conn->version_received = TRUE;
+	}
+	if (conn->process_limit == 0) {
+		if ((line = i_stream_next_line(conn->input)) == NULL)
+			return;
+		if (str_to_uint(line, &conn->process_limit) < 0 ||
+		    conn->process_limit == 0) {
+			i_error("Indexer worker sent invalid handshake: %s",
+				line);
+			worker_connection_disconnect(conn);
+			return;
+		}
+	}
+
+	while ((line = i_stream_next_line(conn->input)) != NULL) {
+		if (worker_connection_input_line(conn, line) < 0) {
+			worker_connection_disconnect(conn);
+			break;
+		}
+	}
+}
+
+int worker_connection_connect(struct worker_connection *conn)
+{
+	i_assert(conn->fd == -1);
+
+	conn->fd = net_connect_unix(conn->socket_path);
+	if (conn->fd == -1) {
+		i_error("connect(%s) failed: %m", conn->socket_path);
+		return -1;
+	}
+	conn->io = io_add(conn->fd, IO_READ, worker_connection_input, conn);
+	conn->input = i_stream_create_fd(conn->fd, (size_t)-1, FALSE);
+	conn->output = o_stream_create_fd(conn->fd, (size_t)-1, FALSE);
+	o_stream_send_str(conn->output, INDEXER_MASTER_HANDSHAKE);
+	return 0;
+}
+
+bool worker_connection_is_connected(struct worker_connection *conn)
+{
+	return conn->fd != -1;
+}
+
+bool worker_connection_get_process_limit(struct worker_connection *conn,
+					 unsigned int *limit_r)
+{
+	if (conn->process_limit == 0)
+		return FALSE;
+
+	*limit_r = conn->process_limit;
+	return TRUE;
+}
+
+void worker_connection_request(struct worker_connection *conn,
+			       const char *username, const char *mailbox,
+			       void *context)
+{
+	i_assert(worker_connection_is_connected(conn));
+	i_assert(context != NULL);
+
+	if (conn->request_username == NULL)
+		conn->request_username = i_strdup(username);
+	else
+		i_assert(strcmp(conn->request_username, username) == 0);
+
+	aqueue_append(conn->request_queue, &context);
+
+	T_BEGIN {
+		string_t *str = t_str_new(128);
+
+		str_tabescape_write(str, username);
+		str_append_c(str, '\t');
+		str_tabescape_write(str, mailbox);
+		str_append_c(str, '\n');
+		o_stream_send(conn->output, str_data(str), str_len(str));
+	} T_END;
+}
+
+bool worker_connection_is_busy(struct worker_connection *conn)
+{
+	return aqueue_count(conn->request_queue) > 0;
+}
+
+const char *worker_connection_get_username(struct worker_connection *conn)
+{
+	return conn->request_username;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/worker-connection.h	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,33 @@
+#ifndef WORKER_CONNECTION_H
+#define WORKER_CONNECTION_H
+
+#include "indexer.h"
+
+struct worker_connection *
+worker_connection_create(const char *socket_path,
+			 indexer_status_callback_t *callback);
+void worker_connection_destroy(struct worker_connection **conn);
+
+int worker_connection_connect(struct worker_connection *conn);
+/* Returns TRUE if worker is connected to (not necessarily handshaked yet) */
+bool worker_connection_is_connected(struct worker_connection *conn);
+
+/* After initial handshake the worker process tells how many of its kind
+   can be at maximum. This returns the value, of FALSE if handshake isn't
+   finished yet. */
+bool worker_connection_get_process_limit(struct worker_connection *conn,
+					 unsigned int *limit_r);
+
+/* Send a new indexing request for username+mailbox. The status callback is
+   called as necessary with the given context. Requests can be queued, but
+   only for the same username. */
+void worker_connection_request(struct worker_connection *conn,
+			       const char *username, const char *mailbox,
+			       void *context);
+/* Returns TRUE if a request is being handled. */
+bool worker_connection_is_busy(struct worker_connection *conn);
+/* Returns username of the currently pending requests,
+   or NULL if there are none. */
+const char *worker_connection_get_username(struct worker_connection *conn);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/worker-pool.c	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,196 @@
+/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "llist.h"
+#include "worker-connection.h"
+#include "worker-pool.h"
+
+#define MAX_WORKER_IDLE_SECS (60*5)
+
+struct worker_connection_list {
+	struct worker_connection_list *prev, *next;
+
+	struct worker_connection *conn;
+	time_t last_use;
+};
+
+struct worker_pool {
+	char *socket_path;
+	indexer_status_callback_t *callback;
+
+	unsigned int connection_count;
+	struct worker_connection_list *busy_list, *idle_list;
+};
+
+static void
+worker_connection_idle_list_free(struct worker_pool *pool,
+				 struct worker_connection_list *list);
+
+struct worker_pool *
+worker_pool_init(const char *socket_path, indexer_status_callback_t *callback)
+{
+	struct worker_pool *pool;
+
+	pool = i_new(struct worker_pool, 1);
+	pool->socket_path = i_strdup(socket_path);
+	pool->callback = callback;
+	return pool;
+}
+
+void worker_pool_deinit(struct worker_pool **_pool)
+{
+	struct worker_pool *pool = *_pool;
+
+	*_pool = NULL;
+
+	while (pool->busy_list != NULL) {
+		struct worker_connection_list *list = pool->busy_list;
+
+		DLLIST_REMOVE(&pool->busy_list, list);
+		worker_connection_idle_list_free(pool, list);
+	}
+
+	while (pool->idle_list != NULL) {
+		struct worker_connection_list *list = pool->idle_list;
+
+		worker_connection_idle_list_free(pool, list);
+	}
+
+	i_free(pool->socket_path);
+	i_free(pool);
+}
+
+static int worker_pool_add_connection(struct worker_pool *pool)
+{
+	struct worker_connection *conn;
+	struct worker_connection_list *list;
+
+	conn = worker_connection_create(pool->socket_path, pool->callback);
+	if (worker_connection_connect(conn) < 0) {
+		worker_connection_destroy(&conn);
+		return -1;
+	}
+
+	i_assert(pool->idle_list == NULL);
+
+	list = i_new(struct worker_connection_list, 1);
+	list->conn = conn;
+	list->last_use = ioloop_time;
+	pool->idle_list = list;
+	pool->connection_count++;
+	return 0;
+}
+
+static void
+worker_connection_idle_list_free(struct worker_pool *pool,
+				 struct worker_connection_list *list)
+{
+	DLLIST_REMOVE(&pool->idle_list, list);
+	worker_connection_destroy(&list->conn);
+	i_free(list);
+
+	i_assert(pool->connection_count > 0);
+	pool->connection_count--;
+}
+
+static unsigned int worker_pool_find_max_connections(struct worker_pool *pool)
+{
+	struct worker_connection_list *list;
+	unsigned int limit;
+
+	i_assert(pool->idle_list == NULL);
+
+	if (pool->busy_list == NULL)
+		return 1;
+
+	for (list = pool->busy_list; list != NULL; list = list->next) {
+		if (worker_connection_get_process_limit(list->conn, &limit))
+			return limit;
+	}
+	/* we have at least one connection that has already been created,
+	   but without having handshaked yet. wait until it's finished. */
+	return 0;
+}
+
+bool worker_pool_get_connection(struct worker_pool *pool,
+				struct worker_connection **conn_r)
+{
+	struct worker_connection_list *list;
+	unsigned int max_connections;
+
+	while (pool->idle_list != NULL &&
+	       !worker_connection_is_connected(pool->idle_list->conn))
+		worker_connection_idle_list_free(pool, pool->idle_list);
+
+	if (pool->idle_list == NULL) {
+		max_connections = worker_pool_find_max_connections(pool);
+		if (pool->connection_count >= max_connections)
+			return FALSE;
+		if (worker_pool_add_connection(pool) < 0)
+			return FALSE;
+		i_assert(pool->idle_list != NULL);
+	}
+	list = pool->idle_list;
+	DLLIST_REMOVE(&pool->idle_list, list);
+	DLLIST_PREPEND(&pool->busy_list, list);
+
+	*conn_r = list->conn;
+	return TRUE;
+}
+
+static void worker_pool_kill_idle_connections(struct worker_pool *pool)
+{
+	struct worker_connection_list *list, *next;
+	time_t kill_timestamp;
+
+	kill_timestamp = ioloop_time - MAX_WORKER_IDLE_SECS;
+	for (list = pool->idle_list; list != NULL; list = next) {
+		next = list->next;
+		if (list->last_use < kill_timestamp)
+			worker_connection_idle_list_free(pool, list);
+	}
+}
+
+void worker_pool_release_connection(struct worker_pool *pool,
+				    struct worker_connection *conn)
+{
+	struct worker_connection_list *list;
+
+	if (worker_connection_is_busy(conn)) {
+		/* not finished with all queued requests yet */
+		return;
+	}
+
+	for (list = pool->busy_list; list != NULL; list = list->next) {
+		if (list->conn == conn)
+			break;
+	}
+	i_assert(list != NULL);
+
+	DLLIST_REMOVE(&pool->busy_list, list);
+
+	if (!worker_connection_is_connected(conn))
+		worker_connection_idle_list_free(pool, list);
+	else {
+		DLLIST_PREPEND(&pool->idle_list, list);
+		list->last_use = ioloop_time;
+
+		worker_pool_kill_idle_connections(pool);
+	}
+}
+
+struct worker_connection *
+worker_pool_find_username_connection(struct worker_pool *pool,
+				     const char *username)
+{
+	struct worker_connection_list *list;
+	const char *worker_user;
+
+	for (list = pool->busy_list; list != NULL; list = list->next) {
+		worker_user = worker_connection_get_username(list->conn);
+		if (worker_user != NULL && strcmp(worker_user, username) == 0)
+			return list->conn;
+	}
+	return NULL;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/indexer/worker-pool.h	Mon Jun 27 23:02:40 2011 +0300
@@ -0,0 +1,21 @@
+#ifndef WORKER_POOL_H
+#define WORKER_POOL_H
+
+#include "indexer.h"
+
+struct worker_connection;
+
+struct worker_pool *
+worker_pool_init(const char *socket_path, indexer_status_callback_t *callback);
+void worker_pool_deinit(struct worker_pool **pool);
+
+bool worker_pool_get_connection(struct worker_pool *pool,
+				struct worker_connection **conn_r);
+void worker_pool_release_connection(struct worker_pool *pool,
+				    struct worker_connection *conn);
+
+struct worker_connection *
+worker_pool_find_username_connection(struct worker_pool *pool,
+				     const char *username);
+
+#endif