Mercurial > dovecot > core-2.2
view src/dict/dict-commands.c @ 22616:629f44740f50
cassandra: Include "prepared" when logging about prepared statement queries
Mainly useful for debugging/testing.
author | Timo Sirainen <timo.sirainen@dovecot.fi> |
---|---|
date | Mon, 16 Oct 2017 15:12:12 +0300 |
parents | 6fc89fa8bd23 |
children | cb108f786fb4 |
line wrap: on
line source
/* Copyright (c) 2005-2017 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "array.h"
#include "ostream.h"
#include "str.h"
#include "strescape.h"
#include "timing.h"
#include "time-util.h"
#include "dict-client.h"
#include "dict-settings.h"
#include "dict-connection.h"
#include "dict-commands.h"
#include "main.h"

/* Minimum client minor version that receives timing fields in replies. */
#define DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION 1
/* Minimum client minor version that accepts replies tagged with an async ID
   (i.e. replies that don't have to be sent in command order). */
#define DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION 1
/* Once the output buffer holds more than this many bytes, try to flush it
   before producing more output. */
#define DICT_OUTPUT_OPTIMAL_SIZE 1024

/* Maps a protocol command character to its handler.
   Handlers in this file return:
     <0 = broken input / error,
      0 = command finished immediately, nothing more to send,
     >0 = command stays pending; its reply is sent later. */
struct dict_cmd_func {
	enum dict_protocol_cmd cmd;
	int (*func)(struct dict_connection_cmd *cmd, const char *line);
};

/* State of a single command received from a dict client. */
struct dict_connection_cmd {
	const struct dict_cmd_func *cmd;
	struct dict_connection *conn;
	/* ioloop time when the command was received */
	struct timeval start_timeval;
	/* finished reply line (i_strdup()ed), NULL until the command is done */
	char *reply;

	/* set only for ITERATE commands */
	struct dict_iterate_context *iter;
	enum dict_iterate_flags iter_flags;

	/* non-zero when the reply is sent tagged with this async ID */
	unsigned int async_reply_id;
	unsigned int trans_id; /* obsolete */
};

struct dict_command_stats cmd_stats;

static int cmd_iterate_flush(struct dict_connection_cmd *cmd);
static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd);

/* Free a command: deinitialize its iterator (if any), free the reply, drop
   the command's connection reference and free the command itself. If
   dict_connection_unref() returns TRUE, input handling is continued for the
   connection. */
static void dict_connection_cmd_free(struct dict_connection_cmd *cmd)
{
	if (cmd->iter != NULL)
		(void)dict_iterate_deinit(&cmd->iter);
	i_free(cmd->reply);

	if (dict_connection_unref(cmd->conn))
		dict_connection_continue_input(cmd->conn);
	i_free(cmd);
}

/* Remove the command from its connection's command array and free it.
   The command must be in the array. */
static void dict_connection_cmd_remove(struct dict_connection_cmd *cmd)
{
	struct dict_connection_cmd *const *cmds;
	unsigned int i, count;

	cmds = array_get(&cmd->conn->cmds, &count);
	for (i = 0; i < count; i++) {
		if (cmds[i] == cmd) {
			array_delete(&cmd->conn->cmds, i, 1);
			dict_connection_cmd_free(cmd);
			return;
		}
	}
	i_unreached();
}

/* Send replies of finished commands in the order the commands were received.
   Stops at the first command that doesn't have a reply yet. Only used for
   clients older than DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION. */
static void dict_connection_cmds_flush(struct dict_connection *conn)
{
	struct dict_connection_cmd *cmd, *const *first_cmdp;

	i_assert(conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION);

	/* removing commands drops connection references; hold our own
	   reference while looping */
	dict_connection_ref(conn);
	while (array_count(&conn->cmds) > 0) {
		first_cmdp = array_idx(&conn->cmds, 0);
		cmd = *first_cmdp;

		i_assert(cmd->async_reply_id == 0);

		/* we may be able to start outputting iterations now. */
		if (cmd->iter != NULL)
			(void)cmd_iterate_flush(cmd);

		if (cmd->reply == NULL) {
			/* command not finished yet */
			break;
		}

		o_stream_nsend_str(conn->output, cmd->reply);
		dict_connection_cmd_remove(cmd);
	}
	dict_connection_unref_safe(conn);
}

