Mercurial > dovecot > core-2.2
view src/indexer/indexer-client.c @ 22656:1789bf2a1e01
director: Make sure HOST-RESET-USERS isn't used with max_moving_users=0
The reset command would just hang in that case. doveadm would never have
sent this, so this is just an extra sanity check.
author | Timo Sirainen <timo.sirainen@dovecot.fi> |
---|---|
date | Sun, 05 Nov 2017 23:51:56 +0200 |
parents | 2e2563132d5f |
children | cb108f786fb4 |
line wrap: on
line source
/* Copyright (c) 2011-2017 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "llist.h" #include "istream.h" #include "ostream.h" #include "strescape.h" #include "master-service.h" #include "indexer-queue.h" #include "indexer-client.h" #include <unistd.h> #define MAX_INBUF_SIZE (1024*64) #define INDEXER_CLIENT_PROTOCOL_MAJOR_VERSION 1 #define INDEXER_CLIENT_PROTOCOL_MINOR_VERSION 0 struct indexer_client { struct indexer_client *prev, *next; int refcount; struct indexer_queue *queue; int fd; struct istream *input; struct ostream *output; struct io *io; unsigned int version_received:1; unsigned int handshaked:1; unsigned int destroyed:1; }; struct indexer_client_request { struct indexer_client *client; unsigned int tag; }; static struct indexer_client *clients = NULL; static unsigned int clients_count = 0; static void indexer_client_destroy(struct indexer_client *client); static void indexer_client_ref(struct indexer_client *client); static void indexer_client_unref(struct indexer_client *client); static const char *const* indexer_client_next_line(struct indexer_client *client) { const char *line; char **args; unsigned int i; line = i_stream_next_line(client->input); if (line == NULL) return NULL; args = p_strsplit(pool_datastack_create(), line, "\t"); for (i = 0; args[i] != NULL; i++) args[i] = str_tabunescape(args[i]); return (void *)args; } static int indexer_client_request_queue(struct indexer_client *client, bool append, const char *const *args, const char **error_r) { struct indexer_client_request *ctx = NULL; const char *session_id = NULL; unsigned int tag, max_recent_msgs; /* <tag> <user> <mailbox> [<max_recent_msgs> [<session ID>]] */ if (str_array_length(args) < 3) { *error_r = "Wrong parameter count"; return -1; } if (str_to_uint(args[0], &tag) < 0) { *error_r = "Invalid tag"; return -1; } if (args[3] == NULL) max_recent_msgs = 0; else if (str_to_uint(args[3], &max_recent_msgs) < 0) { *error_r = "Invalid max_recent_msgs"; return -1; } else { session_id = args[4]; } if (tag != 0) { ctx = i_new(struct indexer_client_request, 1); ctx->client = client; ctx->tag = tag; indexer_client_ref(client); } indexer_queue_append(client->queue, append, args[1], args[2], session_id, max_recent_msgs, ctx); o_stream_nsend_str(client->output, t_strdup_printf("%u\tOK\n", tag)); return 0; } static int indexer_client_request_optimize(struct indexer_client *client, const char *const *args, const char **error_r) { struct indexer_client_request *ctx = NULL; unsigned int tag; /* <tag> <user> <mailbox> */ if (str_array_length(args) != 3) { *error_r = "Wrong parameter count"; return -1; } if (str_to_uint(args[0], &tag) < 0) { *error_r = "Invalid tag"; return -1; } if (tag != 0) { ctx = i_new(struct indexer_client_request, 1); ctx->client = client; ctx->tag = tag; indexer_client_ref(client); } indexer_queue_append_optimize(client->queue, args[1], args[2], ctx); o_stream_nsend_str(client->output, t_strdup_printf("%u\tOK\n", tag)); return 0; } static int indexer_client_request(struct indexer_client *client, const char *const *args, const char **error_r) { const char *cmd = args[0]; args++; if (strcmp(cmd, "APPEND") == 0) return indexer_client_request_queue(client, TRUE, args, error_r); else if (strcmp(cmd, "PREPEND") == 0) return indexer_client_request_queue(client, FALSE, args, error_r); else if (strcmp(cmd, "OPTIMIZE") == 0) return indexer_client_request_optimize(client, args, error_r); else { *error_r = t_strconcat("Unknown command: ", cmd, NULL); return -1; } } static void indexer_client_input(struct indexer_client *client) { const char *line, *const *args, *error; switch (i_stream_read(client->input)) { case -2: i_error("BUG: Client connection sent too much data"); indexer_client_destroy(client); return; case -1: indexer_client_destroy(client); return; } if (!client->version_received) { if ((line = i_stream_next_line(client->input)) == NULL) return; if (!version_string_verify(line, "indexer", INDEXER_CLIENT_PROTOCOL_MAJOR_VERSION)) { i_error("Client not compatible with this server " "(mixed old and new binaries?)"); indexer_client_destroy(client); return; } client->version_received = TRUE; } while ((args = indexer_client_next_line(client)) != NULL) { if (args[0] != NULL) { if (indexer_client_request(client, args, &error) < 0) { i_error("Client input error: %s", error); indexer_client_destroy(client); break; } } } } void indexer_client_status_callback(int percentage, void *context) { struct indexer_client_request *ctx = context; T_BEGIN { o_stream_nsend_str(ctx->client->output, t_strdup_printf("%u\t%d\n", ctx->tag, percentage)); } T_END; if (percentage < 0 || percentage == 100) { indexer_client_unref(ctx->client); i_free(ctx); } } struct indexer_client * indexer_client_create(int fd, struct indexer_queue *queue) { struct indexer_client *client; client = i_new(struct indexer_client, 1); client->refcount = 1; client->queue = queue; client->fd = fd; client->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE); client->output = o_stream_create_fd(fd, (size_t)-1, FALSE); o_stream_set_no_error_handling(client->output, TRUE); client->io = io_add(fd, IO_READ, indexer_client_input, client); DLLIST_PREPEND(&clients, client); clients_count++; indexer_refresh_proctitle(); return client; } static void indexer_client_destroy(struct indexer_client *client) { if (client->destroyed) return; client->destroyed = TRUE; DLLIST_REMOVE(&clients, client); io_remove(&client->io); i_stream_close(client->input); o_stream_close(client->output); if (close(client->fd) < 0) i_error("close(client) failed: %m"); client->fd = -1; indexer_client_unref(client); clients_count--; master_service_client_connection_destroyed(master_service); indexer_refresh_proctitle(); } static void indexer_client_ref(struct indexer_client *client) { i_assert(client->refcount > 0); client->refcount++; } static void indexer_client_unref(struct indexer_client *client) { i_assert(client->refcount > 0); if (--client->refcount > 0) return; i_stream_destroy(&client->input); o_stream_destroy(&client->output); i_free(client); } unsigned int indexer_clients_get_count(void) { return clients_count; } void indexer_clients_destroy_all(void) { while (clients != NULL) indexer_client_destroy(clients); }