diff src/lib-dict/dict-client.c @ 9361:a1b92a251bb9 HEAD

dict: Added support for async commits. Changed dict_atomic_inc() behavior.
author Timo Sirainen <tss@iki.fi>
date Sun, 06 Sep 2009 20:44:00 -0400
parents eed86bcc33aa
children 00cd9aacd03c
line wrap: on
line diff
--- a/src/lib-dict/dict-client.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-client.c	Sun Sep 06 20:44:00 2009 -0400
@@ -1,6 +1,7 @@
 /* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */
 
 #include "lib.h"
+#include "llist.h"
 #include "str.h"
 #include "network.h"
 #include "istream.h"
@@ -8,6 +9,7 @@
 #include "dict-private.h"
 #include "dict-client.h"
 
+#include <stdlib.h>
 #include <unistd.h>
 #include <fcntl.h>
 
@@ -24,9 +26,13 @@
 	time_t last_connect_try;
 	struct istream *input;
 	struct ostream *output;
+	struct io *io;
+
+	struct client_dict_transaction_context *transactions;
 
 	unsigned int connect_counter;
 	unsigned int transaction_id_counter;
+	unsigned int async_commits;
 
 	unsigned int in_iteration:1;
 	unsigned int handshaked:1;
@@ -41,6 +47,11 @@
 
 struct client_dict_transaction_context {
 	struct dict_transaction_context ctx;
+	struct client_dict_transaction_context *prev, *next;
+
+	/* for async commits */
+	dict_transaction_commit_callback_t *callback;
+	void *context;
 
 	unsigned int id;
 	unsigned int connect_counter;
@@ -213,29 +224,97 @@
 	return 0;
 }
 
-static char *client_dict_read_line(struct client_dict *dict)
+static struct client_dict_transaction_context *
+client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+{
+	struct client_dict_transaction_context *ctx;
+
+	for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
+		if (ctx->id == id)
+			return ctx;
+	}
+	return NULL;
+}
+
+static void
+client_dict_finish_transaction(struct client_dict *dict,
+			       unsigned int id, int ret)
 {
+	struct client_dict_transaction_context *ctx;
+
+	ctx = client_dict_transaction_find(dict, id);
+	if (ctx == NULL) {
+		i_error("dict-client: Unknown transaction id %u", id);
+		return;
+	}
+	if (ctx->callback != NULL)
+		ctx->callback(ret, ctx->context);
+
+	DLLIST_REMOVE(&dict->transactions, ctx);
+	i_free(ctx);
+
+	i_assert(dict->async_commits > 0);
+	if (--dict->async_commits == 0)
+		io_remove(&dict->io);
+}
+
+static int client_dict_read_one_line(struct client_dict *dict, char **line_r)
+{
+	unsigned int id;
 	char *line;
 	int ret;
 
-	line = i_stream_next_line(dict->input);
-	if (line != NULL)
-		return line;
-
-	while ((ret = i_stream_read(dict->input)) > 0) {
-		line = i_stream_next_line(dict->input);
-		if (line != NULL)
-			return line;
+	*line_r = NULL;
+	while ((line = i_stream_next_line(dict->input)) == NULL) {
+		ret = i_stream_read(dict->input);
+		switch (ret) {
+		case -1:
+			if (dict->input->stream_errno != 0)
+				i_error("read(%s) failed: %m", dict->path);
+			else {
+				i_error("read(%s) failed: Remote disconnected",
+					dict->path);
+			}
+			return -1;
+		case -2:
+			i_error("read(%s) returned too much data", dict->path);
+			return -1;
+		default:
+			i_assert(ret > 0);
+			break;
+		}
 	}
-	i_assert(ret < 0);
+	if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
+		switch (line[1]) {
+		case DICT_PROTOCOL_REPLY_OK:
+			ret = 1;
+			break;
+		case DICT_PROTOCOL_REPLY_NOTFOUND:
+			ret = 0;
+			break;
+		case DICT_PROTOCOL_REPLY_FAIL:
+			ret = -1;
+			break;
+		default:
+			i_error("dict-client: Invalid async commit line: %s",
+				line);
+			return 0;
+		}
+		id = strtoul(line+2, NULL, 10);
+		client_dict_finish_transaction(dict, id, ret);
+		return 0;
+	}
+	*line_r = line;
+	return 1;
+}
 
-	if (ret == -2)
-		i_error("read(%s) returned too much data", dict->path);
-	else if (dict->input->stream_errno == 0)
-		i_error("read(%s) failed: Remote disconnected", dict->path);
-	else
-		i_error("read(%s) failed: %m", dict->path);
-	return NULL;
+static char *client_dict_read_line(struct client_dict *dict)
+{
+	char *line;
+
+	while (client_dict_read_one_line(dict, &line) == 0)
+		;
+	return line;
 }
 
 static int client_dict_connect(struct client_dict *dict)
@@ -263,6 +342,7 @@
 	dict->input->blocking = TRUE;
 	dict->output = o_stream_create_fd(dict->fd, 4096, FALSE);
 	dict->transaction_id_counter = 0;
+	dict->async_commits = 0;
 
 	query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
 				DICT_PROTOCOL_CMD_HELLO,