/* Called when a command's reply is ready. Always clears the caller's pointer.
   For old (ordered) clients the connection's finished commands are flushed
   in order; otherwise the reply is sent immediately, prefixed with the
   command's async reply ID, and the command is removed. */
static void dict_connection_cmd_try_flush(struct dict_connection_cmd **_cmd)
{
	struct dict_connection_cmd *cmd = *_cmd;

	*_cmd = NULL;
	if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION) {
		dict_connection_cmds_flush(cmd->conn);
		return;
	}
	i_assert(cmd->async_reply_id != 0);
	i_assert(cmd->reply != NULL);

	o_stream_nsend_str(cmd->conn->output, t_strdup_printf("%c%u\t%s",
		DICT_PROTOCOL_REPLY_ASYNC_REPLY,
		cmd->async_reply_id, cmd->reply));
	dict_connection_cmd_remove(cmd);
}

/* Assign an async reply ID to the command and tell it to the client.
   Does nothing for clients older than
   DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION. */
static void dict_connection_cmd_async(struct dict_connection_cmd *cmd)
{
	if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION)
		return;

	i_assert(cmd->async_reply_id == 0);
	cmd->async_reply_id = ++cmd->conn->async_id_counter;
	/* 0 means "no async ID", so skip it when the counter wraps */
	if (cmd->async_reply_id == 0)
		cmd->async_reply_id = ++cmd->conn->async_id_counter;
	o_stream_nsend_str(cmd->conn->output, t_strdup_printf("%c%u\n",
		DICT_PROTOCOL_REPLY_ASYNC_ID, cmd->async_reply_id));
}

/* Add the command's elapsed time (start_timeval .. ioloop_timeval) to the
   given timing statistics. Does nothing unless verbose_proctitle is set. */
static void
cmd_stats_update(struct dict_connection_cmd *cmd, struct timing *timing)
{
	long long diff;

	if (!dict_settings->verbose_proctitle)
		return;

	diff = timeval_diff_usecs(&ioloop_timeval, &cmd->start_timeval);
	if (diff < 0)
		diff = 0;
	timing_add_usecs(timing, diff);
	dict_proctitle_update_later();
}

/* Update the statistics for a finished command and, for clients that support
   it, append the start and finish timestamps (secs, usecs) to the reply. */
static void
dict_cmd_reply_handle_timings(struct dict_connection_cmd *cmd,
			      string_t *str, struct timing *timing)
{
	io_loop_time_refresh();
	cmd_stats_update(cmd, timing);

	if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION)
		return;
	str_printfa(str, "\t%ld\t%u\t%ld\t%u",
		    (long)cmd->start_timeval.tv_sec,
		    (unsigned int)cmd->start_timeval.tv_usec,
		    (long)ioloop_timeval.tv_sec,
		    (unsigned int)ioloop_timeval.tv_usec);
}

/* Write a successful lookup reply to str. values must contain at least one
   value. A single value, or any result for a client older than
   DICT_CLIENT_PROTOCOL_VERSION_MIN_MULTI_OK, is written as an OK reply with
   only the first value. Otherwise all values are written as a MULTI_OK
   reply. */
