changeset 9361:a1b92a251bb9 HEAD

dict: Added support for async commits. Changed dict_atomic_inc() behavior.
author Timo Sirainen <tss@iki.fi>
date Sun, 06 Sep 2009 20:44:00 -0400
parents 4530228c8993
children ea522175c549
files src/dict/dict-server.c src/lib-dict/dict-client.c src/lib-dict/dict-client.h src/lib-dict/dict-db.c src/lib-dict/dict-file.c src/lib-dict/dict-private.h src/lib-dict/dict-sql.c src/lib-dict/dict.c src/lib-dict/dict.h
diffstat 9 files changed, 333 insertions(+), 85 deletions(-) [+]
line wrap: on
line diff
--- a/src/dict/dict-server.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/dict/dict-server.c	Sun Sep 06 20:44:00 2009 -0400
@@ -18,6 +18,7 @@
 
 struct dict_server_transaction {
 	unsigned int id;
+	struct dict_client_connection *conn;
 	struct dict_transaction_context *ctx;
 };
 
@@ -93,7 +94,8 @@
 	o_stream_cork(conn->output);
 	while ((ret = dict_iterate(conn->iter_ctx, &key, &value)) > 0) {
 		str_truncate(str, 0);
-		str_printfa(str, "%s\t%s\n", key, value);
+		str_printfa(str, "%c%s\t%s\n", DICT_PROTOCOL_REPLY_OK,
+			    key, value);
 		o_stream_send(conn->output, str_data(str), str_len(str));
 
 		if (o_stream_get_buffer_used_size(conn->output) >
@@ -193,6 +195,7 @@
 	/* <id> */
 	trans = array_append_space(&conn->transactions);
 	trans->id = id;
+	trans->conn = conn;
 	trans->ctx = dict_transaction_begin(conn->dict);
 	return 0;
 }
@@ -221,7 +224,7 @@
 static int cmd_commit(struct dict_client_connection *conn, const char *line)
 {
 	struct dict_server_transaction *trans;
-	const char *reply;
+	char chr;
 	int ret;
 
 	if (conn->iter_ctx != NULL) {
@@ -233,13 +236,46 @@
 		return -1;
 
 	ret = dict_transaction_commit(&trans->ctx);
-	reply = t_strdup_printf("%c\n", ret == 0 ? DICT_PROTOCOL_REPLY_OK :
-				DICT_PROTOCOL_REPLY_FAIL);
-	o_stream_send_str(conn->output, reply);
+	switch (ret) {
+	case 1:
+		chr = DICT_PROTOCOL_REPLY_OK;
+		break;
+	case 0:
+		chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+		break;
+	default:
+		chr = DICT_PROTOCOL_REPLY_FAIL;
+		break;
+	}
+	o_stream_send_str(conn->output, t_strdup_printf("%c\n", chr));
 	dict_server_transaction_array_remove(conn, trans);
 	return 0;
 }
 
+static void cmd_commit_async_callback(int ret, void *context)
+{
+	struct dict_server_transaction *trans = context;
+	const char *reply;
+	char chr;
+
+	switch (ret) {
+	case 1:
+		chr = DICT_PROTOCOL_REPLY_OK;
+		break;
+	case 0:
+		chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+		break;
+	default:
+		chr = DICT_PROTOCOL_REPLY_FAIL;
+		break;
+	}
+	reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
+				chr, trans->id);
+	o_stream_send_str(trans->conn->output, reply);
+
+	dict_server_transaction_array_remove(trans->conn, trans);
+}
+
 static int
 cmd_commit_async(struct dict_client_connection *conn, const char *line)
 {
@@ -253,8 +289,8 @@
 	if (dict_server_transaction_lookup_parse(conn, line, &trans) < 0)
 		return -1;
 
-	dict_transaction_commit_async(&trans->ctx);
-	dict_server_transaction_array_remove(conn, trans);
+	dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback,
+				      trans);
 	return 0;
 }
 
--- a/src/lib-dict/dict-client.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-client.c	Sun Sep 06 20:44:00 2009 -0400
@@ -1,6 +1,7 @@
 /* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */
 
 #include "lib.h"
