Mercurial > dovecot > original-hg > dovecot-1.2
changeset 8110:1cf9844b7a20 HEAD
dict: Support large iterations by sending data in output stream flush callback.
Fixes also a hang if a lot of changes were done during the iteration.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Wed, 27 Aug 2008 08:27:35 +0300 |
parents | e7929190cd32 |
children | d49bdda63506 |
files | src/dict/dict-server.c |
diffstat | 1 files changed, 57 insertions(+), 18 deletions(-) [+] |
line wrap: on
line diff
--- a/src/dict/dict-server.c Wed Aug 27 08:21:39 2008 +0300 +++ b/src/dict/dict-server.c Wed Aug 27 08:27:35 2008 +0300 @@ -6,6 +6,7 @@ #include "network.h" #include "istream.h" #include "ostream.h" +#include "str.h" #include "dict.h" #include "dict-client.h" #include "dict-server.h" @@ -13,6 +14,8 @@ #include <stdlib.h> #include <unistd.h> +#define DICT_OUTPUT_OPTIMAL_SIZE 1024 + struct dict_server_transaction { unsigned int id; struct dict_transaction_context *ctx; @@ -32,6 +35,8 @@ struct istream *input; struct ostream *output; + struct dict_iterate_context *iter_ctx; + /* There are only a few transactions per client, so keeping them in array is fast enough */ ARRAY_DEFINE(transactions, struct dict_server_transaction); @@ -58,6 +63,11 @@ const char *value; int ret; + if (conn->iter_ctx != NULL) { + i_error("dict client: LOOKUP: Can't lookup while iterating"); + return -1; + } + /* <key> */ ret = dict_lookup(conn->dict, pool_datastack_create(), line, &value); if (ret > 0) { @@ -73,12 +83,45 @@ return 0; } +static int cmd_iterate_flush(struct dict_client_connection *conn) +{ + string_t *str; + const char *key, *value; + int ret; + + str = t_str_new(256); + o_stream_cork(conn->output); + while ((ret = dict_iterate(conn->iter_ctx, &key, &value)) > 0) { + str_truncate(str, 0); + str_printfa(str, "%s\t%s\n", key, value); + o_stream_send(conn->output, str_data(str), str_len(str)); + + if (o_stream_get_buffer_used_size(conn->output) > + DICT_OUTPUT_OPTIMAL_SIZE) { + if (o_stream_flush(conn->output) <= 0) + break; + /* flushed everything, continue */ + } + } + + if (ret <= 0) { + /* finished iterating */ + o_stream_unset_flush_callback(conn->output); + dict_iterate_deinit(&conn->iter_ctx); + o_stream_send(conn->output, "\n", 1); + } + o_stream_uncork(conn->output); + return ret <= 0 ? 1 : 0; +} + static int cmd_iterate(struct dict_client_connection *conn, const char *line) { - struct dict_iterate_context *ctx; const char *const *args; - const char *key, *value; - int ret; + + if (conn->iter_ctx != NULL) { + i_error("dict client: ITERATE: Already iterating"); + return -1; + } args = t_strsplit(line, "\t"); if (str_array_length(args) != 2) { @@ -87,22 +130,10 @@ } /* <flags> <path> */ - o_stream_cork(conn->output); - ctx = dict_iterate_init(conn->dict, args[1], atoi(args[0])); - while ((ret = dict_iterate(ctx, &key, &value)) > 0) { - /* FIXME: we don't want to keep blocking here. set a flush - function and send the replies there when buffer gets full */ - T_BEGIN { - const char *reply; + conn->iter_ctx = dict_iterate_init(conn->dict, args[1], atoi(args[0])); - reply = t_strdup_printf("%s\t%s\n", key, value); - o_stream_send_str(conn->output, reply); - } T_END; - } - dict_iterate_deinit(&ctx); - - o_stream_send_str(conn->output, "\n"); - o_stream_uncork(conn->output); + o_stream_set_flush_callback(conn->output, cmd_iterate_flush, conn); + cmd_iterate_flush(conn); return 0; } @@ -193,6 +224,11 @@ const char *reply; int ret; + if (conn->iter_ctx != NULL) { + i_error("dict client: COMMIT: Can't commit while iterating"); + return -1; + } + if (dict_server_transaction_lookup_parse(conn, line, &trans) < 0) return -1; @@ -431,6 +467,9 @@ array_free(&conn->transactions); } + if (conn->iter_ctx != NULL) + dict_iterate_deinit(&conn->iter_ctx); + io_remove(&conn->io); i_stream_destroy(&conn->input); o_stream_destroy(&conn->output);