Mercurial > dovecot > core-2.2
changeset 19065:3de8de46f4a8
dict: Use the new async APIs for everything.
If the dict backend supports async operations, this means that dict service
can now be configured with client_count>1.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Wed, 02 Sep 2015 17:36:47 +0300 |
parents | d40d7f24ffcf |
children | 0d04ac4d43ca |
files | src/dict/dict-commands.c src/dict/dict-commands.h src/dict/dict-connection.c src/dict/dict-connection.h |
diffstat | 4 files changed, 284 insertions(+), 140 deletions(-) [+] |
line wrap: on
line diff
--- a/src/dict/dict-commands.c Wed Sep 02 17:34:43 2015 +0300 +++ b/src/dict/dict-commands.c Wed Sep 02 17:36:47 2015 +0300 @@ -14,87 +14,159 @@ #define DICT_OUTPUT_OPTIMAL_SIZE 1024 -struct dict_client_cmd { - int cmd; - int (*func)(struct dict_connection *conn, const char *line); +struct dict_cmd_func { + enum dict_protocol_cmd cmd; + int (*func)(struct dict_connection_cmd *cmd, const char *line); +}; + +struct dict_connection_cmd { + const struct dict_cmd_func *cmd; + struct dict_connection *conn; + char *reply; + + struct dict_iterate_context *iter; + enum dict_iterate_flags iter_flags; + + struct dict_connection_transaction *trans; }; -static int cmd_lookup(struct dict_connection *conn, const char *line) +static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd); + +static void dict_connection_cmd_free(struct dict_connection_cmd *cmd) { - const char *reply; - const char *value; - int ret; + if (cmd->iter != NULL) + (void)dict_iterate_deinit(&cmd->iter); + array_delete(&cmd->conn->cmds, 0, 1); + i_free(cmd->reply); - if (conn->iter_ctx != NULL) { - i_error("dict client: LOOKUP: Can't lookup while iterating"); - return -1; - } + dict_connection_continue_input(cmd->conn); + i_free(cmd); +} - /* <key> */ - ret = dict_lookup(conn->dict, pool_datastack_create(), line, &value); - if (ret > 0) { - reply = t_strdup_printf("%c%s\n", - DICT_PROTOCOL_REPLY_OK, str_tabescape(value)); - o_stream_nsend_str(conn->output, reply); - } else { - reply = t_strdup_printf("%c\n", ret == 0 ? - DICT_PROTOCOL_REPLY_NOTFOUND : - DICT_PROTOCOL_REPLY_FAIL); - o_stream_nsend_str(conn->output, reply); +static void dict_connection_cmd_remove(struct dict_connection_cmd *cmd) +{ + struct dict_connection_cmd *const *cmds; + unsigned int i, count; + + cmds = array_get(&cmd->conn->cmds, &count); + for (i = 0; i < count; i++) { + if (cmds[i] == cmd) { + dict_connection_cmd_free(cmd); + return; + } } - return 0; + i_unreached(); } -static int cmd_iterate_flush(struct dict_connection *conn) +static void dict_connection_cmds_flush(struct dict_connection *conn) +{ + struct dict_connection_cmd *cmd, *const *first_cmdp; + + while (array_count(&conn->cmds) > 0) { + first_cmdp = array_idx(&conn->cmds, 0); + cmd = *first_cmdp; + + if (cmd->reply == NULL) { + /* command not finished yet */ + break; + } + + o_stream_nsend_str(conn->output, cmd->reply); + dict_connection_cmd_remove(cmd); + } +} + +void dict_connection_cmds_free(struct dict_connection *conn) +{ + struct dict_connection_cmd *const *first_cmdp; + + while (array_count(&conn->cmds) > 0) { + first_cmdp = array_idx(&conn->cmds, 0); + dict_connection_cmd_remove(*first_cmdp); + } +} + +static void +cmd_lookup_callback(const struct dict_lookup_result *result, void *context) +{ + struct dict_connection_cmd *cmd = context; + + if (result->ret > 0) { + cmd->reply = i_strdup_printf("%c%s\n", + DICT_PROTOCOL_REPLY_OK, str_tabescape(result->value)); + } else if (result->ret == 0) { + cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_NOTFOUND); + } else { + i_error("%s", result->error); + cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_FAIL); + } + dict_connection_cmds_flush(cmd->conn); +} + +static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line) +{ + /* <key> */ + dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd); + return 1; +} + +static int cmd_iterate_flush(struct dict_connection_cmd *cmd) { string_t *str; const char *key, *value; str = t_str_new(256); - o_stream_cork(conn->output); - while (dict_iterate(conn->iter_ctx, &key, &value)) { + o_stream_cork(cmd->conn->output); + while (dict_iterate(cmd->iter, &key, &value)) { str_truncate(str, 0); str_append_c(str, DICT_PROTOCOL_REPLY_OK); str_append_tabescaped(str, key); str_append_c(str, '\t'); - if ((conn->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0) + if ((cmd->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0) str_append_tabescaped(str, value); str_append_c(str, '\n'); - o_stream_nsend(conn->output, str_data(str), str_len(str)); + o_stream_nsend(cmd->conn->output, str_data(str), str_len(str)); - if (o_stream_get_buffer_used_size(conn->output) > + if (o_stream_get_buffer_used_size(cmd->conn->output) > DICT_OUTPUT_OPTIMAL_SIZE) { - if (o_stream_flush(conn->output) <= 0) { - /* continue later */ - o_stream_uncork(conn->output); + if (o_stream_flush(cmd->conn->output) <= 0) { + /* continue later when there's more space + in output buffer */ + o_stream_uncork(cmd->conn->output); + o_stream_set_flush_pending(cmd->conn->output, TRUE); return 0; } /* flushed everything, continue */ } } - - /* finished iterating */ - o_stream_unset_flush_callback(conn->output); + if (dict_iterate_has_more(cmd->iter)) { + /* wait for the next iteration callback */ + return 0; + } str_truncate(str, 0); - if (dict_iterate_deinit(&conn->iter_ctx) < 0) + if (dict_iterate_deinit(&cmd->iter) < 0) str_append_c(str, DICT_PROTOCOL_REPLY_FAIL); str_append_c(str, '\n'); - o_stream_nsend(conn->output, str_data(str), str_len(str)); - o_stream_uncork(conn->output); + o_stream_uncork(cmd->conn->output); + + cmd->reply = i_strdup(str_c(str)); + dict_connection_cmds_flush(cmd->conn); return 1; } -static int cmd_iterate(struct dict_connection *conn, const char *line) +static void cmd_iterate_callback(void *context) +{ + struct dict_connection_cmd *cmd = context; + + dict_connection_cmd_output_more(cmd); +} + +static int cmd_iterate(struct dict_connection_cmd *cmd, const char *line) { const char *const *args; unsigned int flags; - if (conn->iter_ctx != NULL) { - i_error("dict client: ITERATE: Already iterating"); - return -1; - } - args = t_strsplit_tabescaped(line); if (str_array_length(args) < 2 || str_to_uint(args[0], &flags) < 0) { @@ -103,12 +175,12 @@ } /* <flags> <path> */ - conn->iter_ctx = dict_iterate_init_multiple(conn->dict, args+1, flags); - conn->iter_flags = flags; - - o_stream_set_flush_callback(conn->output, cmd_iterate_flush, conn); - (void)cmd_iterate_flush(conn); - return 0; + flags |= DICT_ITERATE_FLAG_ASYNC; + cmd->iter = dict_iterate_init_multiple(cmd->conn->dict, args+1, flags); + cmd->iter_flags = flags; + dict_iterate_set_async_callback(cmd->iter, cmd_iterate_callback, cmd); + dict_connection_cmd_output_more(cmd); + return 1; } static struct dict_connection_transaction * @@ -134,6 +206,8 @@ const struct dict_connection_transaction *transactions; unsigned int i, count; + i_assert(trans->ctx == NULL); + transactions = array_get(&conn->transactions, &count); for (i = 0; i < count; i++) { if (&transactions[i] == trans) { @@ -143,7 +217,7 @@ } } -static int cmd_begin(struct dict_connection *conn, const char *line) +static int cmd_begin(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; unsigned int id; @@ -152,19 +226,19 @@ i_error("dict client: Invalid transaction ID %s", line); return -1; } - if (dict_connection_transaction_lookup(conn, id) != NULL) { + if (dict_connection_transaction_lookup(cmd->conn, id) != NULL) { i_error("dict client: Transaction ID %u already exists", id); return -1; } - if (!array_is_created(&conn->transactions)) - i_array_init(&conn->transactions, 4); + if (!array_is_created(&cmd->conn->transactions)) + i_array_init(&cmd->conn->transactions, 4); /* <id> */ - trans = array_append_space(&conn->transactions); + trans = array_append_space(&cmd->conn->transactions); trans->id = id; - trans->conn = conn; - trans->ctx = dict_transaction_begin(conn->dict); + trans->conn = cmd->conn; + trans->ctx = dict_transaction_begin(cmd->conn->dict); return 0; } @@ -187,41 +261,9 @@ return 0; } -static int cmd_commit(struct dict_connection *conn, const char *line) +static void +cmd_commit_finish(struct dict_connection_cmd *cmd, int ret, bool async) { - struct dict_connection_transaction *trans; - char chr; - int ret; - - if (conn->iter_ctx != NULL) { - i_error("dict client: COMMIT: Can't commit while iterating"); - return -1; - } - - if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0) - return -1; - - ret = dict_transaction_commit(&trans->ctx); - switch (ret) { - case 1: - chr = DICT_PROTOCOL_REPLY_OK; - break; - case 0: - chr = DICT_PROTOCOL_REPLY_NOTFOUND; - break; - default: - chr = DICT_PROTOCOL_REPLY_FAIL; - break; - } - o_stream_nsend_str(conn->output, t_strdup_printf("%c\n", chr)); - dict_connection_transaction_array_remove(conn, trans); - return 0; -} - -static void cmd_commit_async_callback(int ret, void *context) -{ - struct dict_connection_transaction *trans = context; - const char *reply; char chr; switch (ret) { @@ -235,44 +277,63 @@ chr = DICT_PROTOCOL_REPLY_FAIL; break; } - reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT, - chr, trans->id); - o_stream_nsend_str(trans->conn->output, reply); + if (async) { + cmd->reply = i_strdup_printf("%c%c%u\n", + DICT_PROTOCOL_REPLY_ASYNC_COMMIT, chr, cmd->trans->id); + } else { + cmd->reply = i_strdup_printf("%c%u\n", chr, cmd->trans->id); + } + dict_connection_transaction_array_remove(cmd->conn, cmd->trans); + dict_connection_cmds_flush(cmd->conn); +} - dict_connection_transaction_array_remove(trans->conn, trans); +static void cmd_commit_callback(int ret, void *context) +{ + struct dict_connection_cmd *cmd = context; + + cmd_commit_finish(cmd, ret, FALSE); +} + +static void cmd_commit_callback_async(int ret, void *context) +{ + struct dict_connection_cmd *cmd = context; + + cmd_commit_finish(cmd, ret, TRUE); } static int -cmd_commit_async(struct dict_connection *conn, const char *line) +cmd_commit(struct dict_connection_cmd *cmd, const char *line) +{ + if (dict_connection_transaction_lookup_parse(cmd->conn, line, &cmd->trans) < 0) + return -1; + + dict_transaction_commit_async(&cmd->trans->ctx, cmd_commit_callback, cmd); + return 1; +} + +static int +cmd_commit_async(struct dict_connection_cmd *cmd, const char *line) +{ + if (dict_connection_transaction_lookup_parse(cmd->conn, line, &cmd->trans) < 0) + return -1; + + dict_transaction_commit_async(&cmd->trans->ctx, cmd_commit_callback_async, cmd); + return 1; +} + +static int cmd_rollback(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; - if (conn->iter_ctx != NULL) { - i_error("dict client: COMMIT: Can't commit while iterating"); - return -1; - } - - if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, line, &trans) < 0) return -1; - dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback, - trans); + dict_transaction_rollback(&trans->ctx); + dict_connection_transaction_array_remove(cmd->conn, trans); return 0; } -static int cmd_rollback(struct dict_connection *conn, const char *line) -{ - struct dict_connection_transaction *trans; - - if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0) - return -1; - - dict_transaction_rollback(&trans->ctx); - dict_connection_transaction_array_remove(conn, trans); - return 0; -} - -static int cmd_set(struct dict_connection *conn, const char *line) +static int cmd_set(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; const char *const *args; @@ -284,14 +345,13 @@ return -1; } - if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0) return -1; - dict_set(trans->ctx, args[1], args[2]); return 0; } -static int cmd_unset(struct dict_connection *conn, const char *line) +static int cmd_unset(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; const char *const *args; @@ -303,14 +363,13 @@ return -1; } - if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0) return -1; - dict_unset(trans->ctx, args[1]); return 0; } -static int cmd_append(struct dict_connection *conn, const char *line) +static int cmd_append(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; const char *const *args; @@ -322,14 +381,14 @@ return -1; } - if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0) return -1; dict_append(trans->ctx, args[1], args[2]); return 0; } -static int cmd_atomic_inc(struct dict_connection *conn, const char *line) +static int cmd_atomic_inc(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; const char *const *args; @@ -343,14 +402,14 @@ return -1; } - if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0) return -1; dict_atomic_inc(trans->ctx, args[1], diff); return 0; } -static struct dict_client_cmd cmds[] = { +static const struct dict_cmd_func cmds[] = { { DICT_PROTOCOL_CMD_LOOKUP, cmd_lookup }, { DICT_PROTOCOL_CMD_ITERATE, cmd_iterate }, { DICT_PROTOCOL_CMD_BEGIN, cmd_begin }, @@ -365,7 +424,7 @@ { 0, NULL } }; -static struct dict_client_cmd *dict_command_find(char cmd) +static const struct dict_cmd_func *dict_command_find(enum dict_protocol_cmd cmd) { unsigned int i; @@ -378,13 +437,52 @@ int dict_command_input(struct dict_connection *conn, const char *line) { - struct dict_client_cmd *cmd; + const struct dict_cmd_func *cmd_func; + struct dict_connection_cmd *cmd; + int ret; - cmd = dict_command_find(*line); - if (cmd == NULL) { + cmd_func = dict_command_find((enum dict_protocol_cmd)*line); + if (cmd_func == NULL) { i_error("dict client: Unknown command %c", *line); return -1; } - return cmd->func(conn, line + 1); + cmd = i_new(struct dict_connection_cmd, 1); + cmd->conn = conn; + cmd->cmd = cmd_func; + array_append(&conn->cmds, &cmd, 1); + if ((ret = cmd_func->func(cmd, line + 1)) <= 0) { + dict_connection_cmd_remove(cmd); + return ret; + } + return 0; +} + +static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd) +{ + struct dict_connection_cmd *const *first_cmdp; + + first_cmdp = array_idx(&cmd->conn->cmds, 0); + if (*first_cmdp == cmd) + (void)cmd_iterate_flush(cmd); } + +void dict_connection_cmds_output_more(struct dict_connection *conn) +{ + struct dict_connection_cmd *cmd, *const *first_cmdp; + + /* only iterators may be returning a lot of data */ + while (array_count(&conn->cmds) > 0) { + first_cmdp = array_idx(&conn->cmds, 0); + cmd = *first_cmdp; + + if (cmd->iter == NULL) + break; + + if (cmd_iterate_flush(cmd) == 0) { + /* unfinished */ + break; + } + /* cmd should be freed now */ + } +}
--- a/src/dict/dict-commands.h Wed Sep 02 17:34:43 2015 +0300 +++ b/src/dict/dict-commands.h Wed Sep 02 17:36:47 2015 +0300 @@ -5,4 +5,7 @@ int dict_command_input(struct dict_connection *conn, const char *line); +void dict_connection_cmds_output_more(struct dict_connection *conn); +void dict_connection_cmds_free(struct dict_connection *conn); + #endif
--- a/src/dict/dict-connection.c Wed Sep 02 17:34:43 2015 +0300 +++ b/src/dict/dict-connection.c Wed Sep 02 17:36:47 2015 +0300 @@ -15,6 +15,8 @@ #include <stdlib.h> #include <unistd.h> +#define DICT_CONN_MAX_PENDING_COMMANDS 5 + static struct dict_connection *dict_connections; static int dict_connection_parse_handshake(struct dict_connection *conn, @@ -103,6 +105,9 @@ const char *line; int ret; + if (conn->to_input != NULL) + timeout_remove(&conn->to_input); + switch (i_stream_read(conn->input)) { case 0: return; @@ -142,9 +147,37 @@ dict_connection_destroy(conn); break; } + if (array_count(&conn->cmds) >= DICT_CONN_MAX_PENDING_COMMANDS) { + io_remove(&conn->io); + if (conn->to_input != NULL) + timeout_remove(&conn->to_input); + } } } +void dict_connection_continue_input(struct dict_connection *conn) +{ + if (conn->io != NULL) + return; + + conn->io = io_add(conn->fd, IO_READ, dict_connection_input, conn); + if (conn->to_input == NULL) + conn->to_input = timeout_add_short(0, dict_connection_input, conn); +} + +static int dict_connection_output(struct dict_connection *conn) +{ + int ret; + + if ((ret = o_stream_flush(conn->output)) < 0) { + dict_connection_destroy(conn); + return 1; + } + if (ret > 0) + dict_connection_cmds_output_more(conn); + return ret; +} + struct dict_connection *dict_connection_create(int fd) { struct dict_connection *conn; @@ -155,7 +188,9 @@ FALSE); conn->output = o_stream_create_fd(fd, 128*1024, FALSE); o_stream_set_no_error_handling(conn->output, TRUE); + o_stream_set_flush_callback(conn->output, dict_connection_output, conn); conn->io = io_add(fd, IO_READ, dict_connection_input, conn); + i_array_init(&conn->cmds, DICT_CONN_MAX_PENDING_COMMANDS); DLLIST_PREPEND(&dict_connections, conn); return conn; } @@ -166,23 +201,30 @@ DLLIST_REMOVE(&dict_connections, conn); + /* deinit dict before anything else, so any pending dict operations + are aborted and their callbacks called. */ + if (conn->dict != NULL) + dict_deinit(&conn->dict); + if (array_is_created(&conn->transactions)) { array_foreach_modifiable(&conn->transactions, transaction) dict_transaction_rollback(&transaction->ctx); array_free(&conn->transactions); } - if (conn->iter_ctx != NULL) - (void)dict_iterate_deinit(&conn->iter_ctx); + /* this may end up adding conn->io back, so keep this early */ + dict_connection_cmds_free(conn); + array_free(&conn->cmds); - io_remove(&conn->io); + if (conn->to_input != NULL) + timeout_remove(&conn->to_input); + if (conn->io != NULL) + io_remove(&conn->io); i_stream_destroy(&conn->input); o_stream_destroy(&conn->output); if (close(conn->fd) < 0) i_error("close(dict client) failed: %m"); - if (conn->dict != NULL) - dict_deinit(&conn->dict); i_free(conn->name); i_free(conn->username); i_free(conn);
--- a/src/dict/dict-connection.h Wed Sep 02 17:34:43 2015 +0300 +++ b/src/dict/dict-connection.h Wed Sep 02 17:36:47 2015 +0300 @@ -22,18 +22,19 @@ struct io *io; struct istream *input; struct ostream *output; - - struct dict_iterate_context *iter_ctx; - enum dict_iterate_flags iter_flags; + struct timeout *to_input; /* There are only a few transactions per client, so keeping them in array is fast enough */ ARRAY(struct dict_connection_transaction) transactions; + ARRAY(struct dict_connection_cmd *) cmds; }; struct dict_connection *dict_connection_create(int fd); void dict_connection_destroy(struct dict_connection *conn); +void dict_connection_continue_input(struct dict_connection *conn); + void dict_connections_destroy_all(void); #endif