changeset 11261:1c8cc349ef55 HEAD

lib-sql: Use generic sql connection pooling code for mysql/pgsql. It's possible to give multiple host settings to do load balancing / HA. If one host is down, another one is tried. All queries are automatically retried on another host if they fail on the first one. Since PostgreSQL supports async queries, Dovecot can create multiple connections to the database as needed, so it can do lookups in parallel. The number of connections can be changed with maxconns=n in connect_query, the default is 5.
author Timo Sirainen <tss@iki.fi>
date Tue, 04 May 2010 17:55:23 +0300
parents 669e8266927e
children a4614f53d298
files doc/example-config/dovecot-sql.conf.ext src/lib-sql/Makefile.am src/lib-sql/driver-mysql.c src/lib-sql/driver-pgsql.c src/lib-sql/driver-sqlite.c src/lib-sql/driver-sqlpool.c src/lib-sql/sql-api-private.h src/lib-sql/sql-api.c src/lib-sql/sql-api.h
diffstat 9 files changed, 1297 insertions(+), 647 deletions(-) [+]
line wrap: on
line diff
--- a/doc/example-config/dovecot-sql.conf.ext	Tue May 04 16:12:00 2010 +0300
+++ b/doc/example-config/dovecot-sql.conf.ext	Tue May 04 17:55:23 2010 +0300
@@ -30,9 +30,14 @@
 
 # Database connection string. This is driver-specific setting.
 #
+# HA / round-robin load-balancing is supported by giving multiple host
+# settings, like: host=sql1.host.org host=sql2.host.org
+#
 # pgsql:
 #   For available options, see the PostgreSQL documention for the
 #   PQconnectdb function of libpq.
+#   Use maxconns=n (default 5) to change how many connections Dovecot can
+#   create to pgsql.
 #
 # mysql:
 #   Basic options emulate PostgreSQL option names:
@@ -50,8 +55,6 @@
 #   You can connect to UNIX sockets by using host: host=/var/run/mysql.sock
 #   Note that currently you can't use spaces in parameters.
 #
-#   MySQL supports multiple host parameters for load balancing / HA.
-#
 # sqlite:
 #   The path to the database file.
 #
--- a/src/lib-sql/Makefile.am	Tue May 04 16:12:00 2010 +0300
+++ b/src/lib-sql/Makefile.am	Tue May 04 17:55:23 2010 +0300
@@ -38,6 +38,7 @@
 driver_sources = \
 	driver-mysql.c \
 	driver-pgsql.c \
+	driver-sqlpool.c \
 	driver-sqlite.c
 endif
 
--- a/src/lib-sql/driver-mysql.c	Tue May 04 16:12:00 2010 +0300
+++ b/src/lib-sql/driver-mysql.c	Tue May 04 17:55:23 2010 +0300
@@ -12,49 +12,23 @@
 #include <mysql.h>
 #include <errmsg.h>
 
-/* Abort connect() if it can't connect within this time. */
-#define MYSQL_CONNECT_FAILURE_TIMEOUT 10
-
-/* Minimum delay between reconnecting to same server */
-#define CONNECT_MIN_DELAY 1
-/* Maximum time to avoiding reconnecting to same server */
-#define CONNECT_MAX_DELAY (60*30)
-/* If no servers are connected but a query is requested, try reconnecting to
-   next server which has been disconnected longer than this (with a single
-   server setup this is really the "max delay" and the CONNECT_MAX_DELAY
-   is never used). */
-#define CONNECT_RESET_DELAY 15
-
 struct mysql_db {
 	struct sql_db api;
 
 	pool_t pool;
-	const char *user, *password, *dbname, *unix_socket;
+	const char *user, *password, *dbname, *host, *unix_socket;
 	const char *ssl_cert, *ssl_key, *ssl_ca, *ssl_ca_path, *ssl_cipher;
 	const char *option_file, *option_group;
 	unsigned int port, client_flags;
 
-	ARRAY_DEFINE(connections, struct mysql_connection);
+	MYSQL *mysql;
 	unsigned int next_query_connection;
-};
-
-struct mysql_connection {
-	struct mysql_db *db;
 
-	MYSQL *mysql;
-	const char *host;
-
-	unsigned int connect_delay;
-	unsigned int connect_failure_count;
-
-	time_t last_connect;
-	unsigned int connected:1;
 	unsigned int ssl_set:1;
 };
 
 struct mysql_result {
 	struct sql_result api;
-	struct mysql_connection *conn;
 
 	MYSQL_RES *result;
         MYSQL_ROW row;
@@ -67,137 +41,83 @@
 	struct sql_transaction_context ctx;
 
 	pool_t query_pool;
-	struct mysql_query_list *head, *tail;
-
 	const char *error;
 
 	unsigned int failed:1;
 };
 
-struct mysql_query_list {
-	struct mysql_query_list *next;
-	const char *query;
-	unsigned int *affected_rows;
-};
-
 extern const struct sql_db driver_mysql_db;
 extern const struct sql_result driver_mysql_result;
 extern const struct sql_result driver_mysql_error_result;
 