@@ -283,6 +363,8 @@
 	dict->connect_counter++;
 	dict->handshaked = FALSE;
 
+	if (dict->io != NULL)
+		io_remove(&dict->io);
 	if (dict->input != NULL)
 		i_stream_destroy(&dict->input);
 	if (dict->output != NULL)
@@ -339,6 +421,21 @@
 	pool_unref(&dict->pool);
 }
 
+static int client_dict_wait(struct dict *_dict)
+{
+	struct client_dict *dict = (struct client_dict *)_dict;
+	char *line;
+	int ret = 0;
+
+	while (dict->async_commits > 0) {
+		if (client_dict_read_one_line(dict, &line) < 0) {
+			ret = -1;
+			break;
+		}
+	}
+	return ret;
+}
+
 static int client_dict_lookup(struct dict *_dict, pool_t pool,
 			      const char *key, const char **value_r)
 {
@@ -420,7 +517,10 @@
 	/* line contains key \t value */
 	p_clear(ctx->pool);
 
-	value = strchr(line, '\t');
+	if (*line != DICT_PROTOCOL_REPLY_OK)
+		value = NULL;
+	else
+		value = strchr(++line, '\t');
 	if (value == NULL) {
 		/* broken protocol */
 		i_error("dict client (%s) sent broken reply", dict->path);
@@ -454,38 +554,72 @@
 	ctx->ctx.dict = _dict;
 	ctx->id = ++dict->transaction_id_counter;
 
+	DLLIST_PREPEND(&dict->transactions, ctx);
 	return &ctx->ctx;
 }
 
-static int client_dict_transaction_commit(struct dict_transaction_context *_ctx,
-					  bool async)
+static void dict_async_input(struct client_dict *dict)
+{
+	char *line;
+	size_t size;
+	int ret;
+
+	i_assert(!dict->in_iteration);
+
+	do {
+		ret = client_dict_read_one_line(dict, &line);
+		(void)i_stream_get_data(dict->input, &size);
+	} while (ret == 0 && size > 0);
+
+	if (ret < 0)
+		io_remove(&dict->io);
+}
+
+static int
+client_dict_transaction_commit(struct dict_transaction_context *_ctx,
+			       bool async,
+			       dict_transaction_commit_callback_t *callback,
+			       void *context)
 {
 	struct client_dict_transaction_context *ctx =
 		(struct client_dict_transaction_context *)_ctx;
 	struct client_dict *dict = (struct client_dict *)_ctx->dict;
-	int ret = ctx->failed ? -1 : 0;
+	int ret = ctx->failed ? -1 : 1;
 
-	if (ctx->sent_begin) T_BEGIN {
+	if (ctx->sent_begin && !ctx->failed) T_BEGIN {
 		const char *query, *line;
 
-		query = t_strdup_printf("%c%u\n", ctx->failed ?
-					DICT_PROTOCOL_CMD_ROLLBACK :
-					(!async ? DICT_PROTOCOL_CMD_COMMIT :
-					 DICT_PROTOCOL_CMD_COMMIT_ASYNC),
+		query = t_strdup_printf("%c%u\n", !async ?
+					DICT_PROTOCOL_CMD_COMMIT :
+					DICT_PROTOCOL_CMD_COMMIT_ASYNC,
 					ctx->id);
 		if (client_dict_send_transaction_query(ctx, query) < 0)
 			ret = -1;
-		else if (ret < 0 || async) {
-			/* no reply */
+		else if (async) {
+			ctx->callback = callback;
+			ctx->context = context;
+			if (dict->async_commits++ == 0) {
+				dict->io = io_add(dict->fd, IO_READ,
+						  dict_async_input, dict);
+			}
 		} else {
 			/* sync commit, read reply */
 			line = client_dict_read_line(dict);
-			if (line == NULL || *line != DICT_PROTOCOL_REPLY_OK)
+			if (line == NULL)
+				ret = -1;
+			else if (*line == DICT_PROTOCOL_REPLY_OK)
+				ret = 1;
+			else if (*line == DICT_PROTOCOL_REPLY_NOTFOUND)
+				ret = 0;
+			else
 				ret = -1;
 		}
 	} T_END;
 
-	i_free(ctx);
+	if (ret < 0 || !async) {
+		DLLIST_REMOVE(&dict->transactions, ctx);
+		i_free(ctx);
+	}
 	return ret;
 }
 
@@ -494,6 +628,7 @@
 {
 	struct client_dict_transaction_context *ctx =
 		(struct client_dict_transaction_context *)_ctx;
+	struct client_dict *dict = (struct client_dict *)_ctx->dict;
 
 	if (ctx->sent_begin) T_BEGIN {
 		const char *query;
@@ -503,6 +638,7 @@
 		(void)client_dict_send_transaction_query(ctx, query);
 	} T_END;
 
+	DLLIST_REMOVE(&dict->transactions, ctx);
 	i_free(ctx);
 }
 
@@ -560,6 +696,7 @@
 	{
 		client_dict_init,
 		client_dict_deinit,
+		client_dict_wait,
 		client_dict_lookup,
 		client_dict_iterate_init,
 		client_dict_iterate,