static void
cmd_lookup_write_reply(struct dict_connection_cmd *cmd,
		       const char *const *values, string_t *str)
{
	string_t *tmp;

	i_assert(values[0] != NULL);

	if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_VERSION_MIN_MULTI_OK ||
	    values[1] == NULL) {
		str_append_c(str, DICT_PROTOCOL_REPLY_OK);
		str_append_tabescaped(str, values[0]);
		return;
	}
	/* the results get double-tabescaped so they end up becoming a single
	   parameter */
	tmp = t_str_new(128);
	for (unsigned int i = 0; values[i] != NULL; i++) {
		str_append_c(tmp, '\t');
		str_append_tabescaped(tmp, values[i]);
	}
	str_append_c(str, DICT_PROTOCOL_REPLY_MULTI_OK);
	/* +1 skips the leading tab added before the first value */
	str_append_tabescaped(str, str_c(tmp) + 1);
}

/* Lookup finished: build the reply (found / not found / failed) and try to
   send it. */
static void
cmd_lookup_callback(const struct dict_lookup_result *result, void *context)
{
	struct dict_connection_cmd *cmd = context;
	string_t *str = t_str_new(128);

	if (result->ret > 0) {
		cmd_lookup_write_reply(cmd, result->values, str);
	} else if (result->ret == 0) {
		str_append_c(str, DICT_PROTOCOL_REPLY_NOTFOUND);
	} else {
		i_error("%s", result->error);
		str_append_c(str, DICT_PROTOCOL_REPLY_FAIL);
		str_append_tabescaped(str, result->error);
	}
	dict_cmd_reply_handle_timings(cmd, str, cmd_stats.lookups);
	str_append_c(str, '\n');

	cmd->reply = i_strdup(str_c(str));
	dict_connection_cmd_try_flush(&cmd);
}

/* LOOKUP command: start an asynchronous lookup. The reply is sent from
   cmd_lookup_callback(). */
static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line)
{
	/* <key> */
	dict_connection_cmd_async(cmd);
	dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd);
	return 1;
}

/* If the output buffer has grown past DICT_OUTPUT_OPTIMAL_SIZE, try to flush
   it. Returns FALSE if the flush didn't succeed fully (a flush is then marked
   as pending on the stream), TRUE if more output can be produced. */
static bool dict_connection_flush_if_full(struct dict_connection *conn)
{
	if (o_stream_get_buffer_used_size(conn->output) >
	    DICT_OUTPUT_OPTIMAL_SIZE) {
		if (o_stream_flush(conn->output) <= 0) {
			/* continue later when there's more space
			   in output buffer */
			o_stream_set_flush_pending(conn->output, TRUE);
			return FALSE;
		}
		/* flushed everything, continue */
	}
	return TRUE;
}

/* Send as many iteration results as possible.
   Returns 0 if the iteration isn't finished yet (output buffer is full or
   the iterator is waiting for more results), 1 when the iteration is
   finished and the final reply has been stored in cmd->reply. */