+#include "llist.h"
 #include "str.h"
 #include "network.h"
 #include "istream.h"
@@ -8,6 +9,7 @@
 #include "dict-private.h"
 #include "dict-client.h"
 
+#include <stdlib.h>
 #include <unistd.h>
 #include <fcntl.h>
 
@@ -24,9 +26,13 @@
 	time_t last_connect_try;
 	struct istream *input;
 	struct ostream *output;
+	struct io *io;
+
+	struct client_dict_transaction_context *transactions;
 
 	unsigned int connect_counter;
 	unsigned int transaction_id_counter;
+	unsigned int async_commits;
 
 	unsigned int in_iteration:1;
 	unsigned int handshaked:1;
@@ -41,6 +47,11 @@
 
 struct client_dict_transaction_context {
 	struct dict_transaction_context ctx;
+	struct client_dict_transaction_context *prev, *next;
+
+	/* for async commits */
+	dict_transaction_commit_callback_t *callback;
+	void *context;
 
 	unsigned int id;
 	unsigned int connect_counter;
@@ -213,29 +224,97 @@
 	return 0;
 }
 
-static char *client_dict_read_line(struct client_dict *dict)
+static struct client_dict_transaction_context *
+client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+{
+	struct client_dict_transaction_context *ctx;
+
+	for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
+		if (ctx->id == id)
+			return ctx;
+	}
+	return NULL;
+}
+
+static void
+client_dict_finish_transaction(struct client_dict *dict,
+			       unsigned int id, int ret)
 {
+	struct client_dict_transaction_context *ctx;
+
+	ctx = client_dict_transaction_find(dict, id);
+	if (ctx == NULL) {
+		i_error("dict-client: Unknown transaction id %u", id);
+		return;
+	}
+	if (ctx->callback != NULL)
+		ctx->callback(ret, ctx->context);
+
+	DLLIST_REMOVE(&dict->transactions, ctx);
+	i_free(ctx);
+
+	i_assert(dict->async_commits > 0);
+	if (--dict->async_commits == 0)
+		io_remove(&dict->io);
+}
+
+static int client_dict_read_one_line(struct client_dict *dict, char **line_r)
+{
+	unsigned int id;
 	char *line;
 	int ret;
 
-	line = i_stream_next_line(dict->input);
-	if (line != NULL)
-		return line;
-
-	while ((ret = i_stream_read(dict->input)) > 0) {
-		line = i_stream_next_line(dict->input);
-		if (line != NULL)
-			return line;
+	*line_r = NULL;
+	while ((line = i_stream_next_line(dict->input)) == NULL) {
+		ret = i_stream_read(dict->input);
+		switch (ret) {
+		case -1:
+			if (dict->input->stream_errno != 0)
+				i_error("read(%s) failed: %m", dict->path);
+			else {
+				i_error("read(%s) failed: Remote disconnected",
+					dict->path);
+			}
+			return -1;
+		case -2:
+			i_error("read(%s) returned too much data", dict->path);
+			return -1;
+		default:
+			i_assert(ret > 0);
+			break;
+		}
 	}
-	i_assert(ret < 0);
+	if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
+		switch (line[1]) {
+		case DICT_PROTOCOL_REPLY_OK:
+			ret = 1;
+			break;
+		case DICT_PROTOCOL_REPLY_NOTFOUND:
+			ret = 0;
+			break;
+		case DICT_PROTOCOL_REPLY_FAIL:
+			ret = -1;
+			break;
+		default:
+			i_error("dict-client: Invalid async commit line: %s",
+				line);
+			return 0;
+		}
+		id = strtoul(line+2, NULL, 10);
+		client_dict_finish_transaction(dict, id, ret);
+		return 0;
+	}
+	*line_r = line;
+	return 1;
+}
 
-	if (ret == -2)
-		i_error("read(%s) returned too much data", dict->path);
-	else if (dict->input->stream_errno == 0)
-		i_error("read(%s) failed: Remote disconnected", dict->path);
-	else
-		i_error("read(%s) failed: %m", dict->path);
-	return NULL;
+static char *client_dict_read_line(struct client_dict *dict)
+{
+	char *line;
+
+	while (client_dict_read_one_line(dict, &line) == 0)
+		;
+	return line;
 }
 
 static int client_dict_connect(struct client_dict *dict)
