Mercurial > dovecot > original-hg > dovecot-1.2
diff src/lib-dict/dict-client.c @ 9361:a1b92a251bb9 HEAD
dict: Added support for async commits. Changed dict_atomic_inc() behavior.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Sun, 06 Sep 2009 20:44:00 -0400 |
parents | eed86bcc33aa |
children | 00cd9aacd03c |
line wrap: on
line diff
--- a/src/lib-dict/dict-client.c Sun Sep 06 20:42:42 2009 -0400 +++ b/src/lib-dict/dict-client.c Sun Sep 06 20:44:00 2009 -0400 @@ -1,6 +1,7 @@ /* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "llist.h" #include "str.h" #include "network.h" #include "istream.h" @@ -8,6 +9,7 @@ #include "dict-private.h" #include "dict-client.h" +#include <stdlib.h> #include <unistd.h> #include <fcntl.h> @@ -24,9 +26,13 @@ time_t last_connect_try; struct istream *input; struct ostream *output; + struct io *io; + + struct client_dict_transaction_context *transactions; unsigned int connect_counter; unsigned int transaction_id_counter; + unsigned int async_commits; unsigned int in_iteration:1; unsigned int handshaked:1; @@ -41,6 +47,11 @@ struct client_dict_transaction_context { struct dict_transaction_context ctx; + struct client_dict_transaction_context *prev, *next; + + /* for async commits */ + dict_transaction_commit_callback_t *callback; + void *context; unsigned int id; unsigned int connect_counter; @@ -213,29 +224,97 @@ return 0; } -static char *client_dict_read_line(struct client_dict *dict) +static struct client_dict_transaction_context * +client_dict_transaction_find(struct client_dict *dict, unsigned int id) +{ + struct client_dict_transaction_context *ctx; + + for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) { + if (ctx->id == id) + return ctx; + } + return NULL; +} + +static void +client_dict_finish_transaction(struct client_dict *dict, + unsigned int id, int ret) { + struct client_dict_transaction_context *ctx; + + ctx = client_dict_transaction_find(dict, id); + if (ctx == NULL) { + i_error("dict-client: Unknown transaction id %u", id); + return; + } + if (ctx->callback != NULL) + ctx->callback(ret, ctx->context); + + DLLIST_REMOVE(&dict->transactions, ctx); + i_free(ctx); + + i_assert(dict->async_commits > 0); + if (--dict->async_commits == 0) + io_remove(&dict->io); +} + +static int client_dict_read_one_line(struct client_dict *dict, char **line_r) +{ + unsigned int id; char *line; int ret; - line = i_stream_next_line(dict->input); - if (line != NULL) - return line; - - while ((ret = i_stream_read(dict->input)) > 0) { - line = i_stream_next_line(dict->input); - if (line != NULL) - return line; + *line_r = NULL; + while ((line = i_stream_next_line(dict->input)) == NULL) { + ret = i_stream_read(dict->input); + switch (ret) { + case -1: + if (dict->input->stream_errno != 0) + i_error("read(%s) failed: %m", dict->path); + else { + i_error("read(%s) failed: Remote disconnected", + dict->path); + } + return -1; + case -2: + i_error("read(%s) returned too much data", dict->path); + return -1; + default: + i_assert(ret > 0); + break; + } } - i_assert(ret < 0); + if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) { + switch (line[1]) { + case DICT_PROTOCOL_REPLY_OK: + ret = 1; + break; + case DICT_PROTOCOL_REPLY_NOTFOUND: + ret = 0; + break; + case DICT_PROTOCOL_REPLY_FAIL: + ret = -1; + break; + default: + i_error("dict-client: Invalid async commit line: %s", + line); + return 0; + } + id = strtoul(line+2, NULL, 10); + client_dict_finish_transaction(dict, id, ret); + return 0; + } + *line_r = line; + return 1; +} - if (ret == -2) - i_error("read(%s) returned too much data", dict->path); - else if (dict->input->stream_errno == 0) - i_error("read(%s) failed: Remote disconnected", dict->path); - else - i_error("read(%s) failed: %m", dict->path); - return NULL; +static char *client_dict_read_line(struct client_dict *dict) +{ + char *line; + + while (client_dict_read_one_line(dict, &line) == 0) + ; + return line; } static int client_dict_connect(struct client_dict *dict) @@ -263,6 +342,7 @@ dict->input->blocking = TRUE; dict->output = o_stream_create_fd(dict->fd, 4096, FALSE); dict->transaction_id_counter = 0; + dict->async_commits = 0; query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n", DICT_PROTOCOL_CMD_HELLO, @@ -283,6 +363,8 @@ dict->connect_counter++; dict->handshaked = FALSE; + if (dict->io != NULL) + io_remove(&dict->io); if (dict->input != NULL) i_stream_destroy(&dict->input); if (dict->output != NULL) @@ -339,6 +421,21 @@ pool_unref(&dict->pool); } +static int client_dict_wait(struct dict *_dict) +{ + struct client_dict *dict = (struct client_dict *)_dict; + char *line; + int ret = 0; + + while (dict->async_commits > 0) { + if (client_dict_read_one_line(dict, &line) < 0) { + ret = -1; + break; + } + } + return ret; +} + static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key, const char **value_r) { @@ -420,7 +517,10 @@ /* line contains key \t value */ p_clear(ctx->pool); - value = strchr(line, '\t'); + if (*line != DICT_PROTOCOL_REPLY_OK) + value = NULL; + else + value = strchr(++line, '\t'); if (value == NULL) { /* broken protocol */ i_error("dict client (%s) sent broken reply", dict->path); @@ -454,38 +554,72 @@ ctx->ctx.dict = _dict; ctx->id = ++dict->transaction_id_counter; + DLLIST_PREPEND(&dict->transactions, ctx); return &ctx->ctx; } -static int client_dict_transaction_commit(struct dict_transaction_context *_ctx, - bool async) +static void dict_async_input(struct client_dict *dict) +{ + char *line; + size_t size; + int ret; + + i_assert(!dict->in_iteration); + + do { + ret = client_dict_read_one_line(dict, &line); + (void)i_stream_get_data(dict->input, &size); + } while (ret == 0 && size > 0); + + if (ret < 0) + io_remove(&dict->io); +} + +static int +client_dict_transaction_commit(struct dict_transaction_context *_ctx, + bool async, + dict_transaction_commit_callback_t *callback, + void *context) { struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; struct client_dict *dict = (struct client_dict *)_ctx->dict; - int ret = ctx->failed ? -1 : 0; + int ret = ctx->failed ? -1 : 1; - if (ctx->sent_begin) T_BEGIN { + if (ctx->sent_begin && !ctx->failed) T_BEGIN { const char *query, *line; - query = t_strdup_printf("%c%u\n", ctx->failed ? - DICT_PROTOCOL_CMD_ROLLBACK : - (!async ? DICT_PROTOCOL_CMD_COMMIT : - DICT_PROTOCOL_CMD_COMMIT_ASYNC), + query = t_strdup_printf("%c%u\n", !async ? + DICT_PROTOCOL_CMD_COMMIT : + DICT_PROTOCOL_CMD_COMMIT_ASYNC, ctx->id); if (client_dict_send_transaction_query(ctx, query) < 0) ret = -1; - else if (ret < 0 || async) { - /* no reply */ + else if (async) { + ctx->callback = callback; + ctx->context = context; + if (dict->async_commits++ == 0) { + dict->io = io_add(dict->fd, IO_READ, + dict_async_input, dict); + } } else { /* sync commit, read reply */ line = client_dict_read_line(dict); - if (line == NULL || *line != DICT_PROTOCOL_REPLY_OK) + if (line == NULL) + ret = -1; + else if (*line == DICT_PROTOCOL_REPLY_OK) + ret = 1; + else if (*line == DICT_PROTOCOL_REPLY_NOTFOUND) + ret = 0; + else ret = -1; } } T_END; - i_free(ctx); + if (ret < 0 || !async) { + DLLIST_REMOVE(&dict->transactions, ctx); + i_free(ctx); + } return ret; } @@ -494,6 +628,7 @@ { struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; + struct client_dict *dict = (struct client_dict *)_ctx->dict; if (ctx->sent_begin) T_BEGIN { const char *query; @@ -503,6 +638,7 @@ (void)client_dict_send_transaction_query(ctx, query); } T_END; + DLLIST_REMOVE(&dict->transactions, ctx); i_free(ctx); } @@ -560,6 +696,7 @@ { client_dict_init, client_dict_deinit, + client_dict_wait, client_dict_lookup, client_dict_iterate_init, client_dict_iterate,