static int cmd_iterate_flush(struct dict_connection_cmd *cmd)
{
	string_t *str;
	const char *key, *value;

	if (!dict_connection_flush_if_full(cmd->conn))
		return 0;

	str = t_str_new(256);
	while (dict_iterate(cmd->iter, &key, &value)) {
		str_truncate(str, 0);
		if (cmd->async_reply_id != 0) {
			str_append_c(str, DICT_PROTOCOL_REPLY_ASYNC_REPLY);
			str_printfa(str, "%u\t", cmd->async_reply_id);
		}
		str_append_c(str, DICT_PROTOCOL_REPLY_OK);
		str_append_tabescaped(str, key);
		str_append_c(str, '\t');
		if ((cmd->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0)
			str_append_tabescaped(str, value);
		str_append_c(str, '\n');
		o_stream_nsend(cmd->conn->output, str_data(str), str_len(str));

		if (!dict_connection_flush_if_full(cmd->conn))
			return 0;
	}
	if (dict_iterate_has_more(cmd->iter)) {
		/* wait for the next iteration callback */
		return 0;
	}

	/* iteration finished: the final reply is an empty line on success,
	   or begins with the FAIL reply character if deinit failed */
	str_truncate(str, 0);
	if (dict_iterate_deinit(&cmd->iter) < 0)
		str_append_c(str, DICT_PROTOCOL_REPLY_FAIL);
	dict_cmd_reply_handle_timings(cmd, str, cmd_stats.iterations);
	str_append_c(str, '\n');

	cmd->reply = i_strdup(str_c(str));
	return 1;
}

/* Async iteration callback: try to output more results, with the output
   stream corked for the duration. */
static void cmd_iterate_callback(void *context)
{
	struct dict_connection_cmd *cmd = context;
	struct dict_connection *conn = cmd->conn;

	/* cmd may get freed while outputting; keep the connection referenced
	   so the output stream can still be uncorked */
	dict_connection_ref(conn);
	o_stream_cork(conn->output);
	dict_connection_cmd_output_more(cmd);
	o_stream_uncork(conn->output);
	dict_connection_unref_safe(conn);
}

/* ITERATE command: start an asynchronous iteration over the given paths. */
static int cmd_iterate(struct dict_connection_cmd *cmd, const char *line)
{
	const char *const *args;
	unsigned int flags;
	uint64_t max_rows;

	args = t_strsplit_tabescaped(line);
	if (str_array_length(args) < 3 ||
	    str_to_uint(args[0], &flags) < 0 ||
	    str_to_uint64(args[1], &max_rows) < 0) {
		i_error("dict client: ITERATE: broken input");
		return -1;
	}
	dict_connection_cmd_async(cmd);

	/* <flags> <max_rows> <path> */
	flags |= DICT_ITERATE_FLAG_ASYNC;
	cmd->iter = dict_iterate_init_multiple(cmd->conn->dict, args+2, flags);
	cmd->iter_flags = flags;
	/* max_rows of 0 leaves the iteration unlimited */
	if (max_rows > 0)
		dict_iterate_set_limit(cmd->iter, max_rows);
	dict_iterate_set_async_callback(cmd->iter, cmd_iterate_callback, cmd);
	dict_connection_cmd_output_more(cmd);
	return 1;
}

/* Returns the connection's transaction with the given ID, or NULL if there
   is none. */
static struct dict_connection_transaction *
dict_connection_transaction_lookup(struct dict_connection *conn,
				   unsigned int id)
{
	struct dict_connection_transaction *transaction;

	if (!array_is_created(&conn->transactions))
		return NULL;

	array_foreach_modifiable(&conn->transactions, transaction) {
		if (transaction->id == id)
			return transaction;
	}
	return NULL;
}

/* Remove the transaction with the given ID from the connection's transaction
   array. The transaction must exist and its context must already have been
   cleared (by commit or rollback). */
static void
dict_connection_transaction_array_remove(struct dict_connection *conn,
					 unsigned int id)
{
	const struct dict_connection_transaction *transactions;
	unsigned int i, count;

	transactions = array_get(&conn->transactions, &count);
	for (i = 0; i < count; i++) {
		if (transactions[i].id == id) {
			i_assert(transactions[i].ctx == NULL);
			array_delete(&conn->transactions, i, 1);
			return;
		}
	}
	i_unreached();
}

/* BEGIN command: start a new transaction with a client-chosen ID. */
static int cmd_begin(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;
	unsigned int id;

	if (str_to_uint(line, &id) < 0) {
		i_error("dict client: Invalid transaction ID %s", line);
		return -1;
	}
	if (dict_connection_transaction_lookup(cmd->conn, id) != NULL) {
		i_error("dict client: Transaction ID %u already exists", id);
		return -1;
	}

	if (!array_is_created(&cmd->conn->transactions))
		i_array_init(&cmd->conn->transactions, 4);

	/* <id> */
	trans = array_append_space(&cmd->conn->transactions);
	trans->id = id;
	trans->conn = cmd->conn;
	trans->ctx = dict_transaction_begin(cmd->conn->dict);
	return 0;
}

/* Parse a transaction ID from line and look up the matching transaction that
   is still usable.
   Returns 0 and sets *trans_r on success. Returns -1 (after logging an
   error) if the ID is invalid, no such transaction exists, or the
   transaction's commit has already been started. */
static int
dict_connection_transaction_lookup_parse(struct dict_connection *conn,
					 const char *line,
					 struct dict_connection_transaction **trans_r)
{
	unsigned int id;

	if (str_to_uint(line, &id) < 0) {
		i_error("dict client: Invalid transaction ID %s", line);
		return -1;
	}
	*trans_r = dict_connection_transaction_lookup(conn, id);
	if (*trans_r == NULL) {
		i_error("dict client: Transaction ID %u doesn't exist", id);
		return -1;
	}
	if ((*trans_r)->ctx == NULL) {
		/* An asynchronous commit was started for this transaction,
		   but it hasn't finished yet: the entry stays in the array
		   until cmd_commit_finish() removes it, while its context
		   has already been handed over to the commit. Using it for
		   any further command would pass a NULL context to the dict
		   API, so treat it as a protocol error instead. */
		i_error("dict client: Transaction ID %u is already being committed", id);
		return -1;
	}
	return 0;
}

/* Commit finished: build the reply from the commit result, remove the
   transaction from the connection and try to send the reply.
   async selects the ASYNC_COMMIT reply format. */
static void
cmd_commit_finish(struct dict_connection_cmd *cmd, int ret, bool async)
{
	string_t *str = t_str_new(64);
	char chr;

	switch (ret) {
	case 1:
		chr = DICT_PROTOCOL_REPLY_OK;
		break;
	case 0:
		chr = DICT_PROTOCOL_REPLY_NOTFOUND;
		break;
	case DICT_COMMIT_RET_WRITE_UNCERTAIN:
		chr = DICT_PROTOCOL_REPLY_WRITE_UNCERTAIN;
		break;
	case DICT_COMMIT_RET_FAILED:
	default:
		chr = DICT_PROTOCOL_REPLY_FAIL;
		break;
	}
	if (async) {
		str_printfa(str, "%c%c%u", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
			    chr, cmd->trans_id);
	} else {
		str_printfa(str, "%c%u", chr, cmd->trans_id);
	}
	dict_cmd_reply_handle_timings(cmd, str, cmd_stats.commits);
	str_append_c(str, '\n');
	cmd->reply = i_strdup(str_c(str));

	dict_connection_transaction_array_remove(cmd->conn, cmd->trans_id);
	dict_connection_cmd_try_flush(&cmd);
}

/* Commit callback for the COMMIT command. */
static void cmd_commit_callback(int ret, void *context)
{
	struct dict_connection_cmd *cmd = context;

	cmd_commit_finish(cmd, ret, FALSE);
}

/* Commit callback for the COMMIT_ASYNC command. */
static void cmd_commit_callback_async(int ret, void *context)
{
	struct dict_connection_cmd *cmd = context;

	cmd_commit_finish(cmd, ret, TRUE);
}

/* COMMIT command: start committing the transaction. The reply is sent from
   cmd_commit_callback(). */
static int
cmd_commit(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;

	if (dict_connection_transaction_lookup_parse(cmd->conn, line, &trans) < 0)
		return -1;
	/* remember the ID rather than the pointer: the transaction array may
	   be modified before the commit finishes */
	cmd->trans_id = trans->id;

	dict_connection_cmd_async(cmd);
	dict_transaction_commit_async(&trans->ctx, cmd_commit_callback, cmd);
	return 1;
}

/* COMMIT_ASYNC command: like COMMIT, but the reply uses the ASYNC_COMMIT
   format. */
static int
cmd_commit_async(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;

	if (dict_connection_transaction_lookup_parse(cmd->conn, line, &trans) < 0)
		return -1;
	cmd->trans_id = trans->id;

	dict_connection_cmd_async(cmd);
	dict_transaction_commit_async(&trans->ctx, cmd_commit_callback_async, cmd);
	return 1;
}

/* ROLLBACK command: roll back the transaction and forget it. No reply is
   sent. */
static int cmd_rollback(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;

	if (dict_connection_transaction_lookup_parse(cmd->conn, line, &trans) < 0)
		return -1;

	dict_transaction_rollback(&trans->ctx);
	dict_connection_transaction_array_remove(cmd->conn, trans->id);
	return 0;
}

/* SET command: set key to value within the transaction. */
static int cmd_set(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;
	const char *const *args;

	/* <id> <key> <value> */
	args = t_strsplit_tabescaped(line);
	if (str_array_length(args) != 3) {
		i_error("dict client: SET: broken input");
		return -1;
	}

	if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
		return -1;
	dict_set(trans->ctx, args[1], args[2]);
	return 0;
}

/* UNSET command: unset key within the transaction. */
static int cmd_unset(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;
	const char *const *args;

	/* <id> <key> */
	args = t_strsplit_tabescaped(line);
	if (str_array_length(args) != 2) {
		i_error("dict client: UNSET: broken input");
		return -1;
	}

	if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
		return -1;
	dict_unset(trans->ctx, args[1]);
	return 0;
}

/* APPEND command: append value to key within the transaction. */
static int cmd_append(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;
	const char *const *args;

	/* <id> <key> <value> */
	args = t_strsplit_tabescaped(line);
	if (str_array_length(args) != 3) {
		i_error("dict client: APPEND: broken input");
		return -1;
	}

	if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
		return -1;

	dict_append(trans->ctx, args[1], args[2]);
	return 0;
}

/* ATOMIC_INC command: atomically add diff to key within the transaction. */
static int cmd_atomic_inc(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;
	const char *const *args;
	long long diff;

	/* <id> <key> <diff> */
	args = t_strsplit_tabescaped(line);
	if (str_array_length(args) != 3 ||
	    str_to_llong(args[2], &diff) < 0) {
		i_error("dict client: ATOMIC_INC: broken input");
		return -1;
	}

	if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
		return -1;

	dict_atomic_inc(trans->ctx, args[1], diff);
	return 0;
}

/* TIMESTAMP command: set the transaction's timestamp. */
static int cmd_timestamp(struct dict_connection_cmd *cmd, const char *line)
{
	struct dict_connection_transaction *trans;
	const char *const *args;
	long long tv_sec;
	unsigned int tv_nsec;

	/* <id> <secs> <nsecs> */
	args = t_strsplit_tabescaped(line);
	if (str_array_length(args) != 3 ||
	    str_to_llong(args[1], &tv_sec) < 0 ||
	    str_to_uint(args[2], &tv_nsec) < 0) {
		i_error("dict client: TIMESTAMP: broken input");
		return -1;
	}

	if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
		return -1;

	struct timespec ts = {
		.tv_sec = tv_sec,
		.tv_nsec = tv_nsec
	};
	dict_transaction_set_timestamp(trans->ctx, &ts);
	return 0;
}

/* Command dispatch table, terminated by an entry with cmd == 0. */
static const struct dict_cmd_func cmds[] = {
	{ DICT_PROTOCOL_CMD_LOOKUP, cmd_lookup },
	{ DICT_PROTOCOL_CMD_ITERATE, cmd_iterate },
	{ DICT_PROTOCOL_CMD_BEGIN, cmd_begin },
	{ DICT_PROTOCOL_CMD_COMMIT, cmd_commit },
	{ DICT_PROTOCOL_CMD_COMMIT_ASYNC, cmd_commit_async },
	{ DICT_PROTOCOL_CMD_ROLLBACK, cmd_rollback },
	{ DICT_PROTOCOL_CMD_SET, cmd_set },
	{ DICT_PROTOCOL_CMD_UNSET, cmd_unset },
	{ DICT_PROTOCOL_CMD_APPEND, cmd_append },
	{ DICT_PROTOCOL_CMD_ATOMIC_INC, cmd_atomic_inc },
	{ DICT_PROTOCOL_CMD_TIMESTAMP, cmd_timestamp },

	{ 0, NULL }
};

/* Returns the dispatch table entry for the command, or NULL if the command
   is unknown. */
static const struct dict_cmd_func *dict_command_find(enum dict_protocol_cmd cmd)
{
	unsigned int i;

	for (i = 0; cmds[i].cmd != '\0'; i++) {
		if (cmds[i].cmd == cmd)
			return &cmds[i];
	}
	return NULL;
}

/* Handle one input line from a dict client. The first character selects the
   command and the rest of the line is passed to its handler.
   Returns a negative value if the command is unknown or its handler failed,
   0 otherwise (the command either finished immediately or was left pending
   in conn->cmds). */
int dict_command_input(struct dict_connection *conn, const char *line)
{
	const struct dict_cmd_func *cmd_func;
	struct dict_connection_cmd *cmd;
	int ret;

	cmd_func = dict_command_find((enum dict_protocol_cmd)*line);
	if (cmd_func == NULL) {
		i_error("dict client: Unknown command %c", *line);
		return -1;
	}

	cmd = i_new(struct dict_connection_cmd, 1);
	cmd->conn = conn;
	cmd->cmd = cmd_func;
	cmd->start_timeval = ioloop_timeval;
	array_append(&conn->cmds, &cmd, 1);
	/* each command holds a connection reference until it's freed */
	dict_connection_ref(conn);
	if ((ret = cmd_func->func(cmd, line + 1)) <= 0) {
		/* finished immediately (or failed): nothing is pending */
		dict_connection_cmd_remove(cmd);
		return ret;
	}
	return 0;
}

/* Try to output more iteration results for the connection's commands.
   Returns TRUE if an iteration finished and its command was flushed, in
   which case the command array was modified and the caller should call this
   again. Returns FALSE if nothing more can be done right now. */
static bool dict_connection_cmds_try_output_more(struct dict_connection *conn)
{
	struct dict_connection_cmd *const *cmdp, *cmd;

	/* only iterators may be returning a lot of data */
	array_foreach(&conn->cmds, cmdp) {
		cmd = *cmdp;

		if (cmd->iter == NULL) {
			/* not an iterator */
		} else if (cmd_iterate_flush(cmd) == 0) {
			/* unfinished */
		} else {
			dict_connection_cmd_try_flush(&cmd);
			/* cmd should be freed now, restart output */
			return TRUE;
		}
		/* NOTE(review): this compares against the TIMINGS constant,
		   although the UNORDERED one looks like the intended check.
		   Both are defined as 1 in this file, so it makes no
		   difference currently. */
		if (conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION)
			break;
		/* try to flush the rest */
	}
	return FALSE;
}

/* Output as many pending iteration results as possible for the connection. */
void dict_connection_cmds_output_more(struct dict_connection *conn)
{
	while (array_count(&conn->cmds) > 0) {
		if (!dict_connection_cmds_try_output_more(conn))
			break;
	}
}

/* Try to output more for the connection on behalf of the given command.
   For old clients nothing is done unless the command is the first one in
   the connection's command array. */
static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd)
{
	struct dict_connection_cmd *const *first_cmdp;

	if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION) {
		first_cmdp = array_idx(&cmd->conn->cmds, 0);
		if (*first_cmdp != cmd)
			return;
	}
	(void)dict_connection_cmds_try_output_more(cmd->conn);
}

/* Initialize the global command timing statistics. */
void dict_commands_init(void)
{
	cmd_stats.lookups = timing_init();
	cmd_stats.iterations = timing_init();
	cmd_stats.commits = timing_init();
}

/* Free the global command timing statistics. */
void dict_commands_deinit(void)
{
	timing_deinit(&cmd_stats.lookups);
	timing_deinit(&cmd_stats.iterations);
	timing_deinit(&cmd_stats.commits);
}