@@ -263,6 +342,7 @@
 	dict->input->blocking = TRUE;
 	dict->output = o_stream_create_fd(dict->fd, 4096, FALSE);
 	dict->transaction_id_counter = 0;
+	dict->async_commits = 0;
 
 	query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
 				DICT_PROTOCOL_CMD_HELLO,
@@ -283,6 +363,8 @@
 	dict->connect_counter++;
 	dict->handshaked = FALSE;
 
+	if (dict->io != NULL)
+		io_remove(&dict->io);
 	if (dict->input != NULL)
 		i_stream_destroy(&dict->input);
 	if (dict->output != NULL)
@@ -339,6 +421,21 @@
 	pool_unref(&dict->pool);
 }
 
+static int client_dict_wait(struct dict *_dict)
+{
+	struct client_dict *dict = (struct client_dict *)_dict;
+	char *line;
+	int ret = 0;
+
+	while (dict->async_commits > 0) {
+		if (client_dict_read_one_line(dict, &line) < 0) {
+			ret = -1;
+			break;
+		}
+	}
+	return ret;
+}
+
 static int client_dict_lookup(struct dict *_dict, pool_t pool,
 			      const char *key, const char **value_r)
 {
@@ -420,7 +517,10 @@
 	/* line contains key \t value */
 	p_clear(ctx->pool);
 
-	value = strchr(line, '\t');
+	if (*line != DICT_PROTOCOL_REPLY_OK)
+		value = NULL;
+	else
+		value = strchr(++line, '\t');
 	if (value == NULL) {
 		/* broken protocol */
 		i_error("dict client (%s) sent broken reply", dict->path);
@@ -454,38 +554,72 @@
 	ctx->ctx.dict = _dict;
 	ctx->id = ++dict->transaction_id_counter;
 
+	DLLIST_PREPEND(&dict->transactions, ctx);
 	return &ctx->ctx;
 }
 
-static int client_dict_transaction_commit(struct dict_transaction_context *_ctx,
-					  bool async)
+static void dict_async_input(struct client_dict *dict)
+{
+	char *line;
+	size_t size;
+	int ret;
+
+	i_assert(!dict->in_iteration);
+
+	do {
+		ret = client_dict_read_one_line(dict, &line);
+		(void)i_stream_get_data(dict->input, &size);
+	} while (ret == 0 && size > 0);
+
+	if (ret < 0)
+		io_remove(&dict->io);
+}
+
+static int
+client_dict_transaction_commit(struct dict_transaction_context *_ctx,
+			       bool async,
+			       dict_transaction_commit_callback_t *callback,
+			       void *context)
 {
 	struct client_dict_transaction_context *ctx =
 		(struct client_dict_transaction_context *)_ctx;
 	struct client_dict *dict = (struct client_dict *)_ctx->dict;
-	int ret = ctx->failed ? -1 : 0;
+	int ret = ctx->failed ? -1 : 1;
 
-	if (ctx->sent_begin) T_BEGIN {
+	if (ctx->sent_begin && !ctx->failed) T_BEGIN {
 		const char *query, *line;
 
-		query = t_strdup_printf("%c%u\n", ctx->failed ?
-					DICT_PROTOCOL_CMD_ROLLBACK :
-					(!async ? DICT_PROTOCOL_CMD_COMMIT :
-					 DICT_PROTOCOL_CMD_COMMIT_ASYNC),
+		query = t_strdup_printf("%c%u\n", !async ?
+					DICT_PROTOCOL_CMD_COMMIT :
+					DICT_PROTOCOL_CMD_COMMIT_ASYNC,
 					ctx->id);
 		if (client_dict_send_transaction_query(ctx, query) < 0)
 			ret = -1;
-		else if (ret < 0 || async) {
-			/* no reply */
+		else if (async) {
+			ctx->callback = callback;
+			ctx->context = context;
+			if (dict->async_commits++ == 0) {
+				dict->io = io_add(dict->fd, IO_READ,
+						  dict_async_input, dict);
+			}
 		} else {
 			/* sync commit, read reply */
 			line = client_dict_read_line(dict);
-			if (line == NULL || *line != DICT_PROTOCOL_REPLY_OK)
+			if (line == NULL)
+				ret = -1;
+			else if (*line == DICT_PROTOCOL_REPLY_OK)
+				ret = 1;
+			else if (*line == DICT_PROTOCOL_REPLY_NOTFOUND)
+				ret = 0;
+			else
 				ret = -1;
 		}
 	} T_END;
 
-	i_free(ctx);
+	if (ret < 0 || !async) {
+		DLLIST_REMOVE(&dict->transactions, ctx);
+		i_free(ctx);
+	}
 	return ret;
 }
 
@@ -494,6 +628,7 @@
 {
 	struct client_dict_transaction_context *ctx =
 		(struct client_dict_transaction_context *)_ctx;
+	struct client_dict *dict = (struct client_dict *)_ctx->dict;
 
 	if (ctx->sent_begin) T_BEGIN {
 		const char *query;
@@ -503,6 +638,7 @@
 		(void)client_dict_send_transaction_query(ctx, query);
 	} T_END;
 
+	DLLIST_REMOVE(&dict->transactions, ctx);
 	i_free(ctx);
 }
 
@@ -560,6 +696,7 @@
 	{
 		client_dict_init,
 		client_dict_deinit,
+		client_dict_wait,
 		client_dict_lookup,
 		client_dict_iterate_init,
 		client_dict_iterate,
--- a/src/lib-dict/dict-client.h	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-client.h	Sun Sep 06 20:44:00 2009 -0400
@@ -28,10 +28,10 @@
 };
 
 enum {
-	/* For LOOKUP command */
 	DICT_PROTOCOL_REPLY_OK = 'O', /* <value> */
 	DICT_PROTOCOL_REPLY_NOTFOUND = 'N',
-	DICT_PROTOCOL_REPLY_FAIL = 'F'
+	DICT_PROTOCOL_REPLY_FAIL = 'F',
+	DICT_PROTOCOL_REPLY_ASYNC_COMMIT = 'A'
 };
 
 const char *dict_client_escape(const char *src);
--- a/src/lib-dict/dict-db.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-db.c	Sun Sep 06 20:44:00 2009 -0400
@@ -372,16 +372,22 @@
 	return &ctx->ctx;
 }
 
-static int db_dict_transaction_commit(struct dict_transaction_context *_ctx,
-				      bool async ATTR_UNUSED)
+static int
+db_dict_transaction_commit(struct dict_transaction_context *_ctx,
+			   bool async ATTR_UNUSED,
+			   dict_transaction_commit_callback_t *callback,
+			   void *context)
 {
 	struct db_dict_transaction_context *ctx =
 		(struct db_dict_transaction_context *)_ctx;
 	int ret;
 
-	ret = ctx->tid->commit(ctx->tid, 0);
+	ret = ctx->tid->commit(ctx->tid, 0) < 0 ? -1 : 1;
 	i_free(ctx);
-	return ret == 0 ? 0 : -1;
+
+	if (callback != NULL)
+		callback(ret, context);
+	return ret;
 }
 
 static void db_dict_transaction_rollback(struct dict_transaction_context *_ctx)
@@ -447,6 +453,7 @@
 	{
 		db_dict_init,
 		db_dict_deinit,
+		NULL,
 		db_dict_lookup,
 		db_dict_iterate_init,
 		db_dict_iterate,
--- a/src/lib-dict/dict-file.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-file.c	Sun Sep 06 20:44:00 2009 -0400
@@ -54,6 +54,8 @@
 
 	pool_t pool;
 	ARRAY_DEFINE(changes, struct file_dict_change);
+
+	unsigned int atomic_inc_not_found:1;
 };
 
 static struct dotlock_settings file_dict_dotlock_settings = {
@@ -252,9 +254,12 @@
 
 		switch (changes[i].type) {
 		case FILE_DICT_CHANGE_TYPE_INC:
-			diff = old_value == NULL ? 0 :
-				strtoll(old_value, NULL, 10);
-			diff += changes[i].value.diff;
+			if (old_value == NULL) {
+				ctx->atomic_inc_not_found = TRUE;
+				break;
+			}
+			diff = strtoll(old_value, NULL, 10) +
+				changes[i].value.diff;
 			tmp = t_strdup_printf("%lld", diff);
 			new_len = strlen(tmp);
 			if (old_value == NULL || new_len > strlen(old_value))
@@ -365,15 +370,26 @@
 	return 0;
 }
 
-static int file_dict_transaction_commit(struct dict_transaction_context *_ctx,
-					bool async ATTR_UNUSED)
+static int
+file_dict_transaction_commit(struct dict_transaction_context *_ctx,
+			     bool async ATTR_UNUSED,
+			     dict_transaction_commit_callback_t *callback,
+			     void *context)
 {
 	struct file_dict_transaction_context *ctx =
 		(struct file_dict_transaction_context *)_ctx;
 	int ret;
 
-	ret = file_dict_write_changes(ctx);
+	if (file_dict_write_changes(ctx) < 0)
+		ret = -1;
+	else if (ctx->atomic_inc_not_found)
+		ret = 0;
+	else
+		ret = 1;
 	pool_unref(&ctx->pool);
+
+	if (callback != NULL)
+		callback(ret, context);
 	return ret;
 }
 
@@ -429,6 +445,7 @@
 	{
 		file_dict_init,
 		file_dict_deinit,
+		NULL,
 		file_dict_lookup,
 		file_dict_iterate_init,
 		file_dict_iterate,
--- a/src/lib-dict/dict-private.h	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-private.h	Sun Sep 06 20:44:00 2009 -0400
@@ -8,6 +8,7 @@
 			     enum dict_data_type value_type,
 			     const char *username, const char *base_dir);
 	void (*deinit)(struct dict *dict);
+	int (*wait)(struct dict *dict);
 
 	int (*lookup)(struct dict *dict, pool_t pool,
 		      const char *key, const char **value_r);
@@ -21,7 +22,9 @@
 
 	struct dict_transaction_context *(*transaction_init)(struct dict *dict);
 	int (*transaction_commit)(struct dict_transaction_context *ctx,
-				  bool async);
+				  bool async,
+				  dict_transaction_commit_callback_t *callback,
+				  void *context);
 	void (*transaction_rollback)(struct dict_transaction_context *ctx);
 
 	void (*set)(struct dict_transaction_context *ctx,
--- a/src/lib-dict/dict-sql.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-sql.c	Sun Sep 06 20:44:00 2009 -0400
@@ -44,6 +44,11 @@
 	unsigned int key_prefix_len, pattern_prefix_len, next_map_idx;
 };
 
+struct sql_dict_inc_row {
+	struct sql_dict_inc_row *prev;
+	unsigned int rows;
+};
+
 struct sql_dict_transaction_context {
 	struct dict_transaction_context ctx;
 
@@ -52,6 +57,8 @@
 	const struct dict_sql_map *prev_inc_map;
 	char *prev_inc_key;
 	long long prev_inc_diff;
+	pool_t inc_row_pool;
+	struct sql_dict_inc_row *inc_row;
 
 	unsigned int failed:1;
 	unsigned int changed:1;
@@ -462,13 +469,16 @@
 	return &ctx->ctx;
 }
 
-static int sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
-				       bool async ATTR_UNUSED)
+static int
+sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
+			    bool async ATTR_UNUSED,
+			    dict_transaction_commit_callback_t *callback,
+			    void *context)
 {
 	struct sql_dict_transaction_context *ctx =
 		(struct sql_dict_transaction_context *)_ctx;
 	const char *error;
-	int ret;
+	int ret = 1;
 
 	if (ctx->prev_inc_map != NULL)
 		sql_dict_prev_inc_flush(ctx);
@@ -477,14 +487,27 @@
 		sql_transaction_rollback(&ctx->sql_ctx);
 		ret = -1;
 	} else if (_ctx->changed) {
-		ret = sql_transaction_commit_s(&ctx->sql_ctx, &error);
-		if (ret < 0)
+		if (sql_transaction_commit_s(&ctx->sql_ctx, &error) < 0) {
 			i_error("sql dict: commit failed: %s", error);
-	} else {
-		/* nothing to be done */
-		ret = 0;
+			ret = -1;
+		} else {
+			while (ctx->inc_row != NULL) {
+				i_assert(ctx->inc_row->rows != -1UL);
+				if (ctx->inc_row->rows == 0) {
+					ret = 0;
+					break;
+				}
+				ctx->inc_row = ctx->inc_row->prev;
+			}
+		}
 	}
+	if (ctx->inc_row_pool != NULL)
+		pool_unref(&ctx->inc_row_pool);
+	i_free(ctx->prev_inc_key);
 	i_free(ctx);
+
+	if (callback != NULL)
+		callback(ret, context);
 	return ret;
 }
 