-static bool driver_mysql_connect(struct mysql_connection *conn)
+static int driver_mysql_connect(struct sql_db *_db)
 {
-	struct mysql_db *db = conn->db;
+	struct mysql_db *db = (struct mysql_db *)_db;
 	const char *unix_socket, *host;
 	unsigned long client_flags = db->client_flags;
-	time_t now;
 	bool failed;
 
-	if (conn->connected)
-		return TRUE;
+	i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
 
-	/* don't try reconnecting more than once a second */
-	now = time(NULL);
-	if (conn->last_connect + (time_t)conn->connect_delay > now)
-		return FALSE;
-	conn->last_connect = now;
-
-	if (*conn->host == '/') {
-		unix_socket = conn->host;
+	if (*db->host == '/') {
+		unix_socket = db->host;
 		host = NULL;
 	} else {
 		unix_socket = NULL;
-		host = conn->host;
+		host = db->host;
 	}
 
 	if (db->option_file != NULL) {
-		mysql_options(conn->mysql, MYSQL_READ_DEFAULT_FILE,
+		mysql_options(db->mysql, MYSQL_READ_DEFAULT_FILE,
 			      db->option_file);
 	}
 
-	mysql_options(conn->mysql, MYSQL_READ_DEFAULT_GROUP,
+	mysql_options(db->mysql, MYSQL_READ_DEFAULT_GROUP,
 		      db->option_group != NULL ? db->option_group : "client");
 
-	if (!conn->ssl_set && (db->ssl_ca != NULL || db->ssl_ca_path != NULL)) {
+	if (!db->ssl_set && (db->ssl_ca != NULL || db->ssl_ca_path != NULL)) {
 #ifdef HAVE_MYSQL_SSL
-		mysql_ssl_set(conn->mysql, db->ssl_key, db->ssl_cert,
+		mysql_ssl_set(db->mysql, db->ssl_key, db->ssl_cert,
 			      db->ssl_ca, db->ssl_ca_path
 #ifdef HAVE_MYSQL_SSL_CIPHER
 			      , db->ssl_cipher
 #endif
 			     );
-		conn->ssl_set = TRUE;
+		db->ssl_set = TRUE;
 #else
 		i_fatal("mysql: SSL support not compiled in "
 			"(remove ssl_ca and ssl_ca_path settings)");
 #endif
 	}
 
-	alarm(MYSQL_CONNECT_FAILURE_TIMEOUT);
+	alarm(SQL_CONNECT_TIMEOUT_SECS);
 #ifdef CLIENT_MULTI_RESULTS
 	client_flags |= CLIENT_MULTI_RESULTS;
 #endif
 	/* CLIENT_MULTI_RESULTS allows the use of stored procedures */
-	failed = mysql_real_connect(conn->mysql, host, db->user, db->password,
+	failed = mysql_real_connect(db->mysql, host, db->user, db->password,
 				    db->dbname, db->port, unix_socket,
 				    client_flags) == NULL;
 	alarm(0);
 	if (failed) {
-		if (conn->connect_failure_count > 0) {
-			/* increase delay between reconnections to this
-			   server */
-			conn->connect_delay *= 5;
-			if (conn->connect_delay > CONNECT_MAX_DELAY)
-				conn->connect_delay = CONNECT_MAX_DELAY;
-		}
-		conn->connect_failure_count++;
-
+		sql_db_set_state(&db->api, SQL_DB_STATE_DISCONNECTED);
 		i_error("mysql: Connect failed to %s (%s): %s - "
 			"waiting for %u seconds before retry",
 			host != NULL ? host : unix_socket, db->dbname,
-			mysql_error(conn->mysql), conn->connect_delay);
-		return FALSE;
+			mysql_error(db->mysql), db->api.connect_delay);
+		return -1;
 	} else {
 		i_info("mysql: Connected to %s%s (%s)",
 		       host != NULL ? host : unix_socket,
-		       conn->ssl_set ? " using SSL" : "", db->dbname);
+		       db->ssl_set ? " using SSL" : "", db->dbname);
 
-		conn->connect_failure_count = 0;
-		conn->connect_delay = CONNECT_MIN_DELAY;
-		conn->connected = TRUE;
-		return TRUE;
+		sql_db_set_state(&db->api, SQL_DB_STATE_IDLE);
+		return 1;
 	}
 }
 
-static int driver_mysql_connect_all(struct sql_db *_db)
-{
-	struct mysql_db *db = (struct mysql_db *)_db;
-	struct mysql_connection *conn;
-	int ret = -1;
-
-	array_foreach_modifiable(&db->connections, conn) {
-		if (driver_mysql_connect(conn))
-			ret = 1;
-	}
-	return ret;
-}
-
-static void driver_mysql_connection_add(struct mysql_db *db, const char *host)
+static void driver_mysql_disconnect(struct sql_db *_db ATTR_UNUSED)
 {
-	struct mysql_connection *conn;
-
-	conn = array_append_space(&db->connections);
-	conn->db = db;
-	conn->host = p_strdup(db->pool, host);
-	conn->mysql = mysql_init(NULL);
-	if (conn->mysql == NULL)
-		i_fatal("mysql_init() failed");
-
-	conn->connect_delay = CONNECT_MIN_DELAY;
-}
-
-static void driver_mysql_connection_free(struct mysql_connection *conn)
-{
-	mysql_close(conn->mysql);
 }
 
 static void driver_mysql_parse_connect_string(struct mysql_db *db,
@@ -221,7 +141,7 @@
 		field = NULL;
 		if (strcmp(name, "host") == 0 ||
 		    strcmp(name, "hostaddr") == 0)
-			driver_mysql_connection_add(db, value);
+			field = &db->host;
 		else if (strcmp(name, "user") == 0)
 			field = &db->user;
 		else if (strcmp(name, "password") == 0)
@@ -253,8 +173,12 @@
 			*field = p_strdup(db->pool, value);
 	}
 
-	if (array_count(&db->connections) == 0)
+	if (db->host == NULL)
 		i_fatal("mysql: No hosts given in connect string");
+
+	db->mysql = mysql_init(NULL);
+	if (db->mysql == NULL)
+		i_fatal("mysql_init() failed");
 }
 
 static struct sql_db *driver_mysql_init_v(const char *connect_string)
@@ -266,7 +190,6 @@
 	db = p_new(pool, struct mysql_db, 1);
 	db->pool = pool;
 	db->api = driver_mysql_db;
-	p_array_init(&db->connections, db->pool, 6);
 
 	T_BEGIN {
 		driver_mysql_parse_connect_string(db, connect_string);
@@ -277,88 +200,26 @@
 static void driver_mysql_deinit_v(struct sql_db *_db)
 {
 	struct mysql_db *db = (struct mysql_db *)_db;
-	struct mysql_connection *conn;
 
-	array_foreach_modifiable(&db->connections, conn)
-		(void)driver_mysql_connection_free(conn);
-
+	mysql_close(db->mysql);
 	array_free(&_db->module_contexts);
 	pool_unref(&db->pool);
 }
 
-static enum sql_db_flags
-driver_mysql_get_flags(struct sql_db *db ATTR_UNUSED)
-{
-	return SQL_DB_FLAG_BLOCKING;
-}
-
-static int driver_mysql_connection_do_query(struct mysql_connection *conn,
-					    const char *query)
-{
-	int i;
-
-	for (i = 0; i < 2; i++) {
-		if (!driver_mysql_connect(conn))
-			return 0;
-
-		if (mysql_query(conn->mysql, query) == 0)
-			return 1;
-
-		/* failed */
-		switch (mysql_errno(conn->mysql)) {
-		case CR_SERVER_GONE_ERROR:
-		case CR_SERVER_LOST:
-			/* connection lost - try immediate reconnect */
-			conn->connected = FALSE;
-			break;
-		default:
-			return -1;
-		}
-	}
-
-	/* connected -> lost it -> connected -> lost again */
-	return 0;
-}
-
-static int driver_mysql_do_query(struct mysql_db *db, const char *query,
-				 struct mysql_connection **conn_r)
+static int driver_mysql_do_query(struct mysql_db *db, const char *query)
 {
-	struct mysql_connection *conn;
-	unsigned int i, start, count;
-	bool reset;
-	int ret;
-
-	conn = array_get_modifiable(&db->connections, &count);
-
-	/* go through the connections in round robin. if the connection
-	   isn't available, try next one that is. */
-	start = db->next_query_connection % count;
-	db->next_query_connection++;
+	if (mysql_query(db->mysql, query) == 0)
+		return 1;
 
-	for (reset = FALSE;; reset = TRUE) {
-		i = start;
-		do {
-			ret = driver_mysql_connection_do_query(&conn[i], query);
-			if (ret != 0) {
-				/* success / failure */
-				*conn_r = &conn[i];
-				return ret;
-			}
-
-			/* not connected, try next one */
-			i = (i + 1) % count;
-		} while (i != start);
-
-		if (reset)
-			break;
-
-		/* none are connected. connect_delays may have gotten too high,
-		   reset all of them to see if some are still alive. */
-		for (i = 0; i < count; i++)
-			conn[i].connect_delay = CONNECT_RESET_DELAY;
+	/* failed */
+	switch (mysql_errno(db->mysql)) {
+	case CR_SERVER_GONE_ERROR:
+	case CR_SERVER_LOST:
+		sql_db_set_state(&db->api, SQL_DB_STATE_DISCONNECTED);
+		break;
+	default:
+		return -1;
 	}
-
-	*conn_r = NULL;
 	return 0;
 }
 
@@ -366,40 +227,29 @@
 driver_mysql_escape_string(struct sql_db *_db, const char *string)
 {
 	struct mysql_db *db = (struct mysql_db *)_db;
-	struct mysql_connection *conn;
-	unsigned int i, count;
 	size_t len = strlen(string);
 	char *to;
 
-	/* All the connections should be identical, so just use the first
-	   connected one */
-	conn = array_get_modifiable(&db->connections, &count);
-	for (i = 0; i < count; i++) {
-		if (conn[i].connected)
-			break;
+	if (_db->state == SQL_DB_STATE_DISCONNECTED) {
+		/* try connecting */
+		(void)sql_connect(&db->api);
 	}
-	if (i == count) {
-		/* so, try connecting.. */
-		for (i = 0; i < count; i++) {
-			if (driver_mysql_connect(&conn[i]))
-				break;
-		}
-		if (i == count) {
-			/* FIXME: we don't have a valid connection, so fallback
-			   to using default escaping. the next query will most
-			   likely fail anyway so it shouldn't matter that much
-			   what we return here.. Anyway, this API needs
-			   changing so that the escaping function could already
-			   fail the query reliably. */
-			to = t_buffer_get(len * 2 + 1);
-			len = mysql_escape_string(to, string, len);
-			t_buffer_alloc(len + 1);
-			return to;
-		}
+
+	if (db->mysql == NULL) {
+		/* FIXME: we don't have a valid connection, so fallback
+		   to using default escaping. the next query will most
+		   likely fail anyway so it shouldn't matter that much
+		   what we return here.. Anyway, this API needs
+		   changing so that the escaping function could already
+		   fail the query reliably. */
+		to = t_buffer_get(len * 2 + 1);
+		len = mysql_escape_string(to, string, len);
+		t_buffer_alloc(len + 1);
+		return to;
 	}
 
 	to = t_buffer_get(len * 2 + 1);
-	len = mysql_real_escape_string(conn[i].mysql, to, string, len);
+	len = mysql_real_escape_string(db->mysql, to, string, len);
 	t_buffer_alloc(len + 1);
 	return to;
 }
@@ -407,9 +257,8 @@
 static void driver_mysql_exec(struct sql_db *_db, const char *query)
 {
 	struct mysql_db *db = (struct mysql_db *)_db;
-	struct mysql_connection *conn;
 
-	(void)driver_mysql_do_query(db, query, &conn);
+	(void)driver_mysql_do_query(db, query);
 }
 
 static void driver_mysql_query(struct sql_db *db, const char *query,
@@ -428,7 +277,6 @@
 driver_mysql_query_s(struct sql_db *_db, const char *query)
 {
 	struct mysql_db *db = (struct mysql_db *)_db;
-	struct mysql_connection *conn;
 	struct mysql_result *result;
 	int ret;
 
@@ -436,25 +284,25 @@
 	result->api = driver_mysql_result;
 	result->api.db = _db;
 
-	switch (driver_mysql_do_query(db, query, &conn)) {
+	switch (driver_mysql_do_query(db, query)) {
 	case 0:
 		/* not connected */
 		result->api = sql_not_connected_result;
 		break;
 	case 1:
 		/* query ok */
-		result->result = mysql_store_result(conn->mysql);
+		result->result = mysql_store_result(db->mysql);
 #ifdef CLIENT_MULTI_RESULTS
 		/* Because we've enabled CLIENT_MULTI_RESULTS, we need to read
 		   (ignore) extra results - there should not be any.
 		   ret is: -1 = done, >0 = error, 0 = more results. */
-		while ((ret = mysql_next_result(conn->mysql)) == 0) ;
+		while ((ret = mysql_next_result(db->mysql)) == 0) ;
 #else
 		ret = -1;
 #endif
 
 		if (ret < 0 &&
-		    (result->result != NULL || mysql_errno(conn->mysql) == 0))
+		    (result->result != NULL || mysql_errno(db->mysql) == 0))
 			break;
 
 		/* failed */
@@ -468,7 +316,6 @@
 	}
 
 	result->api.refcount = 1;
-	result->conn = conn;
 	return &result->api;
 }
 
@@ -488,6 +335,7 @@
 static int driver_mysql_result_next_row(struct sql_result *_result)
 {
 	struct mysql_result *result = (struct mysql_result *)_result;
+	struct mysql_db *db = (struct mysql_db *)_result->db;
 
 	if (result->result == NULL) {
 		/* no results */
@@ -498,7 +346,7 @@
 	if (result->row != NULL)
 		return 1;
 
-	return mysql_errno(result->conn->mysql) != 0 ? -1 : 0;
+	return mysql_errno(db->mysql) != 0 ? -1 : 0;
 }
 
 static void driver_mysql_result_fetch_fields(struct mysql_result *result)
@@ -583,9 +431,9 @@
 
 static const char *driver_mysql_result_get_error(struct sql_result *_result)
 {
-	struct mysql_result *result = (struct mysql_result *)_result;
+	struct mysql_db *db = (struct mysql_db *)_result->db;
 
-	return mysql_error(result->conn->mysql);
+	return mysql_error(db->mysql);
 }
 
 static struct sql_transaction_context *
@@ -626,12 +474,13 @@
 		ctx->error = sql_result_get_error(result);
 		ctx->failed = TRUE;
 		ret = -1;
-	} else if (ctx->head != NULL && ctx->head->affected_rows != NULL) {
-		struct mysql_result *my_result = (struct mysql_result *)result;
+	} else if (ctx->ctx.head != NULL &&
+		   ctx->ctx.head->affected_rows != NULL) {
+		struct mysql_db *db = (struct mysql_db *)result->db;
 
-		rows = mysql_affected_rows(my_result->conn->mysql);
+		rows = mysql_affected_rows(db->mysql);
 		i_assert(rows != (my_ulonglong)-1);
-		*ctx->head->affected_rows = rows;
+		*ctx->ctx.head->affected_rows = rows;
 	}
 	sql_result_unref(result);
 	return ret;
@@ -647,14 +496,14 @@
 
 	*error_r = NULL;
 
-	if (ctx->head != NULL) {
+	if (_ctx->head != NULL) {
 		/* try to use a transaction in any case,
 		   even if it doesn't work. */
 		(void)transaction_send_query(ctx, "BEGIN");
-		while (ctx->head != NULL) {
-			if (transaction_send_query(ctx, ctx->head->query) < 0)
+		while (_ctx->head != NULL) {
+			if (transaction_send_query(ctx, _ctx->head->query) < 0)
 				break;
-			ctx->head = ctx->head->next;
+			_ctx->head = _ctx->head->next;
 		}
 		ret = transaction_send_query(ctx, "COMMIT");
 		*error_r = ctx->error;
@@ -679,27 +528,20 @@
 {
 	struct mysql_transaction_context *ctx =
 		(struct mysql_transaction_context *)_ctx;
-	struct mysql_query_list *list;
 
-	list = p_new(ctx->query_pool, struct mysql_query_list, 1);
-	list->query = p_strdup(ctx->query_pool, query);
-	list->affected_rows = affected_rows;
-
-	if (ctx->head == NULL)
-		ctx->head = list;
-	else
-		ctx->tail->next = list;
-	ctx->tail = list;
+	sql_transaction_add_query(&ctx->ctx, ctx->query_pool,
+				  query, affected_rows);
 }
 
 const struct sql_db driver_mysql_db = {
-	"mysql",
+	.name = "mysql",
+	.flags = SQL_DB_FLAG_BLOCKING | SQL_DB_FLAG_POOLED,
 
 	.v = {
 		driver_mysql_init_v,
 		driver_mysql_deinit_v,
-		driver_mysql_get_flags,
-		driver_mysql_connect_all,
+		driver_mysql_connect,
+		driver_mysql_disconnect,
 		driver_mysql_escape_string,
 		driver_mysql_exec,
 		driver_mysql_query,
@@ -741,7 +583,8 @@
 		driver_mysql_result_error_next_row,
 		NULL, NULL, NULL, NULL, NULL, NULL, NULL,
 		driver_mysql_result_get_error
-	}
+	},
+	.failed_try_retry = TRUE
 };
 
 void driver_mysql_init(void);
--- a/src/lib-sql/driver-pgsql.c	Tue May 04 16:12:00 2010 +0300
+++ b/src/lib-sql/driver-pgsql.c	Tue May 04 17:55:23 2010 +0300
@@ -3,16 +3,12 @@
 #include "lib.h"
 #include "array.h"
 #include "ioloop.h"
-#include "ioloop-internal.h" /* kind of dirty, but it should be fine.. */
 #include "sql-api-private.h"
 
 #ifdef BUILD_PGSQL
 #include <stdlib.h>
-#include <time.h>
 #include <libpq-fe.h>
 
-#define QUERY_TIMEOUT_SECS 6
-
 struct pgsql_db {
 	struct sql_db api;
 
@@ -21,20 +17,16 @@
 	PGconn *pg;
 
 	struct io *io;
+	struct timeout *to_connect;
 	enum io_condition io_dir;
 
-	struct pgsql_queue *queue, **queue_tail;
-	struct timeout *queue_to;
-
+	struct pgsql_result *cur_result;
 	struct ioloop *ioloop;
 	struct sql_result *sync_result;
 
 	char *error;
-	time_t last_connect;
-	unsigned int connecting:1;
-	unsigned int connected:1;
-	unsigned int querying:1;
-	unsigned int query_finished:1;
+
+	unsigned int fatal_error:1;
 };
 
 struct pgsql_binary_value {
@@ -45,8 +37,8 @@
 struct pgsql_result {
 	struct sql_result api;
 	PGresult *pgres;
+	struct timeout *to;
 
-	char *query;
 	unsigned int rownum, rows;
 	unsigned int fields_count;
 	const char **fields;
@@ -57,15 +49,7 @@
 	sql_query_callback_t *callback;
 	void *context;
 
-	unsigned int retry_query:1;
-};
-
-struct pgsql_queue {
-	struct pgsql_queue *next;
-
-	time_t created;
-	char *query;
-	struct pgsql_result *result;
+	unsigned int timeout:1;
 };
 
 struct pgsql_transaction_context {
@@ -76,7 +60,6 @@
 	void *context;
 
 	pool_t query_pool;
-	struct pgsql_query_list *head, *tail;
 	const char *error;
 
 	unsigned int begin_succeeded:1;
@@ -84,26 +67,15 @@
 	unsigned int failed:1;
 };
 
-struct pgsql_query_list {
-	struct pgsql_query_list *next;
-	struct pgsql_transaction_context *ctx;
-
-	const char *query;
-	unsigned int *affected_rows;
-};
 extern const struct sql_db driver_pgsql_db;
 extern const struct sql_result driver_pgsql_result;
 
-static void
-driver_pgsql_query_full(struct sql_db *db, const char *query,
-			sql_query_callback_t *callback, void *context,
-			bool retry_query);
-static void queue_send_next(struct pgsql_db *db);
 static void result_finish(struct pgsql_result *result);
 
 static void driver_pgsql_close(struct pgsql_db *db)
 {
 	db->io_dir = 0;
+	db->fatal_error = FALSE;
 
 	PQfinish(db->pg);
 	db->pg = NULL;
@@ -113,10 +85,15 @@
 		   so use io_remove_closed(). */
 		io_remove_closed(&db->io);
 	}
+	if (db->to_connect != NULL)
+		timeout_remove(&db->to_connect);
 
-	db->connecting = FALSE;
-	db->connected = FALSE;
-        db->querying = FALSE;
+	sql_db_set_state(&db->api, SQL_DB_STATE_DISCONNECTED);
+
+	if (db->ioloop != NULL) {
+		/* running a sync query, stop it */
+		io_loop_stop(db->ioloop);
+	}
 }
 
 static const char *last_error(struct pgsql_db *db)
@@ -150,9 +127,6 @@
 		io_dir = IO_WRITE;
 		break;
 	case PGRES_POLLING_OK:
-		i_info("pgsql: Connected to %s", PQdb(db->pg));
-		db->connecting = FALSE;
-		db->connected = TRUE;
 		break;
 	case PGRES_POLLING_FAILED:
 		i_error("pgsql: Connect failed to %s: %s",
@@ -169,20 +143,33 @@
 		db->io_dir = io_dir;
 	}
 
-	if (db->connected && db->queue != NULL)
-		queue_send_next(db);
+	if (io_dir == 0) {
+		i_info("pgsql: Connected to %s", PQdb(db->pg));
+		if (db->to_connect != NULL)
+			timeout_remove(&db->to_connect);
+		sql_db_set_state(&db->api, SQL_DB_STATE_IDLE);
+		if (db->ioloop != NULL) {
+			/* driver_pgsql_sync_init() waiting for connection to
+			   finish */
+			io_loop_stop(db->ioloop);
+		}
+	}
+}
+
+static void driver_pgsql_connect_timeout(struct pgsql_db *db)
+{
+	unsigned int secs = ioloop_time - db->api.last_connect_try;
+
+	i_error("pgsql: Connect failed to %s: Timeout after %u seconds",
+		PQdb(db->pg), secs);
+	driver_pgsql_close(db);
 }
 
 static int driver_pgsql_connect(struct sql_db *_db)
 {
 	struct pgsql_db *db = (struct pgsql_db *)_db;
-	time_t now;
 
-	/* don't try reconnecting more than once a second */
-	now = time(NULL);
-	if (db->connecting || db->last_connect == now)
-		return db->connected ? 1 : (db->connecting ? 0 : -1);
-	db->last_connect = now;
+	i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
 
 	db->pg = PQconnectStart(db->connect_string);
 	if (db->pg == NULL)
@@ -193,28 +180,33 @@
 			PQdb(db->pg), last_error(db));
 		driver_pgsql_close(db);
 		return -1;
-	} else {
-		/* nonblocking connecting begins. */
-		if (PQsetnonblocking(db->pg, 1) < 0)
-			i_error("pgsql: PQsetnonblocking() failed");
-		db->io = io_add(PQsocket(db->pg), IO_WRITE,
-				connect_callback, db);
-		db->io_dir = IO_WRITE;
-		db->connecting = TRUE;
-		return 0;
 	}
+
+	/* nonblocking connecting begins. */
+	if (PQsetnonblocking(db->pg, 1) < 0)
+		i_error("pgsql: PQsetnonblocking() failed");
+	db->to_connect = timeout_add(SQL_CONNECT_TIMEOUT_SECS * 1000,
+				     driver_pgsql_connect_timeout, db);
+	db->io = io_add(PQsocket(db->pg), IO_WRITE, connect_callback, db);
+	db->io_dir = IO_WRITE;
+	sql_db_set_state(&db->api, SQL_DB_STATE_CONNECTING);
+	return 0;
+}
+
+static void driver_pgsql_disconnect(struct sql_db *_db)
+{
+	struct pgsql_db *db = (struct pgsql_db *)_db;
+
+	driver_pgsql_close(db);
 }
 
 static struct sql_db *driver_pgsql_init_v(const char *connect_string)
 {
 	struct pgsql_db *db;
 
-	i_assert(connect_string != NULL);
-
 	db = i_new(struct pgsql_db, 1);
 	db->connect_string = i_strdup(connect_string);
 	db->api = driver_pgsql_db;
-	db->queue_tail = &db->queue;
 	return &db->api;
 }
 
@@ -222,18 +214,9 @@
 {
 	struct pgsql_db *db = (struct pgsql_db *)_db;
 
-	while (db->queue != NULL) {
-		struct pgsql_queue *next = db->queue->next;
+	if (db->cur_result != NULL && db->cur_result->to != NULL)
+                result_finish(db->cur_result);
 
-                result_finish(db->queue->result);
-		i_free(db->queue->query);
-		i_free(db->queue);
-
-		db->queue = next;
-	}
-
-	if (db->queue_to != NULL)
-		timeout_remove(&db->queue_to);
         driver_pgsql_close(db);
 	i_free(db->error);
 	i_free(db->connect_string);
@@ -241,10 +224,14 @@
 	i_free(db);
 }
 
-static enum sql_db_flags
-driver_pgsql_get_flags(struct sql_db *db ATTR_UNUSED)
+static void driver_pgsql_set_idle(struct pgsql_db *db)
 {
-	return 0;
+	i_assert(db->api.state == SQL_DB_STATE_BUSY);
+
+	if (db->fatal_error)
+		driver_pgsql_close(db);
+	else
+		sql_db_set_state(&db->api, SQL_DB_STATE_IDLE);
 }
 
 static void consume_results(struct pgsql_db *db)
@@ -261,31 +248,42 @@
 		PQclear(pgres);
 	}
 
-	if (PQstatus(db->pg) == CONNECTION_BAD)
+	if (PQstatus(db->pg) == CONNECTION_BAD) {
 		io_remove_closed(&db->io);
-	else
+		driver_pgsql_close(db);
+	} else {
 		io_remove(&db->io);
-
-	db->querying = FALSE;
-	if (db->queue != NULL && db->connected)
-		queue_send_next(db);
+		driver_pgsql_set_idle(db);
+	}
 }
 
 static void driver_pgsql_result_free(struct sql_result *_result)
 {
 	struct pgsql_db *db = (struct pgsql_db *)_result->db;
         struct pgsql_result *result = (struct pgsql_result *)_result;
+	bool success;
 
-	if (result->api.callback)
+	if (result->api.callback) {
+		/* we're coming here from a user's sql_result_free() that's
+		   being called from a callback. we'll do this later,
+		   so ignore. */
 		return;
+	}
+
+	i_assert(db->cur_result == result);
+	i_assert(result->callback == NULL);
 
 	if (_result == db->sync_result)
 		db->sync_result = NULL;
+	db->cur_result = NULL;
 
+	success = result->pgres != NULL && !db->fatal_error;
 	if (result->pgres != NULL) {
 		PQclear(result->pgres);
 		result->pgres = NULL;
+	}
 
+	if (success) {
 		/* we'll have to read the rest of the results as well */
 		i_assert(db->io == NULL);
 		db->io = io_add(PQsocket(db->pg), IO_READ,
@@ -293,7 +291,7 @@
 		db->io_dir = IO_READ;
 		consume_results(db);
 	} else {
-		db->querying = FALSE;
+		driver_pgsql_set_idle(db);
 	}
 
 	if (array_is_created(&result->binary_values)) {
@@ -306,54 +304,38 @@
 
 	i_free(result->fields);
 	i_free(result->values);
-	i_free(result->query);
 	i_free(result);
-
-	if (db->queue != NULL && !db->querying && db->connected)
-		queue_send_next(db);
 }
 
 static void result_finish(struct pgsql_result *result)
 {
 	struct pgsql_db *db = (struct pgsql_db *)result->api.db;
-	bool free_result = TRUE, retry = FALSE;
-	bool disconnected;
+	bool free_result = TRUE;
+
+	timeout_remove(&result->to);
 
 	/* if connection to server was lost, we don't yet see that the
 	   connection is bad. we only see the fatal error, so assume it also
 	   means disconnection. */
-	disconnected = PQstatus(db->pg) == CONNECTION_BAD ||
-		PQresultStatus(result->pgres) == PGRES_FATAL_ERROR;
-	if (disconnected && result->retry_query) {
-		/* retry the query */
-		i_error("pgsql: Query failed, retrying: %s", last_error(db));
-		retry = TRUE;
-	} else if (result->callback != NULL) {
-		result->api.callback = TRUE;
-		T_BEGIN {
-			result->callback(&result->api, result->context);
-		} T_END;
-		result->api.callback = FALSE;
-		free_result = db->sync_result != &result->api;
-		if (db->queue == NULL && db->ioloop != NULL)
-			io_loop_stop(db->ioloop);
-	}                            
+	if (PQstatus(db->pg) == CONNECTION_BAD || result->pgres == NULL ||
+	    PQresultStatus(result->pgres) == PGRES_FATAL_ERROR)
+		db->fatal_error = TRUE;
 
-	if (disconnected) {
-		/* disconnected */
-		if (result->pgres != NULL) {
-			PQclear(result->pgres);
-			result->pgres = NULL;
-		}
-		driver_pgsql_close(db);
+	if (db->fatal_error) {
+		result->api.failed = TRUE;
+		result->api.failed_try_retry = TRUE;
+	}
+	result->api.callback = TRUE;
+	T_BEGIN {
+		result->callback(&result->api, result->context);
+	} T_END;
+	result->api.callback = FALSE;
+	result->callback = NULL;
 
-		if (retry) {
-			/* retry the query */
-			driver_pgsql_query_full(&db->api, result->query,
-						result->callback,
-						result->context, FALSE);
-		}
-	}
+	free_result = db->sync_result != &result->api;
+	if (db->ioloop != NULL)
+		io_loop_stop(db->ioloop);
+
 	if (free_result)
 		sql_result_unref(&result->api);
 }
@@ -363,7 +345,6 @@
         struct pgsql_db *db = (struct pgsql_db *)result->api.db;
 
 	if (!PQconsumeInput(db->pg)) {
-		db->connected = FALSE;
 		result_finish(result);
 		return;
 	}
@@ -396,7 +377,6 @@
 	io_remove(&db->io);
 
 	if (ret < 0) {
-		db->connected = FALSE;
 		result_finish(result);
 	} else {
 		/* all flushed */
@@ -404,29 +384,34 @@
 	}
 }
 
-static void send_query(struct pgsql_result *result, const char *query)
+static void query_timeout(struct pgsql_result *result)
+{
+	i_error("pgsql: Query timed out, aborting");
+	result->timeout = TRUE;
+	result_finish(result);
+}
+
+static void do_query(struct pgsql_result *result, const char *query)
 {
         struct pgsql_db *db = (struct pgsql_db *)result->api.db;
 	int ret;
 
+	i_assert(SQL_DB_IS_READY(&db->api));
+	i_assert(db->cur_result == NULL);
 	i_assert(db->io == NULL);
-	i_assert(!db->querying);
-	i_assert(db->connected);
 
-	if (!PQsendQuery(db->pg, query)) {
-		db->connected = FALSE;
+	db->cur_result = result;
+	result->to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
+				 query_timeout, result);
+
+	if (!PQsendQuery(db->pg, query) ||
+	    (ret = PQflush(db->pg)) < 0) {
+		/* failed to send query */
 		result_finish(result);
 		return;
 	}
 
-	ret = PQflush(db->pg);
-	if (ret < 0) {
-		db->connected = FALSE;
-		result_finish(result);
-		return;
-	}
-
-	db->querying = TRUE;
+	sql_db_set_state(&db->api, SQL_DB_STATE_BUSY);
 	if (ret > 0) {
 		/* write blocks */
 		db->io = io_add(PQsocket(db->pg), IO_WRITE,
@@ -437,115 +422,29 @@
 	}
 }
 
-static struct pgsql_queue *queue_unlink_first(struct pgsql_db *db)
-{
-	struct pgsql_queue *queue;
-
-	queue = db->queue;
-	db->queue = queue->next;
-
-	if (db->queue == NULL)
-		db->queue_tail = &db->queue;
-	return queue;
-}
-
-static void queue_send_next(struct pgsql_db *db)
-{
-	struct pgsql_queue *queue;
-
-	queue = queue_unlink_first(db);
-	send_query(queue->result, queue->query);
-
-	i_free(queue->query);
-	i_free(queue);
-}
-
-static void queue_abort_next(struct pgsql_db *db)
+static const char *
+driver_pgsql_escape_string(struct sql_db *_db, const char *string)
 {
-	struct pgsql_queue *queue;
-
-	queue = queue_unlink_first(db);
-
-	queue->result->callback(&sql_not_connected_result,
-				queue->result->context);
-	i_free(queue->result);
-	i_free(queue->query);
-	i_free(queue);
-
-	if (db->queue == NULL && db->ioloop != NULL)
-		io_loop_stop(db->ioloop);
-}
-
-static void queue_drop_timed_out_queries(struct pgsql_db *db)
-{
-	while (db->queue != NULL &&
-	       db->queue->created + QUERY_TIMEOUT_SECS < ioloop_time)
-		queue_abort_next(db);
-
-}
-
-static void queue_timeout(struct pgsql_db *db)
-{
-	if (db->querying)
-		return;
+	struct pgsql_db *db = (struct pgsql_db *)_db;
+	size_t len = strlen(string);
+	char *to;
 
-	if (!db->connected) {
-		queue_drop_timed_out_queries(db);
-		driver_pgsql_connect(&db->api);
-		return;
+#ifdef HAVE_PQESCAPE_STRING_CONN
+	if (db->api.state == SQL_DB_STATE_DISCONNECTED) {
+		/* try connecting again */
+		(void)sql_connect(&db->api);
 	}
-
-	if (db->queue != NULL)
-		queue_send_next(db);
-
-	if (db->queue == NULL)
-		timeout_remove(&db->queue_to);
-}
-
-static void
-driver_pgsql_queue_query(struct pgsql_result *result, const char *query)
-{
-        struct pgsql_db *db = (struct pgsql_db *)result->api.db;
-	struct pgsql_queue *queue;
-
-	queue = i_new(struct pgsql_queue, 1);
-	queue->created = time(NULL);
-	queue->query = i_strdup(query);
-	queue->result = result;
-
-	*db->queue_tail = queue;
-	db->queue_tail = &queue->next;
-
-	if (db->queue_to == NULL)
-		db->queue_to = timeout_add(5000, queue_timeout, db);
-}
-
-static void do_query(struct pgsql_result *result, const char *query)
-{
-        struct pgsql_db *db = (struct pgsql_db *)result->api.db;
-
-	i_assert(db->sync_result == NULL);
-
-	if (db->querying) {
-		/* only one query at a time */
-		driver_pgsql_queue_query(result, query);
-		return;
+	if (db->api.state != SQL_DB_STATE_DISCONNECTED) {
+		to = t_buffer_get(len * 2 + 1);
+		len = PQescapeStringConn(db->pg, to, string, len, NULL);
+	} else
+#endif
+	{
+		to = t_buffer_get(len * 2 + 1);
+		len = PQescapeString(to, string, len);
 	}
-
-	if (!db->connected) {
-		/* try connecting again */
-		driver_pgsql_connect(&db->api);
-		driver_pgsql_queue_query(result, query);
-		return;
-	}
-
-	if (db->queue == NULL)
-		send_query(result, query);
-	else {
-		/* there's already queries queued, send them first */
-		driver_pgsql_queue_query(result, query);
-		queue_send_next(db);
-	}
+	t_buffer_alloc(len + 1);
+	return to;
 }
 
 static void exec_callback(struct sql_result *_result,
@@ -556,30 +455,7 @@
 	i_error("pgsql: sql_exec() failed: %s", last_error(db));
 }
 
-static const char *
-driver_pgsql_escape_string(struct sql_db *_db, const char *string)
-{
-	struct pgsql_db *db = (struct pgsql_db *)_db;
-	size_t len = strlen(string);
-	char *to;
-
-#ifdef HAVE_PQESCAPE_STRING_CONN
-	if (!db->connected) {
-		/* try connecting again */
-		(void)driver_pgsql_connect(&db->api);
-	}
-	to = t_buffer_get(len * 2 + 1);
-	len = PQescapeStringConn(db->pg, to, string, len, NULL);
-#else
-	to = t_buffer_get(len * 2 + 1);
-	len = PQescapeString(to, string, len);
-#endif
-	t_buffer_alloc(len + 1);
-	return to;
-}
-
-static void driver_pgsql_exec_full(struct sql_db *db, const char *query,
-				   bool retry_query)
+static void driver_pgsql_exec(struct sql_db *db, const char *query)
 {
 	struct pgsql_result *result;
 
@@ -588,22 +464,11 @@
 	result->api.db = db;
 	result->api.refcount = 1;
 	result->callback = exec_callback;
-	if (retry_query) {
-		result->query = i_strdup(query);
-		result->retry_query = TRUE;
-	}
 	do_query(result, query);
 }
 
-static void driver_pgsql_exec(struct sql_db *db, const char *query)
-{
-	driver_pgsql_exec_full(db, query, TRUE);
-}
-
-static void
-driver_pgsql_query_full(struct sql_db *db, const char *query,
-			sql_query_callback_t *callback, void *context,
-			bool retry_query)
+static void driver_pgsql_query(struct sql_db *db, const char *query,
+			       sql_query_callback_t *callback, void *context)
 {
 	struct pgsql_result *result;
 
@@ -613,74 +478,65 @@
 	result->api.refcount = 1;
 	result->callback = callback;
 	result->context = context;
-	if (retry_query) {
-		result->query = i_strdup(query);
-		result->retry_query = TRUE;
-	}
 	do_query(result, query);
 }
 
-static void driver_pgsql_query(struct sql_db *db, const char *query,
-			       sql_query_callback_t *callback, void *context)
-{
-	driver_pgsql_query_full(db, query, callback, context, TRUE);
-}
-
 static void pgsql_query_s_callback(struct sql_result *result, void *context)
 {
         struct pgsql_db *db = context;
 
-	db->query_finished = TRUE;
 	db->sync_result = result;
 }
 
+static void driver_pgsql_sync_init(struct pgsql_db *db)
+{
+	if (db->io == NULL) {
+		db->ioloop = io_loop_create();
+		return;
+	}
+
+	i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
+
+	/* have to move our existing I/O handler to new I/O loop */
+	io_remove(&db->io);
+
+	db->ioloop = io_loop_create();
+	db->io = io_add(PQsocket(db->pg), db->io_dir, connect_callback, db);
+	/* wait for connecting to finish */
+	io_loop_run(db->ioloop);
+}
+
+static void driver_pgsql_sync_deinit(struct pgsql_db *db)
+{
+	io_loop_destroy(&db->ioloop);
+}
+
 static struct sql_result *
-driver_pgsql_query_s(struct sql_db *_db, const char *query)
+driver_pgsql_sync_query(struct pgsql_db *db, const char *query)
 {
-        struct pgsql_db *db = (struct pgsql_db *)_db;
 	struct sql_result *result;
-	struct io old_io;
+
+	i_assert(db->sync_result == NULL);
 
-	if (db->queue_to != NULL) {
-		/* we're creating a new ioloop, make sure the timeout gets
-		   added there. */
-		timeout_remove(&db->queue_to);
+	switch (db->api.state) {
+	case SQL_DB_STATE_CONNECTING:
+	case SQL_DB_STATE_BUSY:
+		i_unreached();
+	case SQL_DB_STATE_DISCONNECTED:
+		sql_not_connected_result.refcount++;
+		return &sql_not_connected_result;
+	case SQL_DB_STATE_IDLE:
+		break;
 	}
 
-	if (db->io == NULL)
-		db->ioloop = io_loop_create();
-	else {
-		/* have to move our existing I/O handler to new I/O loop */
-		old_io = *db->io;
-		io_remove(&db->io);
-
-		db->ioloop = io_loop_create();
-
-		db->io = io_add(PQsocket(db->pg), old_io.condition,
-				old_io.callback, old_io.context);
-	}
-
-	db->query_finished = FALSE;
-	if (query != NULL)
-		driver_pgsql_query(_db, query, pgsql_query_s_callback, db);
+	driver_pgsql_query(&db->api, query, pgsql_query_s_callback, db);
+	if (db->sync_result == NULL)
+		io_loop_run(db->ioloop);
 
-	if (!db->query_finished) {
-		if ((db->connected || db->connecting) && db->io != NULL)
-			io_loop_run(db->ioloop);
-		else
-			queue_abort_next(db);
-
-		if (db->io != NULL) {
-			i_assert(db->sync_result == &sql_not_connected_result);
-			io_remove(&db->io);
-		}
-		if (db->queue_to != NULL)
-			timeout_remove(&db->queue_to);
-	} else {
-		i_assert(db->io == NULL);
-		i_assert(db->queue_to == NULL);
+	if (db->io != NULL) {
+		i_assert(db->fatal_error);
+		io_remove_closed(&db->io);
 	}
-	io_loop_destroy(&db->ioloop);
 
 	result = db->sync_result;
 	if (result == &sql_not_connected_result) {
@@ -693,6 +549,18 @@
 	return result;
 }
 
+static struct sql_result *
+driver_pgsql_query_s(struct sql_db *_db, const char *query)
+{
+	struct pgsql_db *db = (struct pgsql_db *)_db;
+	struct sql_result *result;
+
+	driver_pgsql_sync_init(db);
+	result = driver_pgsql_sync_query(db, query);
+	driver_pgsql_sync_deinit(db);
+	return result;
+}
+
 static int driver_pgsql_result_next_row(struct sql_result *_result)
 {
 	struct pgsql_result *result = (struct pgsql_result *)_result;
@@ -712,8 +580,10 @@
 			return 0;
 	}
 
-	if (result->pgres == NULL)
+	if (result->pgres == NULL) {
+		_result->failed = TRUE;
 		return -1;
+	}
 
 	switch (PQresultStatus(result->pgres)) {
 	case PGRES_COMMAND_OK:
@@ -725,10 +595,12 @@
 	case PGRES_EMPTY_QUERY:
 	case PGRES_NONFATAL_ERROR:
 		/* nonfatal error */
+		_result->failed = TRUE;
 		return -1;
 	default:
 		/* treat as fatal error */
-		db->connected = FALSE;
+		_result->failed = TRUE;
+		db->fatal_error = TRUE;
 		return -1;
 	}
 }
@@ -862,7 +734,9 @@
 
 	i_free_and_null(db->error);
 
-	if (result->pgres == NULL) {
+	if (result->timeout) {
+		db->error = i_strdup("Query timed out");
+	} else if (result->pgres == NULL) {
 		/* connection error */
 		db->error = i_strdup(last_error(db));
 	} else {
@@ -905,7 +779,7 @@
 
 static void
 transaction_begin_callback(struct sql_result *result,
-			    struct pgsql_transaction_context *ctx)
+			   struct pgsql_transaction_context *ctx)
 {
 	if (sql_result_next_row(result) < 0) {
 		ctx->begin_failed = TRUE;
@@ -930,17 +804,18 @@
 
 static void
 transaction_update_callback(struct sql_result *result,
-			    struct pgsql_query_list *list)
+			    struct sql_transaction_query *query)
 {
-	struct pgsql_transaction_context *ctx = list->ctx;
+	struct pgsql_transaction_context *ctx =
+		(struct pgsql_transaction_context *)query->trans;
 
 	if (sql_result_next_row(result) < 0) {
 		ctx->failed = TRUE;
 		ctx->error = sql_result_get_error(result);
-	} else if (list->affected_rows != NULL) {
+	} else if (query->affected_rows != NULL) {
 		struct pgsql_result *pg_result = (struct pgsql_result *)result;
 
-		*list->affected_rows = atoi(PQcmdTuples(pg_result->pgres));
+		*query->affected_rows = atoi(PQcmdTuples(pg_result->pgres));
 	}
 	driver_pgsql_transaction_unref(ctx);
 }
@@ -955,78 +830,110 @@
 	ctx->callback = callback;
 	ctx->context = context;
 
-	if (ctx->failed || ctx->head == NULL) {
+	if (ctx->failed || _ctx->head == NULL) {
 		callback(ctx->failed ? ctx->error : NULL, context);
 		driver_pgsql_transaction_unref(ctx);
-	} else if (ctx->head->next == NULL) {
+	} else if (_ctx->head->next == NULL) {
 		/* just a single query, send it */
-		sql_query(_ctx->db, ctx->head->query,
+		sql_query(_ctx->db, _ctx->head->query,
 			  transaction_commit_callback, ctx);
 	} else {
 		/* multiple queries, use a transaction */
 		ctx->refcount++;
 		sql_query(_ctx->db, "BEGIN", transaction_begin_callback, ctx);
-		while (ctx->head != NULL) {
+		while (_ctx->head != NULL) {
 			ctx->refcount++;
-			sql_query(_ctx->db, ctx->head->query,
-				  transaction_update_callback, ctx->head);
-			ctx->head = ctx->head->next;
+			sql_query(_ctx->db, _ctx->head->query,
+				  transaction_update_callback, _ctx->head);
+			_ctx->head = _ctx->head->next;
 		}
 		sql_query(_ctx->db, "COMMIT", transaction_commit_callback, ctx);
 	}
 }
 
+static void
+commit_multi_fail(struct pgsql_transaction_context *ctx,
+		  struct sql_result *result, const char *query)
+{
+	ctx->failed = TRUE;
+	ctx->error = t_strdup_printf("%s (query: %s)",
+				     sql_result_get_error(result), query);
+	sql_result_unref(result);
+}
+
+static struct sql_result *
+driver_pgsql_transaction_commit_multi(struct pgsql_transaction_context *ctx)
+{
+	struct pgsql_db *db = (struct pgsql_db *)ctx->ctx.db;
+	struct sql_result *result;
+	struct sql_transaction_query *query;
+
+	result = driver_pgsql_sync_query(db, "BEGIN");
+	if (sql_result_next_row(result) < 0) {
+		commit_multi_fail(ctx, result, "BEGIN");
+		return NULL;
+	}
+	sql_result_unref(result);
+
+	/* send queries */
+	for (query = ctx->ctx.head; query != NULL; query = query->next) {
+		result = driver_pgsql_sync_query(db, query->query);
+		if (sql_result_next_row(result) < 0) {
+			commit_multi_fail(ctx, result, query->query);
+			break;
+		}
+		if (query->affected_rows != NULL) {
+			struct pgsql_result *pg_result =
+				(struct pgsql_result *)result;
+
+			*query->affected_rows =
+				atoi(PQcmdTuples(pg_result->pgres));
+		}
+		sql_result_unref(result);
+	}
+
+	return driver_pgsql_sync_query(db, ctx->failed ?
+				       "ROLLBACK" : "COMMIT");
+}
+
 static int
 driver_pgsql_transaction_commit_s(struct sql_transaction_context *_ctx,
 				  const char **error_r)
 {
 	struct pgsql_transaction_context *ctx =
 		(struct pgsql_transaction_context *)_ctx;
+	struct pgsql_db *db = (struct pgsql_db *)_ctx->db;
+	struct sql_transaction_query *single_query = NULL;
 	struct sql_result *result;
 
 	*error_r = NULL;
 
-	if (ctx->failed || ctx->head == NULL) {
+	if (ctx->failed || _ctx->head == NULL) {
 		/* nothing to be done */
 		result = NULL;
-	} else if (ctx->head->next == NULL) {
+	} else if (_ctx->head->next == NULL) {
 		/* just a single query, send it */
-		result = sql_query_s(_ctx->db, ctx->head->query);
+		single_query = _ctx->head;
+		result = sql_query_s(_ctx->db, single_query->query);
 	} else {
 		/* multiple queries, use a transaction */
-		ctx->refcount++;
-		sql_query(_ctx->db, "BEGIN", transaction_begin_callback, ctx);
-		while (ctx->head != NULL) {
-			ctx->refcount++;
-			sql_query(_ctx->db, ctx->head->query,
-				  transaction_update_callback, ctx->head);
-			ctx->head = ctx->head->next;
-		}
-		if (ctx->refcount > 1) {
-			/* flush the previous queries */
-			(void)driver_pgsql_query_s(_ctx->db, NULL);
-		}
-
-		if (ctx->begin_failed) {
-			result = NULL;
-		} else if (ctx->failed) {
-			result = sql_query_s(_ctx->db, "ROLLBACK");
-		} else {
-			result = sql_query_s(_ctx->db, "COMMIT");
-		}
+		driver_pgsql_sync_init(db);
+		result = driver_pgsql_transaction_commit_multi(ctx);
+		driver_pgsql_sync_deinit(db);
 	}
 
-	if (ctx->failed)
+	if (ctx->failed) {
+		i_assert(ctx->error != NULL);
 		*error_r = ctx->error;
-	else if (result != NULL) {
+	} else if (result != NULL) {
 		if (sql_result_next_row(result) < 0)
 			*error_r = sql_result_get_error(result);
-		else if (ctx->head != NULL &&
-			 ctx->head->affected_rows != NULL) {
+		else if (single_query != NULL &&
+			 single_query->affected_rows != NULL) {
 			struct pgsql_result *pg_result =
 				(struct pgsql_result *)result;
 
-			*ctx->head->affected_rows =
+			*single_query->affected_rows =
 				atoi(PQcmdTuples(pg_result->pgres));
 		}
 	}
@@ -1054,28 +961,19 @@
 {
 	struct pgsql_transaction_context *ctx =
 		(struct pgsql_transaction_context *)_ctx;
-	struct pgsql_query_list *list;
 
-	list = p_new(ctx->query_pool, struct pgsql_query_list, 1);
-	list->ctx = ctx;
-	list->query = p_strdup(ctx->query_pool, query);
-	list->affected_rows = affected_rows;
-
-	if (ctx->head == NULL)
-		ctx->head = list;
-	else
-		ctx->tail->next = list;
-	ctx->tail = list;
+	sql_transaction_add_query(_ctx, ctx->query_pool, query, affected_rows);
 }
 
 const struct sql_db driver_pgsql_db = {
-	"pgsql",
+	.name = "pgsql",
+	.flags = SQL_DB_FLAG_POOLED,
 
 	.v = {
 		driver_pgsql_init_v,
 		driver_pgsql_deinit_v,
-		driver_pgsql_get_flags,
 		driver_pgsql_connect,
+		driver_pgsql_disconnect,
 		driver_pgsql_escape_string,
 		driver_pgsql_exec,
 		driver_pgsql_query,
--- a/src/lib-sql/driver-sqlite.c	Tue May 04 16:12:00 2010 +0300
+++ b/src/lib-sql/driver-sqlite.c	Tue May 04 17:55:23 2010 +0300
@@ -54,10 +54,19 @@
 		i_error("sqlite: open(%s) failed: %s", db->dbfile,
 			sqlite3_errmsg(db->sqlite));
 		sqlite3_close(db->sqlite);
+		db->sqlite = NULL;
 		return -1;
 	}
 }
 
+static void driver_sqlite_disconnect(struct sql_db *_db)
+{
+ 	struct sqlite_db *db = (struct sqlite_db *)_db;
+
+	sqlite3_close(db->sqlite);
+	db->sqlite = NULL;
+}
+
 static struct sql_db *driver_sqlite_init_v(const char *connect_string)
 {
 	struct sqlite_db *db;
@@ -84,12 +93,6 @@
 	pool_unref(&db->pool);
 }
 
-static enum sql_db_flags
-driver_sqlite_get_flags(struct sql_db *db ATTR_UNUSED)
-{
-	return SQL_DB_FLAG_BLOCKING;
-}
-
 static const char *
 driver_sqlite_escape_string(struct sql_db *_db ATTR_UNUSED,
 			    const char *string)
@@ -387,13 +390,14 @@
 }
 
 const struct sql_db driver_sqlite_db = {
-	"sqlite",
+	.name = "sqlite",
+	.flags = SQL_DB_FLAG_BLOCKING,
 
 	.v = {
 		driver_sqlite_init_v,
 		driver_sqlite_deinit_v,
-		driver_sqlite_get_flags,
 		driver_sqlite_connect,
+		driver_sqlite_disconnect,
 		driver_sqlite_escape_string,
 		driver_sqlite_exec,
 		driver_sqlite_query,
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib-sql/driver-sqlpool.c	Tue May 04 17:55:23 2010 +0300
@@ -0,0 +1,756 @@
+/* Copyright (c) 2010 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "llist.h"
+#include "ioloop.h"
+#include "sql-api-private.h"
+
+#include <time.h>
+
+#define QUERY_TIMEOUT_SECS 6 /* NOTE(review): not referenced anywhere in this file; the timeout code below uses SQL_QUERY_TIMEOUT_SECS instead */
+
+struct sqlpool_host {
+	char *connect_string; /* connect string given to the backend driver's init() for this host */
+
+	unsigned int connection_count; /* number of connections created to this host; checked against connection_limit */
+};
+
+struct sqlpool_connection {
+	struct sql_db *db; /* backend driver's connection, created with driver->v.init() */
+	unsigned int host_idx; /* index of this connection's host in sqlpool_db.hosts */
+};
+
+struct sqlpool_db {
+	struct sql_db api;
+
+	pool_t pool; /* NOTE(review): never set or read in this file */
+	const struct sql_db *driver; /* backend driver used to create the real connections */
+	unsigned int connection_limit; /* max. connections per host (maxconns setting) */
+
+	ARRAY_DEFINE(hosts, struct sqlpool_host);
+	/* all connections from all hosts */
+	ARRAY_DEFINE(all_connections, struct sqlpool_connection);
+	/* index of last connection in all_connections that was used to
+	   send a query. */
+	unsigned int last_query_conn_idx;
+
+	/* queued requests */
+	struct sqlpool_request *requests_head, *requests_tail;
+	struct timeout *request_to; /* calls driver_sqlpool_timeout() to expire queued requests */
+};
+
+struct sqlpool_request {
+	struct sqlpool_request *prev, *next;
+
+	struct sqlpool_db *db;
+	time_t created; /* creation time; compared against ioloop_time to time out queued requests */
+
+	unsigned int host_idx; /* host the query was last sent to; passed as the unwanted host on retry */
+	unsigned int retried:1; /* set when the query is retried, so it is retried only once */
+
+	/* requests are a) queries */
+	char *query;
+	sql_query_callback_t *callback;
+	void *context;
+
+	/* b) transaction waiters */
+	struct sqlpool_transaction_context *trans;
+};
+
+struct sqlpool_transaction_context {
+	struct sql_transaction_context ctx;
+
+	sql_commit_callback_t *callback; /* set by commit(); non-NULL means commit was already requested */
+	void *context;
+
+	struct sqlpool_request *commit_request; /* queued request waiting for a connection, if begin() got none */
+	struct sql_transaction_context *conn_trans; /* backend transaction; NULL until a connection is available */
+
+	pool_t query_pool; /* memory for queued updates; created only when begin() got no connection */
+};
+
+extern struct sql_db driver_sqlpool_db;
+
+static void
+driver_sqlpool_query_callback(struct sql_result *result,
+			      struct sqlpool_request *request);
+static void
+driver_sqlpool_commit_callback(const char *error,
+			       struct sqlpool_transaction_context *ctx);
+
+static struct sqlpool_request *
+sqlpool_request_new(struct sqlpool_db *db, const char *query)
+{
+	struct sqlpool_request *request;
+
+	request = i_new(struct sqlpool_request, 1);
+	request->db = db;
+	request->created = time(NULL);
+	request->query = i_strdup(query);
+	return request;
+}
+
+static void
+sqlpool_request_free(struct sqlpool_request **_request)
+{
+	struct sqlpool_request *request = *_request;
+
+	*_request = NULL;
+
+	i_assert(request->prev == NULL && request->next == NULL);
+	i_free(request->query);
+	i_free(request);
+}
+
+static void
+sqlpool_request_abort(struct sqlpool_request **_request)
+{
+	struct sqlpool_request *request = *_request;
+
+	*_request = NULL;
+
+	if (request->callback != NULL)
+		request->callback(&sql_not_connected_result, request->context);
+
+	i_assert(request->prev != NULL ||
+		 request->db->requests_head == request);
+	DLLIST2_REMOVE(&request->db->requests_head,
+		       &request->db->requests_tail, request);
+	sqlpool_request_free(&request);
+}
+
+static void
+driver_sqlpool_new_conn_trans(struct sqlpool_transaction_context *trans,
+			      struct sql_db *conndb)
+{
+	trans->conn_trans = sql_transaction_begin(conndb);
+	/* backend will use our queries list (we might still append more
+	   queries to the list) */
+	trans->conn_trans->head = trans->ctx.head;
+	trans->conn_trans->tail = trans->ctx.tail;
+}
+
+static void
+sqlpool_request_handle_transaction(struct sql_db *conndb,
+				   struct sqlpool_transaction_context *trans)
+{
+	i_assert(trans->conn_trans == NULL);
+
+	sqlpool_request_free(&trans->commit_request);
+	driver_sqlpool_new_conn_trans(trans, conndb);
+
+	if (trans->callback != NULL) {
+		/* commit() already called, finish it */
+		sql_transaction_commit(&trans->conn_trans,
+				       driver_sqlpool_commit_callback, trans);
+	}
+}
+
+static void
+sqlpool_request_send_next(struct sqlpool_db *db, struct sql_db *conndb)
+{
+	struct sqlpool_request *request;
+
+	if (db->requests_head == NULL || !SQL_DB_IS_READY(conndb))
+		return;
+
+	request = db->requests_head;
+	DLLIST2_REMOVE(&db->requests_head, &db->requests_tail, request);
+	timeout_reset(db->request_to);
+
+	if (request->query != NULL) {
+		sql_query(conndb, request->query,
+			  driver_sqlpool_query_callback, request);
+	} else if (request->trans != NULL) {
+		sqlpool_request_handle_transaction(conndb, request->trans);
+	} else {
+		i_unreached();
+	}
+}
+
+static void sqlpool_reconnect(struct sql_db *conndb)
+{
+	timeout_remove(&conndb->to_reconnect);
+	(void)sql_connect(conndb);
+}
+
+static void
+sqlpool_state_changed(struct sql_db *conndb, enum sql_db_state prev_state,
+		      void *context)
+{
+	struct sqlpool_db *db = context;
+
+	if (conndb->state == SQL_DB_STATE_IDLE) {
+		conndb->connect_failure_count = 0;
+		conndb->connect_delay = SQL_CONNECT_MIN_DELAY;
+		sqlpool_request_send_next(db, conndb);
+	}
+
+	if (prev_state == SQL_DB_STATE_CONNECTING &&
+	    conndb->state == SQL_DB_STATE_DISCONNECTED) {
+		/* connect failed */
+		if (conndb->connect_failure_count > 0) {
+			/* increase delay between reconnections to this
+			   server */
+			conndb->connect_delay *= 5;
+			if (conndb->connect_delay > SQL_CONNECT_MAX_DELAY)
+				conndb->connect_delay = SQL_CONNECT_MAX_DELAY;
+		}
+		conndb->connect_failure_count++;
+
+		/* reconnect after the delay */
+		if (conndb->to_reconnect != NULL)
+			timeout_remove(&conndb->to_reconnect);
+		conndb->to_reconnect = timeout_add(conndb->connect_delay * 1000,
+						   sqlpool_reconnect, conndb);
+	}
+}
+
+static struct sqlpool_host *
+sqlpool_find_host_with_least_connections(struct sqlpool_db *db,
+					 unsigned int *host_idx_r)
+{
+	struct sqlpool_host *hosts, *min = NULL;
+	unsigned int i, count;
+
+	hosts = array_get_modifiable(&db->hosts, &count);
+	for (i = 0; i < count; i++) {
+		if (min == NULL ||
+		    min->connection_count > hosts[i].connection_count) {
+			min = &hosts[i];
+			*host_idx_r = i;
+		}
+	}
+	i_assert(min != NULL);
+	return min;
+}
+
+static struct sqlpool_connection *sqlpool_add_connection(struct sqlpool_db *db)
+{
+	struct sql_db *conndb;
+	struct sqlpool_host *host;
+	struct sqlpool_connection *conn;
+	unsigned int host_idx;
+
+	host = sqlpool_find_host_with_least_connections(db, &host_idx);
+	if (host->connection_count >= db->connection_limit)
+		return NULL;
+	host->connection_count++;
+
+	conndb = db->driver->v.init(host->connect_string);
+	i_array_init(&conndb->module_contexts, 5);
+
+	conndb->state_change_callback = sqlpool_state_changed;
+	conndb->state_change_context = db;
+	conndb->connect_delay = SQL_CONNECT_MIN_DELAY;
+
+	conn = array_append_space(&db->all_connections);
+	conn->host_idx = host_idx;
+	conn->db = conndb;
+	return conn;
+}
+
+static const struct sqlpool_connection *
+sqlpool_find_available_connection(struct sqlpool_db *db,
+				  unsigned int unwanted_host_idx,
+				  bool *all_disconnected_r)
+{
+	const struct sqlpool_connection *conns;
+	unsigned int i, count;
+
+	*all_disconnected_r = TRUE; /* cleared below if any candidate isn't in disconnected state */
+	/* return the first ready connection (NULL if none), scanning from the one after the last used so that connections are rotated round-robin */
+	conns = array_get(&db->all_connections, &count);
+	for (i = 0; i < count; i++) {
+		unsigned int idx = (i + db->last_query_conn_idx + 1) % count;
+		struct sql_db *conndb = conns[idx].db;
+
+		if (conns[idx].host_idx == unwanted_host_idx)
+			continue; /* caller wants some other host than this one */
+
+		if (!SQL_DB_IS_READY(conndb)) {
+			/* see if we could reconnect to it immediately */
+			(void)sql_connect(conndb);
+		}
+		if (SQL_DB_IS_READY(conndb)) {
+			db->last_query_conn_idx = idx;
+			*all_disconnected_r = FALSE;
+			return &conns[idx];
+		}
+		if (conndb->state != SQL_DB_STATE_DISCONNECTED)
+			*all_disconnected_r = FALSE;
+	}
+	return NULL;
+}
+
+static bool
+driver_sqlpool_get_connection(struct sqlpool_db *db,
+			      unsigned int unwanted_host_idx,
+			      const struct sqlpool_connection **conn_r)
+{
+	const struct sqlpool_connection *conn, *conns;
+	unsigned int i, count;
+	bool all_disconnected;
+
+	conn = sqlpool_find_available_connection(db, unwanted_host_idx,
+						 &all_disconnected);
+	if (conn == NULL && unwanted_host_idx != -1U) {
+		/* maybe there are no wanted hosts. use any of them (-1U = no host is excluded). */
+		conn = sqlpool_find_available_connection(db, -1U,
+							 &all_disconnected);
+	}
+	if (conn == NULL && all_disconnected) {
+		/* no connected connections. connect_delays may have gotten too
+		   high, reset all of them to see if some are still alive. */
+		conns = array_get(&db->all_connections, &count);
+		for (i = 0; i < count; i++) {
+			struct sql_db *conndb = conns[i].db;
+
+			if (conndb->connect_delay > SQL_CONNECT_RESET_DELAY)
+				conndb->connect_delay = SQL_CONNECT_RESET_DELAY;
+		}
+		conn = sqlpool_find_available_connection(db, -1U,
+							 &all_disconnected);
+	}
+	if (conn == NULL) {
+		/* still nothing. try creating new connections. NOTE(review): sql_connect() isn't called on the new connection here - confirm a later lookup connects it */
+		conn = sqlpool_add_connection(db);
+		if (conn == NULL || !SQL_DB_IS_READY(conn->db))
+			return FALSE;
+	}
+	*conn_r = conn;
+	return TRUE;
+}
+
+static bool
+driver_sqlpool_get_sync_connection(struct sqlpool_db *db,
+				   const struct sqlpool_connection **conn_r)
+{
+	const struct sqlpool_connection *conns;
+	unsigned int i, count;
+
+	if (driver_sqlpool_get_connection(db, -1U, conn_r))
+		return TRUE;
+
+	/* no idling connections, but maybe we can find one that's trying to
+	   connect to server, and we can use it once it's finished */
+	conns = array_get(&db->all_connections, &count);
+	for (i = 0; i < count; i++) {
+		if (conns[i].db->state == SQL_DB_STATE_CONNECTING) {
+			*conn_r = &conns[i];
+			return TRUE;
+		}
+	}
+	return FALSE;
+}
+
+static void
+driver_sqlpool_parse_hosts(struct sqlpool_db *db, const char *connect_string)
+{
+	const char *const *args, *key, *value, *const *hostnamep;
+	struct sqlpool_host *host;
+	ARRAY_TYPE(const_string) hostnames, connect_args;
+
+	t_array_init(&hostnames, 8);
+	t_array_init(&connect_args, 32);
+
+	/* connect string is a space separated list. it may contain
+	   backend-specific strings which we'll pass as-is. we'll only care
+	   about our own settings, plus the host settings. */
+	args = t_strsplit_spaces(connect_string, " ");
+	for (; *args != NULL; args++) {
+		value = strchr(*args, '=');
+		if (value == NULL) {
+			key = *args;
+			value = "";
+		} else {
+			key = t_strdup_until(*args, value);
+			value++;
+		}
+
+		if (strcmp(key, "maxconns") == 0) {
+			if (str_to_uint(value, &db->connection_limit) < 0) {
+				i_fatal("Invalid value for maxconns: %s",
+					value);
+			}
+		} else if (strcmp(key, "host") == 0) {
+			array_append(&hostnames, &value, 1);
+		} else {
+			array_append(&connect_args, args, 1);
+		}
+	}
+
+	/* build a new connect string without our settings or hosts */
+	(void)array_append_space(&connect_args);
+	connect_string = t_strarray_join(array_idx(&connect_args, 0), " ");
+
+	if (array_count(&hostnames) == 0) {
+		/* no hosts specified. create a default one. */
+		host = array_append_space(&db->hosts);
+		host->connect_string = i_strdup(connect_string);
+	} else {
+		if (*connect_string == '\0')
+			connect_string = NULL;
+
+		array_foreach(&hostnames, hostnamep) {
+			host = array_append_space(&db->hosts);
+			host->connect_string =
+				i_strconcat("host=", *hostnamep, " ",
+					    connect_string, NULL);
+		}
+	}
+
+	if (db->connection_limit == 0)
+		db->connection_limit = SQL_DEFAULT_CONNECTION_LIMIT;
+}
+
+struct sql_db *
+driver_sqlpool_init(const char *connect_string, const struct sql_db *driver)
+{
+	struct sqlpool_db *db;
+
+	i_assert(connect_string != NULL);
+
+	db = i_new(struct sqlpool_db, 1);
+	db->driver = driver;
+	db->api = driver_sqlpool_db;
+	db->api.flags = driver->flags;
+	i_array_init(&db->hosts, 8);
+
+	T_BEGIN {
+		driver_sqlpool_parse_hosts(db, connect_string);
+	} T_END;
+
+	i_array_init(&db->all_connections, 16);
+	/* always have at least one backend connection initialized */
+	(void)sqlpool_add_connection(db);
+	return &db->api;
+}
+
+static void driver_sqlpool_deinit(struct sql_db *_db)
+{
+	struct sqlpool_db *db = (struct sqlpool_db *)_db;
+	struct sqlpool_host *host;
+	struct sqlpool_connection *conn;
+
+	array_foreach_modifiable(&db->all_connections, conn)
+		sql_deinit(&conn->db);
+	array_clear(&db->all_connections);
+
+	while (db->requests_head != NULL) {
+		struct sqlpool_request *request = db->requests_head;
+
+		sqlpool_request_abort(&request);
+	}
+	if (db->request_to != NULL)
+		timeout_remove(&db->request_to);
+
+	array_foreach_modifiable(&db->hosts, host)
+		i_free(host->connect_string);
+
+	i_assert(array_count(&db->all_connections) == 0);
+	array_free(&db->hosts);
+	array_free(&db->all_connections);
+	array_free(&_db->module_contexts);
+	i_free(db);
+}
+
+static int driver_sqlpool_connect(struct sql_db *_db)
+{
+	struct sqlpool_db *db = (struct sqlpool_db *)_db;
+	const struct sqlpool_connection *conn;
+	int ret = -1, ret2; /* ret stays -1 unless some sql_connect() returns 0 */
+
+	array_foreach(&db->all_connections, conn) {
+		ret2 = sql_connect(conn->db);
+		if (ret2 > 0)
+			return 1; /* stop at the first connection whose sql_connect() returns > 0 */
+		if (ret2 == 0)
+			ret = 0;
+	}
+	return ret;
+}
+
+static void driver_sqlpool_disconnect(struct sql_db *_db)
+{
+	struct sqlpool_db *db = (struct sqlpool_db *)_db;
+	const struct sqlpool_connection *conn;
+
+	array_foreach(&db->all_connections, conn)
+		sql_disconnect(conn->db);
+}
+
+static const char *
+driver_sqlpool_escape_string(struct sql_db *_db, const char *string)
+{
+	struct sqlpool_db *db = (struct sqlpool_db *)_db;
+	const struct sqlpool_connection *conn;
+
+	/* we always have at least one connection */
+	conn = array_idx(&db->all_connections, 0);
+	return sql_escape_string(conn->db, string);
+}
+
+static void driver_sqlpool_timeout(struct sqlpool_db *db)
+{
+	while (db->requests_head != NULL) {
+		struct sqlpool_request *request = db->requests_head;
+
+		if (request->created + SQL_QUERY_TIMEOUT_SECS > ioloop_time)
+			break;
+
+		i_error("%s: Query timed out "
+			"(no free connections for %u secs): %s",
+			db->driver->name,
+			(unsigned int)(ioloop_time - request->created),
+			request->query != NULL ? request->query :
+			"<transaction>");
+		sqlpool_request_abort(&request);
+	}
+
+	if (db->requests_head == NULL)
+		timeout_remove(&db->request_to);
+}
+
+static void
+driver_sqlpool_prepend_request(struct sqlpool_db *db,
+			       struct sqlpool_request *request)
+{
+	DLLIST2_PREPEND(&db->requests_head, &db->requests_tail, request);
+	if (db->request_to == NULL) {
+		db->request_to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
+					     driver_sqlpool_timeout, db);
+	}
+}
+
+static void
+driver_sqlpool_append_request(struct sqlpool_db *db,
+			      struct sqlpool_request *request)
+{
+	DLLIST2_APPEND(&db->requests_head, &db->requests_tail, request);
+	if (db->request_to == NULL) {
+		db->request_to = timeout_add(SQL_QUERY_TIMEOUT_SECS * 1000,
+					     driver_sqlpool_timeout, db);
+	}
+}
+
+static void
+driver_sqlpool_query_callback(struct sql_result *result,
+			      struct sqlpool_request *request)
+{
+	struct sqlpool_db *db = request->db;
+	const struct sqlpool_connection *conn;
+	struct sql_db *conndb;
+
+	if (result->failed_try_retry && !request->retried) {
+		i_error("%s: Query failed, retrying: %s",
+			db->driver->name, sql_result_get_error(result));
+		request->retried = TRUE;
+		driver_sqlpool_prepend_request(db, request);
+
+		if (driver_sqlpool_get_connection(request->db,
+						  request->host_idx, &conn)) {
+			request->host_idx = conn->host_idx;
+			sqlpool_request_send_next(db, conn->db);
+		}
+	} else {
+		if (result->failed) {
+			i_error("%s: Query failed, aborting: %s",
+				db->driver->name, request->query);
+		}
+		conndb = result->db;
+
+		if (request->callback != NULL)
+			request->callback(result, request->context);
+		sqlpool_request_free(&request);
+
+		sqlpool_request_send_next(db, conndb);
+	}
+}
+
+static void driver_sqlpool_query(struct sql_db *_db, const char *query,
+				 sql_query_callback_t *callback, void *context)
+{
+        struct sqlpool_db *db = (struct sqlpool_db *)_db;
+	struct sqlpool_request *request;
+	const struct sqlpool_connection *conn;
+
+	request = sqlpool_request_new(db, query);
+	request->callback = callback;
+	request->context = context;
+
+	if (!driver_sqlpool_get_connection(db, -1U, &conn))
+		driver_sqlpool_append_request(db, request);
+	else {
+		request->host_idx = conn->host_idx;
+		sql_query(conn->db, query, driver_sqlpool_query_callback,
+			  request);
+	}
+}
+
+static void driver_sqlpool_exec(struct sql_db *_db, const char *query)
+{
+	driver_sqlpool_query(_db, query, NULL, NULL);
+}
+
+static struct sql_result *
+driver_sqlpool_query_s(struct sql_db *_db, const char *query)
+{
+        struct sqlpool_db *db = (struct sqlpool_db *)_db;
+	const struct sqlpool_connection *conn;
+	struct sql_result *result;
+
+	if (!driver_sqlpool_get_sync_connection(db, &conn)) {
+		sql_not_connected_result.refcount++;
+		return &sql_not_connected_result;
+	}
+
+	result = sql_query_s(conn->db, query);
+	if (result->failed_try_retry) {
+		if (!driver_sqlpool_get_sync_connection(db, &conn))
+			return result;
+
+		sql_result_unref(result);
+		result = sql_query_s(conn->db, query);
+	}
+	return result;
+}
+
+static struct sql_transaction_context *
+driver_sqlpool_transaction_begin(struct sql_db *_db)
+{
+        struct sqlpool_db *db = (struct sqlpool_db *)_db;
+	struct sqlpool_transaction_context *ctx;
+	const struct sqlpool_connection *conn;
+
+	ctx = i_new(struct sqlpool_transaction_context, 1);
+	ctx->ctx.db = _db;
+
+	if (driver_sqlpool_get_connection(db, -1U, &conn))
+		ctx->conn_trans = sql_transaction_begin(conn->db);
+	else {
+		/* queue changes until we get a connection */
+		ctx->commit_request = sqlpool_request_new(db, NULL);
+		ctx->commit_request->trans = ctx;
+		ctx->query_pool = pool_alloconly_create("sqlpool transaction",
+							1024);
+		driver_sqlpool_append_request(db, ctx->commit_request);
+	}
+	return &ctx->ctx;
+}
+
+static void
+driver_sqlpool_transaction_free(struct sqlpool_transaction_context *ctx)
+{
+	i_assert(ctx->conn_trans == NULL);
+
+	if (ctx->commit_request != NULL)
+		sqlpool_request_abort(&ctx->commit_request);
+	if (ctx->query_pool != NULL)
+		pool_unref(&ctx->query_pool);
+	i_free(ctx);
+}
+
+static void
+driver_sqlpool_commit_callback(const char *error,
+			       struct sqlpool_transaction_context *ctx)
+{
+	ctx->callback(error, ctx->context);
+	driver_sqlpool_transaction_free(ctx);
+}
+
+static void
+driver_sqlpool_transaction_commit(struct sql_transaction_context *_ctx,
+				  sql_commit_callback_t *callback,
+				  void *context)
+{
+	struct sqlpool_transaction_context *ctx =
+		(struct sqlpool_transaction_context *)_ctx;
+
+	ctx->callback = callback;
+	ctx->context = context;
+
+	if (ctx->conn_trans != NULL) {
+		sql_transaction_commit(&ctx->conn_trans,
+				       driver_sqlpool_commit_callback, ctx);
+	}
+}
+
+static int
+driver_sqlpool_transaction_commit_s(struct sql_transaction_context *_ctx,
+				    const char **error_r)
+{
+	struct sqlpool_transaction_context *ctx =
+		(struct sqlpool_transaction_context *)_ctx;
+        struct sqlpool_db *db = (struct sqlpool_db *)_ctx->db;
+	const struct sqlpool_connection *conn;
+	int ret;
+
+	*error_r = NULL;
+
+	if (ctx->conn_trans == NULL) {
+		if (driver_sqlpool_get_sync_connection(db, &conn))
+			driver_sqlpool_new_conn_trans(ctx, conn->db);
+		else {
+			*error_r = SQL_ERRSTR_NOT_CONNECTED;
+			driver_sqlpool_transaction_free(ctx);
+			return -1;
+		}
+		sqlpool_request_abort(&ctx->commit_request);
+	}
+
+	ret = sql_transaction_commit_s(&ctx->conn_trans, error_r);
+	driver_sqlpool_transaction_free(ctx);
+	return ret;
+}
+
+static void
+driver_sqlpool_transaction_rollback(struct sql_transaction_context *_ctx)
+{
+	struct sqlpool_transaction_context *ctx =
+		(struct sqlpool_transaction_context *)_ctx;
+
+	if (ctx->conn_trans != NULL)
+		sql_transaction_rollback(&ctx->conn_trans);
+	driver_sqlpool_transaction_free(ctx);
+}
+
+static void
+driver_sqlpool_update(struct sql_transaction_context *_ctx, const char *query,
+		      unsigned int *affected_rows)
+{
+	struct sqlpool_transaction_context *ctx =
+		(struct sqlpool_transaction_context *)_ctx;
+
+	if (ctx->conn_trans != NULL && ctx->ctx.head == NULL)
+		sql_update_get_rows(ctx->conn_trans, query, affected_rows);
+	else {
+		/* we didn't get a connection for transaction immediately.
+		   queue updates until commit transfers all of these */
+		sql_transaction_add_query(&ctx->ctx, ctx->query_pool,
+					  query, affected_rows);
+	}
+}
+
+struct sql_db driver_sqlpool_db = {
+	"",
+
+	.v = {
+		NULL,
+		driver_sqlpool_deinit,
+		driver_sqlpool_connect,
+		driver_sqlpool_disconnect,
+		driver_sqlpool_escape_string,
+		driver_sqlpool_exec,
+		driver_sqlpool_query,
+		driver_sqlpool_query_s,
+
+		driver_sqlpool_transaction_begin,
+		driver_sqlpool_transaction_commit,
+		driver_sqlpool_transaction_commit_s,
+		driver_sqlpool_transaction_rollback,
+
+		driver_sqlpool_update
+	}
+};
--- a/src/lib-sql/sql-api-private.h	Tue May 04 16:12:00 2010 +0300
+++ b/src/lib-sql/sql-api-private.h	Tue May 04 17:55:23 2010 +0300
@@ -4,6 +4,37 @@
 #include "sql-api.h"
 #include "module-context.h"
 
+enum sql_db_state {
+	/* not connected to database */
+	SQL_DB_STATE_DISCONNECTED,
+	/* waiting for connection attempt to succeed or fail */
+	SQL_DB_STATE_CONNECTING,
+	/* connected, allowing more queries (what SQL_DB_IS_READY() checks) */
+	SQL_DB_STATE_IDLE,
+	/* connected, no more queries allowed */
+	SQL_DB_STATE_BUSY
+};
+
+/* Minimum delay between reconnecting to same server */
+#define SQL_CONNECT_MIN_DELAY 1
+/* Maximum time to avoid reconnecting to same server */
+#define SQL_CONNECT_MAX_DELAY (60*30)
+/* If no servers are connected but a query is requested, try reconnecting to
+   next server which has been disconnected longer than this (with a single
+   server setup this is really the "max delay" and the SQL_CONNECT_MAX_DELAY
+   is never used). */
+#define SQL_CONNECT_RESET_DELAY 15
+/* Abort connect() if it can't connect within this time. */
+#define SQL_CONNECT_TIMEOUT_SECS 10
+/* Abort queries after this many seconds */
+#define SQL_QUERY_TIMEOUT_SECS 60
+/* Default max. number of connections to create per host */
+#define SQL_DEFAULT_CONNECTION_LIMIT 5
+
+#define SQL_DB_IS_READY(db) \
+	((db)->state == SQL_DB_STATE_IDLE)
+#define SQL_ERRSTR_NOT_CONNECTED "Not connected to database"
+
 struct sql_db_module_register {
 	unsigned int id;
 };
@@ -14,14 +45,22 @@
 
 extern struct sql_db_module_register sql_db_module_register;
 
+struct sql_transaction_query {
+	struct sql_transaction_query *next; /* next queued query */
+	struct sql_transaction_context *trans; /* ctx it was added to */
+
+	const char *query; /* copy made by sql_transaction_add_query() */
+	unsigned int *affected_rows; /* caller's pointer, stored as-is */
+};
+
 struct sql_db_vfuncs {
 	struct sql_db *(*init)(const char *connect_string);
 	void (*deinit)(struct sql_db *db);
 
-	enum sql_db_flags (*get_flags)(struct sql_db *db);
+	int (*connect)(struct sql_db *db);
+	void (*disconnect)(struct sql_db *db);
+	const char *(*escape_string)(struct sql_db *db, const char *string);
 
-	int (*connect)(struct sql_db *db);
-	const char *(*escape_string)(struct sql_db *db, const char *string);
 	void (*exec)(struct sql_db *db, const char *query);
 	void (*query)(struct sql_db *db, const char *query,
 		      sql_query_callback_t *callback, void *context);
@@ -41,8 +80,23 @@
 
 struct sql_db {
 	const char *name;
+	enum sql_db_flags flags; /* returned as-is by sql_get_flags() */
+
 	struct sql_db_vfuncs v;
 	ARRAY_DEFINE(module_contexts, union sql_db_module_context *);
+
+	void (*state_change_callback)(struct sql_db *db,
+				      enum sql_db_state prev_state,
+				      void *context);
+	void *state_change_context; /* passed to state_change_callback */
+
+	enum sql_db_state state; /* see sql_db_set_state() */
+	/* last time we started connecting to this server
+	   (which may or may not have succeeded) */
+	time_t last_connect_try;
+	unsigned int connect_delay; /* min. secs between sql_connect() tries */
+	unsigned int connect_failure_count;
+	struct timeout *to_reconnect; /* removed by sql_deinit/disconnect */
 };
 
 struct sql_result_vfuncs {
@@ -84,11 +138,16 @@
 	void *fetch_dest;
 	size_t fetch_dest_size;
 
+	unsigned int failed:1;
+	unsigned int failed_try_retry:1;
 	unsigned int callback:1;
 };
 
 struct sql_transaction_context {
 	struct sql_db *db;
+
+	/* commit() must use this query list if head is non-NULL. */
+	struct sql_transaction_query *head, *tail; /* oldest first */
 };
 
 ARRAY_DEFINE_TYPE(sql_drivers, const struct sql_db *);
@@ -96,4 +155,12 @@
 extern ARRAY_TYPE(sql_drivers) sql_drivers;
 extern struct sql_result sql_not_connected_result;
 
+struct sql_db *
+driver_sqlpool_init(const char *connect_string, const struct sql_db *driver);
+/* Change db's state; calls state_change_callback if it really changed. */
+void sql_db_set_state(struct sql_db *db, enum sql_db_state state);
+/* Append a copy of query (allocated from pool) to ctx's query list. */
+void sql_transaction_add_query(struct sql_transaction_context *ctx, pool_t pool,
+			       const char *query, unsigned int *affected_rows);
+
 #endif
--- a/src/lib-sql/sql-api.c	Tue May 04 16:12:00 2010 +0300
+++ b/src/lib-sql/sql-api.c	Tue May 04 17:55:23 2010 +0300
@@ -2,9 +2,11 @@
 
 #include "lib.h"
 #include "array.h"
+#include "ioloop.h"
 #include "sql-api-private.h"
 
 #include <stdlib.h>
+#include <time.h>
 
 struct sql_db_module_register sql_db_module_register = { 0 };
 ARRAY_TYPE(sql_drivers) sql_drivers;
@@ -38,23 +40,36 @@
 	}
 }
 
-struct sql_db *sql_init(const char *db_driver,
-			const char *connect_string ATTR_UNUSED)
+static const struct sql_db *sql_find_driver(const char *name)
 {
 	const struct sql_db *const *drivers;
 	unsigned int i, count;
-	struct sql_db *db;
 
 	drivers = array_get(&sql_drivers, &count);
 	for (i = 0; i < count; i++) {
-		if (strcmp(db_driver, drivers[i]->name) == 0) {
-			db = drivers[i]->v.init(connect_string);
-			i_array_init(&db->module_contexts, 5);
-			return db;
-		}
+		if (strcmp(drivers[i]->name, name) == 0)
+			return drivers[i];
 	}
+	return NULL;
+}
 
-	i_fatal("Unknown database driver '%s'", db_driver);
+struct sql_db *sql_init(const char *db_driver, const char *connect_string)
+{
+	const struct sql_db *driver;
+	struct sql_db *db;
+
+	i_assert(connect_string != NULL);
+
+	driver = sql_find_driver(db_driver); /* exact name match */
+	if (driver == NULL)
+		i_fatal("Unknown database driver '%s'", db_driver);
+
+	if ((driver->flags & SQL_DB_FLAG_POOLED) == 0)
+		db = driver->v.init(connect_string);
+	else /* driver asked for pooling: let sqlpool create the db */
+		db = driver_sqlpool_init(connect_string, driver);
+	i_array_init(&db->module_contexts, 5);
+	return db;
 }
 
 void sql_deinit(struct sql_db **_db)
@@ -62,19 +77,46 @@
 	struct sql_db *db = *_db;
 
 	*_db = NULL;
+
+	if (db->to_reconnect != NULL)
+		timeout_remove(&db->to_reconnect);
 	db->v.deinit(db);
 }
 
 enum sql_db_flags sql_get_flags(struct sql_db *db)
 {
-	return db->v.get_flags(db);
+	return db->flags;
 }
 
 int sql_connect(struct sql_db *db)
 {
+	time_t now;
+
+	switch (db->state) {
+	case SQL_DB_STATE_DISCONNECTED:
+		break;
+	case SQL_DB_STATE_CONNECTING:
+		return 0; /* attempt already in progress */
+	default:
+		return 1; /* IDLE or BUSY: already connected */
+	}
+
+	/* don't retry until connect_delay secs after the previous attempt */
+	now = time(NULL);
+	if (db->last_connect_try + (time_t)db->connect_delay > now)
+		return -1;
+	db->last_connect_try = now; /* set before the driver's connect() */
+
 	return db->v.connect(db);
 }
 
+void sql_disconnect(struct sql_db *db)
+{
+	if (db->to_reconnect != NULL) /* drop the reconnect timeout, if set */
+		timeout_remove(&db->to_reconnect);
+	db->v.disconnect(db); /* the rest is up to the driver */
+}
+
 const char *sql_escape_string(struct sql_db *db, const char *string)
 {
 	return db->v.escape_string(db, string);
@@ -286,7 +328,7 @@
 static const char *
 sql_result_not_connected_get_error(struct sql_result *result ATTR_UNUSED)
 {
-	return "Not connected to database";
+	return SQL_ERRSTR_NOT_CONNECTED;
 }
 
 struct sql_transaction_context *sql_transaction_begin(struct sql_db *db)
@@ -332,11 +374,43 @@
 	ctx->db->v.update(ctx, query, affected_rows);
 }
 
+void sql_db_set_state(struct sql_db *db, enum sql_db_state state)
+{
+	enum sql_db_state old_state = db->state;
+
+	if (db->state == state)
+		return; /* unchanged: the callback is not invoked */
+
+	db->state = state; /* stored first, so the callback sees new state */
+	if (db->state_change_callback != NULL) {
+		db->state_change_callback(db, old_state,
+					  db->state_change_context);
+	}
+}
+
+void sql_transaction_add_query(struct sql_transaction_context *ctx, pool_t pool,
+			       const char *query, unsigned int *affected_rows)
+{
+	struct sql_transaction_query *tquery;
+
+	tquery = p_new(pool, struct sql_transaction_query, 1);
+	tquery->trans = ctx; /* ->next is left as p_new() gave it (NULL?) */
+	tquery->query = p_strdup(pool, query); /* own copy, from pool */
+	tquery->affected_rows = affected_rows; /* pointer only, not read */
+
+	if (ctx->head == NULL)
+		ctx->head = tquery; /* list was empty */
+	else
+		ctx->tail->next = tquery;
+	ctx->tail = tquery; /* new entry is always the last one */
+}
+
 struct sql_result sql_not_connected_result = {
 	.v = {
 		sql_result_not_connected_free,
 		sql_result_not_connected_next_row,
 		NULL, NULL, NULL, NULL, NULL, NULL, NULL,
 		sql_result_not_connected_get_error
-	}
+	},
+	.failed_try_retry = TRUE
 };
--- a/src/lib-sql/sql-api.h	Tue May 04 16:12:00 2010 +0300
+++ b/src/lib-sql/sql-api.h	Tue May 04 17:55:23 2010 +0300
@@ -6,7 +6,9 @@
 
 enum sql_db_flags {
 	/* Set if queries are not executed asynchronously */
-	SQL_DB_FLAG_BLOCKING		= 0x01
+	SQL_DB_FLAG_BLOCKING		= 0x01,
+	/* Set if database wants to use connection pooling */
+	SQL_DB_FLAG_POOLED		= 0x02
 };
 
 enum sql_field_type {
@@ -63,6 +65,8 @@
    though. Returns -1 if we're not connected, 0 if we started connecting or
    1 if we are fully connected now. */
 int sql_connect(struct sql_db *db);
+/* Explicitly disconnect from database. */
+void sql_disconnect(struct sql_db *db);
 
 /* Escape the given string if needed and return it. */
 const char *sql_escape_string(struct sql_db *db, const char *string);