Mercurial > dovecot > core-2.2
diff src/indexer/indexer-client.c @ 13084:0faaceb2f83c
Added "indexer" service, which uses worker processes to perform queued mailbox indexing.
Only a single worker process will index the same user at the same time. This
avoids lock waits, especially when doing full text search indexing with
backends that require locking.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Mon, 27 Jun 2011 23:02:40 +0300 |
parents | |
children | 268f76a75e51 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/indexer/indexer-client.c Mon Jun 27 23:02:40 2011 +0300 @@ -0,0 +1,221 @@ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "llist.h" +#include "istream.h" +#include "ostream.h" +#include "strescape.h" +#include "master-service.h" +#include "indexer-queue.h" +#include "indexer-client.h" + +#include <stdlib.h> +#include <unistd.h> + +#define MAX_INBUF_SIZE (1024*64) + +#define INDEXER_CLIENT_PROTOCOL_MAJOR_VERSION 1 +#define INDEXER_CLIENT_PROTOCOL_MINOR_VERSION 0 + +struct indexer_client { + struct indexer_client *prev, *next; + + int refcount; + struct indexer_queue *queue; + + int fd; + struct istream *input; + struct ostream *output; + struct io *io; + + unsigned int version_received:1; + unsigned int handshaked:1; + unsigned int destroyed:1; +}; + +struct indexer_client_request { + struct indexer_client *client; + unsigned int tag; +}; + +struct indexer_client *clients = NULL; + +static void indexer_client_destroy(struct indexer_client *client); +static void indexer_client_ref(struct indexer_client *client); +static void indexer_client_unref(struct indexer_client *client); + +static const char *const* +indexer_client_next_line(struct indexer_client *client) +{ + const char *line; + char **args; + unsigned int i; + + line = i_stream_next_line(client->input); + if (line == NULL) + return NULL; + + args = p_strsplit(pool_datastack_create(), line, "\t"); + for (i = 0; args[i] != NULL; i++) + args[i] = str_tabunescape(args[i]); + return (void *)args; +} + +static int +indexer_client_request_queue(struct indexer_client *client, bool append, + const char *const *args, const char **error_r) +{ + struct indexer_client_request *ctx = NULL; + unsigned int tag; + + if (str_array_length(args) != 3) { + *error_r = "Wrong parameter count"; + return -1; + } + if (str_to_uint(args[0], &tag) < 0) { + *error_r = "Invalid tag"; + return -1; + } + + if (tag != 0) { + ctx = i_new(struct indexer_client_request, 1); + ctx->client = client; + ctx->tag = tag; + indexer_client_ref(client); + } + + indexer_queue_append(client->queue, append, args[1], args[2], ctx); + o_stream_send_str(client->output, "OK\n"); + return 0; +} + +static int +indexer_client_request(struct indexer_client *client, + const char *const *args, const char **error_r) +{ + const char *cmd = args[0]; + + args++; + + if (strcmp(cmd, "APPEND") == 0) + return indexer_client_request_queue(client, TRUE, args, error_r); + else if (strcmp(cmd, "PREPEND") == 0) + return indexer_client_request_queue(client, FALSE, args, error_r); + else { + *error_r = t_strconcat("Unknown command: ", cmd, NULL); + return -1; + } +} + +static void indexer_client_input(void *context) +{ + struct indexer_client *client = context; + const char *line, *const *args, *error; + + switch (i_stream_read(client->input)) { + case -2: + i_error("BUG: Client connection sent too much data"); + indexer_client_destroy(client); + return; + case -1: + indexer_client_destroy(client); + return; + } + + if (!client->version_received) { + if ((line = i_stream_next_line(client->input)) == NULL) + return; + + if (!version_string_verify(line, "indexer", + INDEXER_CLIENT_PROTOCOL_MAJOR_VERSION)) { + i_error("Client not compatible with this server " + "(mixed old and new binaries?)"); + indexer_client_destroy(client); + return; + } + client->version_received = TRUE; + } + + while ((args = indexer_client_next_line(client)) != NULL) { + if (args[0] != NULL) { + if (indexer_client_request(client, args, &error) < 0) { + i_error("Client input error: %s", error); + indexer_client_destroy(client); + break; + } + } + } +} + +void indexer_client_status_callback(int percentage, void *context) +{ + struct indexer_client_request *ctx = context; + + T_BEGIN { + o_stream_send_str(ctx->client->output, + t_strdup_printf("%u\t%d\n", ctx->tag, percentage)); + } T_END; + if (percentage < 0 || percentage == 100) { + indexer_client_unref(ctx->client); + i_free(ctx); + } +} + +struct indexer_client * +indexer_client_create(int fd, struct indexer_queue *queue) +{ + struct indexer_client *client; + + client = i_new(struct indexer_client, 1); + client->refcount = 1; + client->queue = queue; + client->fd = fd; + client->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE); + client->output = o_stream_create_fd(fd, (size_t)-1, FALSE); + client->io = io_add(fd, IO_READ, indexer_client_input, client); + DLLIST_PREPEND(&clients, client); + return client; +} + +static void indexer_client_destroy(struct indexer_client *client) +{ + if (client->destroyed) + return; + client->destroyed = TRUE; + + DLLIST_REMOVE(&clients, client); + + io_remove(&client->io); + i_stream_close(client->input); + o_stream_close(client->output); + if (close(client->fd) < 0) + i_error("close(client) failed: %m"); + client->fd = -1; + indexer_client_unref(client); + + master_service_client_connection_destroyed(master_service); +} + +static void indexer_client_ref(struct indexer_client *client) +{ + i_assert(client->refcount > 0); + + client->refcount++; +} + +static void indexer_client_unref(struct indexer_client *client) +{ + i_assert(client->refcount > 0); + + if (--client->refcount > 0) + return; + i_stream_destroy(&client->input); + o_stream_destroy(&client->output); + i_free(client); +} + +void indexer_clients_destroy_all(void) +{ + while (clients != NULL) + indexer_client_destroy(clients); +}