@@ -495,6 +518,9 @@
 
 	if (_ctx->changed)
 		sql_transaction_rollback(&ctx->sql_ctx);
+
+	if (ctx->inc_row_pool != NULL)
+		pool_unref(&ctx->inc_row_pool);
 	i_free(ctx->prev_inc_key);
 	i_free(ctx);
 }
@@ -602,8 +628,7 @@
 			    fields[i].map->value_field);
 		if (fields[i].value[0] != '-')
 			str_append_c(query, '+');
-		else
-			str_append(query, fields[i].value);
+		str_append(query, fields[i].value);
 	}
 
 	sql_dict_where_build(dict, fields[0].map, build->extra_values,
@@ -679,6 +704,22 @@
 	} T_END;
 }
 
+static unsigned int *
+sql_dict_next_inc_row(struct sql_dict_transaction_context *ctx)
+{
+	struct sql_dict_inc_row *row;
+
+	if (ctx->inc_row_pool == NULL) {
+		ctx->inc_row_pool =
+			pool_alloconly_create("sql dict inc rows", 128);
+	}
+	row = p_new(ctx->inc_row_pool, struct sql_dict_inc_row, 1);
+	row->prev = ctx->inc_row;
+	row->rows = -1UL;
+	ctx->inc_row = row;
+	return &row->rows;
+}
+
 static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx,
 				     const char *key, long long diff)
 {
@@ -692,7 +733,6 @@
 	T_BEGIN {
 		struct dict_sql_build_query build;
 		struct dict_sql_build_query_field field;
-		const char *query;
 
 		field.map = map;
 		field.value = t_strdup_printf("%lld", diff);
@@ -705,14 +745,8 @@
 		build.key1 = key[0];
 		build.inc = TRUE;
 
-		if (diff >= 0)
-			query = sql_dict_set_query(&build);
-		else {
-			/* negative changes can't never be initial values,
-			   use UPDATE directly. */
-			query = sql_dict_update_query(&build);
-		}
-		sql_update(ctx->sql_ctx, query);
+		sql_update_get_rows(ctx->sql_ctx, sql_dict_update_query(&build),
+				    sql_dict_next_inc_row(ctx));
 	} T_END;
 }
 
