Mercurial > dovecot > original-hg > dovecot-1.2
changeset 8684:ad35aae691ce HEAD
pgsql: Don't break when using multiple transactions.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Sun, 25 Jan 2009 19:40:15 -0500 |
parents | a68ed51b681d |
children | 6ee78e18d026 |
files | src/lib-sql/driver-mysql.c src/lib-sql/driver-pgsql.c |
diffstat | 2 files changed, 119 insertions(+), 108 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib-sql/driver-mysql.c Sun Jan 25 19:39:41 2009 -0500 +++ b/src/lib-sql/driver-mysql.c Sun Jan 25 19:40:15 2009 -0500 @@ -654,10 +654,6 @@ (struct mysql_transaction_context *)_ctx; struct mysql_query_list *list; - /* FIXME: with mysql we're just appending everything into one big - linked list which gets committed in sql_transaction_commit(). - we could avoid this if we knew for sure that transactions actually - worked, but I don't know how to do that.. */ list = p_new(ctx->query_pool, struct mysql_query_list, 1); list->query = p_strdup(ctx->query_pool, query);
--- a/src/lib-sql/driver-pgsql.c Sun Jan 25 19:39:41 2009 -0500 +++ b/src/lib-sql/driver-pgsql.c Sun Jan 25 19:40:15 2009 -0500 @@ -75,15 +75,19 @@ sql_commit_callback_t *callback; void *context; - char *first_update; + pool_t query_pool; + struct pgsql_query_list *head, *tail; const char *error; - unsigned int opened:1; unsigned int begin_succeeded:1; unsigned int begin_failed:1; unsigned int failed:1; }; +struct pgsql_query_list { + struct pgsql_query_list *next; + const char *query; +}; extern struct sql_db driver_pgsql_db; extern struct sql_result driver_pgsql_result; @@ -318,7 +322,9 @@ } T_END; result->api.callback = FALSE; free_result = db->sync_result != &result->api; - } + if (db->queue == NULL && db->ioloop != NULL) + io_loop_stop(db->ioloop); + } if (disconnected) { /* disconnected */ @@ -448,6 +454,9 @@ i_free(queue->result); i_free(queue->query); i_free(queue); + + if (db->queue == NULL && db->ioloop != NULL) + io_loop_stop(db->ioloop); } static void queue_drop_timed_out_queries(struct pgsql_db *db) @@ -604,7 +613,6 @@ db->query_finished = TRUE; db->sync_result = result; - io_loop_stop(db->ioloop); } static struct sql_result * @@ -634,7 +642,8 @@ } db->query_finished = FALSE; - driver_pgsql_query(_db, query, pgsql_query_s_callback, db); + if (query != NULL) + driver_pgsql_query(_db, query, pgsql_query_s_callback, db); if (!db->query_finished) { if ((db->connected || db->connecting) && db->io != NULL) @@ -849,6 +858,9 @@ ctx = i_new(struct pgsql_transaction_context, 1); ctx->ctx.db = db; ctx->refcount = 1; + /* we need to be able to handle multiple open transactions, so at least + for now just keep them in memory until commit time. */ + ctx->query_pool = pool_alloconly_create("pgsql transaction", 1024); return &ctx->ctx; } @@ -859,7 +871,7 @@ if (--ctx->refcount > 0) return; - i_free(ctx->first_update); + pool_unref(&ctx->query_pool); i_free(ctx); } @@ -873,86 +885,6 @@ ctx->callback(sql_result_get_error(result), ctx->context); else ctx->callback(NULL, ctx->context); -} - -static void -driver_pgsql_transaction_commit(struct sql_transaction_context *_ctx, - sql_commit_callback_t *callback, void *context) -{ - struct pgsql_transaction_context *ctx = - (struct pgsql_transaction_context *)_ctx; - - if (ctx->failed) { - callback(ctx->error, context); - if (!ctx->begin_failed) - sql_exec(_ctx->db, "ROLLBACK"); - driver_pgsql_transaction_unref(ctx); - return; - } - - ctx->callback = callback; - ctx->context = context; - - if (ctx->first_update != NULL) { - sql_query(_ctx->db, ctx->first_update, - transaction_commit_callback, ctx); - i_free_and_null(ctx->first_update); - } else if (ctx->opened) { - driver_pgsql_query_full(_ctx->db, "COMMIT", - transaction_commit_callback, ctx, - FALSE); - } else { - /* nothing done */ - ctx->callback(NULL, ctx->context); - driver_pgsql_transaction_unref(ctx); - } -} - -static int -driver_pgsql_transaction_commit_s(struct sql_transaction_context *_ctx, - const char **error_r) -{ - struct pgsql_transaction_context *ctx = - (struct pgsql_transaction_context *)_ctx; - struct sql_result *result; - - if (ctx->failed) { - *error_r = ctx->error; - if (!ctx->begin_failed) - sql_exec(_ctx->db, "ROLLBACK"); - } else if (ctx->first_update != NULL) { - result = sql_query_s(_ctx->db, ctx->first_update); - if (sql_result_next_row(result) < 0) - *error_r = sql_result_get_error(result); - else - *error_r = NULL; - i_free_and_null(ctx->first_update); - sql_result_free(result); - } else if (!ctx->opened) { - *error_r = NULL; - } else { - result = sql_query_s(_ctx->db, "COMMIT"); - if (ctx->failed) - *error_r = ctx->error; - else if (sql_result_next_row(result) < 0) - *error_r = sql_result_get_error(result); - else - *error_r = NULL; - sql_result_free(result); - } - - driver_pgsql_transaction_unref(ctx); - return *error_r == NULL ? 0 : -1; -} - -static void -driver_pgsql_transaction_rollback(struct sql_transaction_context *_ctx) -{ - struct pgsql_transaction_context *ctx = - (struct pgsql_transaction_context *)_ctx; - - if (!ctx->begin_failed) - sql_exec(_ctx->db, "ROLLBACK"); driver_pgsql_transaction_unref(ctx); } @@ -973,32 +905,115 @@ } static void -driver_pgsql_update(struct sql_transaction_context *_ctx, const char *query) +driver_pgsql_transaction_commit(struct sql_transaction_context *_ctx, + sql_commit_callback_t *callback, void *context) { struct pgsql_transaction_context *ctx = (struct pgsql_transaction_context *)_ctx; - if (ctx->failed) - return; + ctx->callback = callback; + ctx->context = context; + + if (ctx->failed || ctx->head == NULL) { + callback(ctx->failed ? ctx->error : NULL, context); + driver_pgsql_transaction_unref(ctx); + } else if (ctx->head->next == NULL) { + /* just a single query, send it */ + sql_query(_ctx->db, ctx->head->query, + transaction_commit_callback, ctx); + } else { + /* multiple queries, use a transaction */ + ctx->refcount++; + sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx); + while (ctx->head != NULL) { + ctx->refcount++; + sql_query(_ctx->db, ctx->head->query, + transaction_update_callback, ctx); + ctx->head = ctx->head->next; + } + sql_query(_ctx->db, "COMMIT", transaction_commit_callback, ctx); + } +} - if (!ctx->opened) { - if (ctx->first_update == NULL) { - /* delay sending the first update in case there is - only one to be sent and we don't need BEGIN/COMMIT */ - ctx->first_update = i_strdup(query); - return; +static int +driver_pgsql_transaction_commit_s(struct sql_transaction_context *_ctx, + const char **error_r) +{ + struct pgsql_transaction_context *ctx = + (struct pgsql_transaction_context *)_ctx; + struct sql_result *result; + + *error_r = NULL; + + if (ctx->failed || ctx->head == NULL) { + /* nothing to be done */ + result = NULL; + } else if (ctx->head->next == NULL) { + /* just a single query, send it */ + result = sql_query_s(_ctx->db, ctx->head->query); + } else { + /* multiple queries, use a transaction */ + ctx->refcount++; + sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx); + while (ctx->head != NULL) { + ctx->refcount++; + sql_query(_ctx->db, ctx->head->query, + transaction_update_callback, ctx); + ctx->head = ctx->head->next; } - ctx->opened = TRUE; - ctx->refcount += 2; - sql_query(_ctx->db, "BEGIN", transaction_update_callback, ctx); - sql_query(_ctx->db, ctx->first_update, - transaction_update_callback, ctx); - i_free_and_null(ctx->first_update); + if (ctx->refcount > 1) { + /* flush the previous queries */ + (void)driver_pgsql_query_s(_ctx->db, NULL); + } + + if (ctx->begin_failed) { + result = NULL; + } else if (ctx->failed) { + result = sql_query_s(_ctx->db, "ROLLBACK"); + } else { + result = sql_query_s(_ctx->db, "COMMIT"); + } } - ctx->refcount++; - driver_pgsql_query_full(_ctx->db, query, - transaction_update_callback, ctx, FALSE); + if (ctx->failed) + *error_r = ctx->error; + else if (result != NULL) { + if (sql_result_next_row(result) < 0) + *error_r = sql_result_get_error(result); + } + if (result != NULL) + sql_result_free(result); + + i_assert(ctx->refcount == 1); + driver_pgsql_transaction_unref(ctx); + return *error_r == NULL ? 0 : -1; +} + +static void +driver_pgsql_transaction_rollback(struct sql_transaction_context *_ctx) +{ + struct pgsql_transaction_context *ctx = + (struct pgsql_transaction_context *)_ctx; + + i_assert(ctx->refcount == 1); + driver_pgsql_transaction_unref(ctx); +} + +static void +driver_pgsql_update(struct sql_transaction_context *_ctx, const char *query) +{ + struct pgsql_transaction_context *ctx = + (struct pgsql_transaction_context *)_ctx; + struct pgsql_query_list *list; + + list = p_new(ctx->query_pool, struct pgsql_query_list, 1); + list->query = p_strdup(ctx->query_pool, query); + + if (ctx->head == NULL) + ctx->head = list; + else + ctx->tail->next = list; + ctx->tail = list; } struct sql_db driver_pgsql_db = {