Mercurial > dovecot > core-2.2
changeset 19066:0d04ac4d43ca
dict-sql: Added support for async operations.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Wed, 02 Sep 2015 17:37:16 +0300 |
parents | 3de8de46f4a8 |
children | 7378ab8e3b4d |
files | src/lib-dict/dict-sql.c |
diffstat | 1 files changed, 133 insertions(+), 23 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib-dict/dict-sql.c Wed Sep 02 17:36:47 2015 +0300 +++ b/src/lib-dict/dict-sql.c Wed Sep 02 17:37:16 2015 +0300 @@ -63,6 +63,9 @@ pool_t inc_row_pool; struct sql_dict_inc_row *inc_row; + dict_transaction_commit_callback_t *async_callback; + void *async_context; + unsigned int failed:1; }; @@ -106,6 +109,12 @@ pool_unref(&dict->pool); } +static int sql_dict_wait(struct dict *dict ATTR_UNUSED) +{ + /* FIXME: lib-sql doesn't support this yet */ + return 0; +} + static bool dict_sql_map_match(const struct dict_sql_map *map, const char *path, ARRAY_TYPE(const_string) *values, unsigned int *pat_len_r, @@ -266,32 +275,44 @@ } } -static int sql_dict_lookup(struct dict *_dict, pool_t pool, - const char *key, const char **value_r) +static int sql_lookup_get_query(struct sql_dict *dict, const char *key, + string_t *query) { - struct sql_dict *dict = (struct sql_dict *)_dict; const struct dict_sql_map *map; ARRAY_TYPE(const_string) values; - struct sql_result *result; - int ret; map = sql_dict_find_map(dict, key, &values); if (map == NULL) { i_error("sql dict lookup: Invalid/unmapped key: %s", key); - *value_r = NULL; - return 0; + return -1; } + str_printfa(query, "SELECT %s FROM %s", + map->value_field, map->table); + sql_dict_where_build(dict, map, &values, key[0], + SQL_DICT_RECURSE_NONE, query); + return 0; +} + +static int sql_dict_lookup(struct dict *_dict, pool_t pool, + const char *key, const char **value_r) +{ + struct sql_dict *dict = (struct sql_dict *)_dict; + struct sql_result *result = NULL; + int ret; T_BEGIN { string_t *query = t_str_new(256); - str_printfa(query, "SELECT %s FROM %s", - map->value_field, map->table); - sql_dict_where_build(dict, map, &values, key[0], - SQL_DICT_RECURSE_NONE, query); - result = sql_query_s(dict->db, str_c(query)); + ret = sql_lookup_get_query(dict, key, query); + if (ret == 0) + result = sql_query_s(dict->db, str_c(query)); } T_END; + if (ret < 0) { + *value_r = NULL; + return -1; + } + ret = sql_result_next_row(result); if (ret <= 0) { if (ret < 0) { @@ -308,6 +329,48 @@ return ret; } +struct sql_dict_lookup_context { + dict_lookup_callback_t *callback; + void *context; +}; + +static void +sql_dict_lookup_async_callback(struct sql_result *sql_result, + struct sql_dict_lookup_context *ctx) +{ + struct dict_lookup_result result; + + memset(&result, 0, sizeof(result)); + result.ret = sql_result_next_row(sql_result); + if (result.ret < 0) + result.error = sql_result_get_error(sql_result); + else if (result.ret > 0) + result.value = sql_result_get_field_value(sql_result, 0); + ctx->callback(&result, ctx->context); + + i_free(ctx); +} + +static void +sql_dict_lookup_async(struct dict *_dict, const char *key, + dict_lookup_callback_t *callback, void *context) +{ + struct sql_dict *dict = (struct sql_dict *)_dict; + struct sql_dict_lookup_context *ctx; + + T_BEGIN { + string_t *query = t_str_new(256); + + if (sql_lookup_get_query(dict, key, query) == 0) { + ctx = i_new(struct sql_dict_lookup_context, 1); + ctx->callback = callback; + ctx->context = context; + sql_query(dict->db, str_c(query), + sql_dict_lookup_async_callback, ctx); + } + } T_END; +} + static const struct dict_sql_map * sql_dict_iterate_find_next_map(struct sql_dict_iterate_context *ctx, ARRAY_TYPE(const_string) *values) @@ -356,8 +419,10 @@ if (map == NULL) return FALSE; - if (ctx->result != NULL) + if (ctx->result != NULL) { sql_result_unref(ctx->result); + ctx->result = NULL; + } str_append(query, "SELECT "); if ((ctx->flags & DICT_ITERATE_FLAG_NO_VALUE) == 0) @@ -402,6 +467,15 @@ return TRUE; } +static void sql_dict_iterate_callback(struct sql_result *result, + struct sql_dict_iterate_context *ctx) +{ + sql_result_ref(result); + ctx->result = result; + if (ctx->ctx.async_callback != NULL) + ctx->ctx.async_callback(ctx->ctx.async_context); +} + static bool sql_dict_iterate_next_query(struct sql_dict_iterate_context *ctx) { struct sql_dict *dict = (struct sql_dict *)ctx->ctx.dict; @@ -413,14 +487,17 @@ ret = sql_dict_iterate_build_next_query(ctx, query); if (!ret) { /* failed */ + } else if ((ctx->flags & DICT_ITERATE_FLAG_ASYNC) == 0) { + ctx->result = sql_query_s(dict->db, str_c(query)); } else { - ctx->result = sql_query_s(dict->db, str_c(query)); + i_assert(ctx->result == NULL); + sql_query(dict->db, str_c(query), + sql_dict_iterate_callback, ctx); } } T_END; return ret; } - static struct dict_iterate_context * sql_dict_iterate_init(struct dict *_dict, const char *const *paths, enum dict_iterate_flags flags) @@ -445,6 +522,7 @@ i_error("sql dict iterate: Invalid/unmapped path: %s", paths[0]); ctx->result = NULL; + ctx->failed = TRUE; return &ctx->ctx; } return &ctx->ctx; @@ -459,12 +537,21 @@ unsigned int i, count; int ret; - if (ctx->result == NULL) { - ctx->failed = TRUE; + _ctx->has_more = FALSE; + if (ctx->failed) return FALSE; - } - while ((ret = sql_result_next_row(ctx->result)) == 0) { + for (;;) { + if (ctx->result == NULL) { + /* wait for async lookup to finish */ + i_assert((ctx->flags & DICT_ITERATE_FLAG_ASYNC) != 0); + _ctx->has_more = TRUE; + return FALSE; + } + + ret = sql_result_next_row(ctx->result); + if (ret != 0) + break; /* see if there are more results in the next map. don't do it if we're looking for an exact match, since we already should have handled it. */ @@ -550,9 +637,26 @@ return FALSE; } +static void +sql_dict_transaction_commit_callback(const char *error, + struct sql_dict_transaction_context *ctx) +{ + int ret; + + if (error == NULL) + ret = sql_dict_transaction_has_nonexistent(ctx) ? 0 : 1; + else { + i_error("sql dict: commit failed: %s", error); + ret = -1; + } + + if (ctx->async_callback != NULL) + ctx->async_callback(ret, ctx->async_context); + sql_dict_transaction_free(ctx); +} + static int -sql_dict_transaction_commit(struct dict_transaction_context *_ctx, - bool async ATTR_UNUSED, +sql_dict_transaction_commit(struct dict_transaction_context *_ctx, bool async, dict_transaction_commit_callback_t *callback, void *context) { @@ -570,6 +674,12 @@ } else if (!_ctx->changed) { /* nothing changed, no need to commit */ sql_transaction_rollback(&ctx->sql_ctx); + } else if (async) { + ctx->async_callback = callback; + ctx->async_context = context; + sql_transaction_commit(&ctx->sql_ctx, + sql_dict_transaction_commit_callback, ctx); + return 1; } else { if (sql_transaction_commit_s(&ctx->sql_ctx, &error) < 0) { i_error("sql dict: commit failed: %s", error); @@ -935,7 +1045,7 @@ { sql_dict_init, sql_dict_deinit, - NULL, + sql_dict_wait, sql_dict_lookup, sql_dict_iterate_init, sql_dict_iterate, @@ -947,7 +1057,7 @@ sql_dict_unset, sql_dict_append, sql_dict_atomic_inc, - NULL + sql_dict_lookup_async } };