@@ -783,6 +817,7 @@
 		ctx->prev_inc_diff = diff;
 		return;
 	}
+
 	if (!sql_dict_maps_are_mergeable(dict, ctx->prev_inc_map, map,
 					 ctx->prev_inc_key, key, &values)) {
 		sql_dict_prev_inc_flush(ctx);
@@ -790,7 +825,6 @@
 	} else T_BEGIN {
 		struct dict_sql_build_query build;
 		struct dict_sql_build_query_field *field;
-		const char *query;
 
 		memset(&build, 0, sizeof(build));
 		build.dict = dict;
@@ -806,14 +840,8 @@
 		field->map = map;
 		field->value = t_strdup_printf("%lld", diff);
 
-		if (diff >= 0)
-			query = sql_dict_set_query(&build);
-		else {
-			/* negative changes can't never be initial values,
-			   use UPDATE directly. */
-			query = sql_dict_update_query(&build);
-		}
-		sql_update(ctx->sql_ctx, query);
+		sql_update_get_rows(ctx->sql_ctx, sql_dict_update_query(&build),
+				    sql_dict_next_inc_row(ctx));
 
 		i_free_and_null(ctx->prev_inc_key);
 		ctx->prev_inc_map = NULL;
@@ -826,6 +854,7 @@
 	{
 		sql_dict_init,
 		sql_dict_deinit,
+		NULL,
 		sql_dict_lookup,
 		sql_dict_iterate_init,
 		sql_dict_iterate,
--- a/src/lib-dict/dict.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict.c	Sun Sep 06 20:44:00 2009 -0400
@@ -96,6 +96,11 @@
 	dict->v.deinit(dict);
 }
 
+int dict_wait(struct dict *dict)
+{
+	return dict->v.wait == NULL ? 1 : dict->v.wait(dict);
+}
+
 static bool dict_key_prefix_is_valid(const char *key)
 {
 	return strncmp(key, DICT_PATH_SHARED, strlen(DICT_PATH_SHARED)) == 0 ||
@@ -141,15 +146,17 @@
 	struct dict_transaction_context *ctx = *_ctx;
 
 	*_ctx = NULL;
-	return ctx->dict->v.transaction_commit(ctx, FALSE);
+	return ctx->dict->v.transaction_commit(ctx, FALSE, NULL, NULL);
 }
 
-void dict_transaction_commit_async(struct dict_transaction_context **_ctx)
+void dict_transaction_commit_async(struct dict_transaction_context **_ctx,
+				   dict_transaction_commit_callback_t *callback,
+				   void *context)
 {
 	struct dict_transaction_context *ctx = *_ctx;
 
 	*_ctx = NULL;
-	ctx->dict->v.transaction_commit(ctx, TRUE);
+	ctx->dict->v.transaction_commit(ctx, TRUE, callback, context);
 }
 
 void dict_transaction_rollback(struct dict_transaction_context **_ctx)
--- a/src/lib-dict/dict.h	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict.h	Sun Sep 06 20:44:00 2009 -0400
@@ -17,6 +17,8 @@
 	DICT_DATA_TYPE_UINT32
 };
 
+typedef void dict_transaction_commit_callback_t(int ret, void *context);
+
 void dict_driver_register(struct dict *driver);
 void dict_driver_unregister(struct dict *driver);
 
@@ -32,6 +34,9 @@
 		       const char *username, const char *base_dir);
 /* Close dictionary. */
 void dict_deinit(struct dict **dict);
+/* Wait for all pending asynchronous transaction commits to finish.
+   Returns 0 if ok, -1 if error. */
+int dict_wait(struct dict *dict);
 
 /* Lookup value for key. Set it to NULL if it's not found.
    Returns 1 if found, 0 if not found and -1 if lookup failed. */
@@ -50,10 +55,16 @@
 
 /* Start a new dictionary transaction. */
 struct dict_transaction_context *dict_transaction_begin(struct dict *dict);
-/* Commit the transaction. Returns 0 if ok, -1 if failed. */
+/* Commit the transaction. Returns 1 if ok, 0 if dict_atomic_inc() was used
+   on a non-existing key, -1 if failed. */
 int dict_transaction_commit(struct dict_transaction_context **ctx);
-/* Commit the transaction, but don't wait to see if it finishes successfully. */
-void dict_transaction_commit_async(struct dict_transaction_context **ctx);
+/* Commit the transaction, but don't wait to see if it finishes successfully.
+   If callback isn't NULL, it's called eventually. If it's not called by the
+   time you want to deinitialize dict, call dict_flush() to wait for the
+   result. */
+void dict_transaction_commit_async(struct dict_transaction_context **ctx,
+				   dict_transaction_commit_callback_t *callback,
+				   void *context);
 /* Rollback all changes made in transaction. */
 void dict_transaction_rollback(struct dict_transaction_context **ctx);
 
@@ -65,7 +76,8 @@
 		const char *key);
 /* Increase/decrease a numeric value in dictionary. Note that the value is
    changed when transaction is being committed, so you can't know beforehand
-   what the value will become. */
+   what the value will become. The value is updated only if it already exists,
+   otherwise commit() will return 0. */
 void dict_atomic_inc(struct dict_transaction_context *ctx,
 		     const char *key, long long diff);