Mercurial > dovecot > core-2.2
changeset 19642:234364260d8d
pop3c: Added full support for running commands asynchronously (with PIPELINING)
author | Timo Sirainen <timo.sirainen@dovecot.fi> |
---|---|
date | Tue, 26 Jan 2016 15:40:09 +0200 |
parents | b7e2d981519c |
children | e7fe7db04f3a |
files | src/lib-storage/index/pop3c/pop3c-client.c src/lib-storage/index/pop3c/pop3c-client.h src/lib-storage/index/pop3c/pop3c-mail.c src/lib-storage/index/pop3c/pop3c-storage.c src/lib-storage/index/pop3c/pop3c-sync.c |
diffstat | 5 files changed, 237 insertions(+), 146 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib-storage/index/pop3c/pop3c-client.c Tue Jan 26 15:38:13 2016 +0200 +++ b/src/lib-storage/index/pop3c/pop3c-client.c Tue Jan 26 15:40:09 2016 +0200 @@ -1,9 +1,11 @@ /* Copyright (c) 2011-2016 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "array.h" #include "ioloop.h" #include "net.h" #include "istream.h" +#include "istream-chain.h" #include "istream-dot.h" #include "istream-seekable.h" #include "ostream.h" @@ -38,6 +40,20 @@ POP3C_CLIENT_STATE_DONE }; +struct pop3c_client_sync_cmd_ctx { + enum pop3c_command_state state; + char *reply; +}; + +struct pop3c_client_cmd { + struct istream *input; + struct istream_chain *chain; + bool reading_dot; + + pop3c_cmd_callback_t *callback; + void *context; +}; + struct pop3c_client { pool_t pool; struct pop3c_client_settings set; @@ -59,7 +75,7 @@ pop3c_login_callback_t *login_callback; void *login_context; - unsigned int async_commands; + ARRAY(struct pop3c_client_cmd) commands; const char *input_line; struct istream *dot_input; @@ -71,6 +87,7 @@ struct pop3c_client *client); static void pop3c_client_connect_ip(struct pop3c_client *client); static int pop3c_client_ssl_init(struct pop3c_client *client); +static void pop3c_client_input(struct pop3c_client *client); struct pop3c_client * pop3c_client_init(const struct pop3c_client_settings *set) @@ -84,6 +101,7 @@ client = p_new(pool, struct pop3c_client, 1); client->pool = pool; client->fd = -1; + p_array_init(&client->commands, pool, 16); client->set.debug = set->debug; client->set.host = p_strdup(pool, set->host); @@ -131,10 +149,50 @@ } } +static void +pop3c_client_async_callback(struct pop3c_client *client, + enum pop3c_command_state state, const char *reply) +{ + struct pop3c_client_cmd *cmd, cmd_copy; + bool running = client->running; + + i_assert(reply != NULL); + i_assert(array_count(&client->commands) > 0); + + cmd = array_idx_modifiable(&client->commands, 0); + if (cmd->input != NULL && state == POP3C_COMMAND_STATE_OK && + !cmd->reading_dot) { + /* read the full input into seekable-istream before calling + the callback */ + i_assert(client->dot_input == NULL); + i_stream_chain_append(cmd->chain, client->input); + client->dot_input = cmd->input; + cmd->reading_dot = TRUE; + return; + } + cmd_copy = *cmd; + array_delete(&client->commands, 0, 1); + + if (cmd_copy.input != NULL) { + i_stream_seek(cmd_copy.input, 0); + i_stream_unref(&cmd_copy.input); + } + if (cmd_copy.callback != NULL) + cmd_copy.callback(state, reply, cmd_copy.context); + if (running) + io_loop_stop(current_ioloop); +} + +static void +pop3c_client_async_callback_disconnected(struct pop3c_client *client) +{ + pop3c_client_async_callback(client, POP3C_COMMAND_STATE_DISCONNECTED, + "Disconnected"); +} + static void pop3c_client_disconnect(struct pop3c_client *client) { client->state = POP3C_CLIENT_STATE_DISCONNECTED; - client->async_commands = 0; if (client->running) io_loop_stop(current_ioloop); @@ -156,6 +214,8 @@ i_error("close(pop3c) failed: %m"); client->fd = -1; } + while (array_count(&client->commands) > 0) + pop3c_client_async_callback_disconnected(client); client_login_callback(client, POP3C_COMMAND_STATE_DISCONNECTED, "Disconnected"); } @@ -233,13 +293,21 @@ return 0; } -void pop3c_client_run(struct pop3c_client *client) +void pop3c_client_wait_one(struct pop3c_client *client) { struct ioloop *ioloop, *prev_ioloop = current_ioloop; bool timeout_added = FALSE, failed = FALSE; + if (client->state == POP3C_CLIENT_STATE_DISCONNECTED && + array_count(&client->commands) > 0) { + while (array_count(&client->commands) > 0) + pop3c_client_async_callback_disconnected(client); + } + i_assert(client->fd != -1 || client->state == POP3C_CLIENT_STATE_CONNECTING); + i_assert(array_count(&client->commands) > 0 || + client->state == POP3C_CLIENT_STATE_CONNECTING); ioloop = io_loop_create(); pop3c_client_ioloop_changed(client); @@ -328,6 +396,8 @@ static void pop3c_client_login_finished(struct pop3c_client *client) { io_remove(&client->io); + client->io = io_add(client->fd, IO_READ, pop3c_client_input, client); + timeout_remove(&client->to); client->state = POP3C_CLIENT_STATE_DONE; @@ -635,110 +705,139 @@ return client->capabilities; } -static void pop3c_client_input_reply(struct pop3c_client *client) +static int pop3c_client_dot_input(struct pop3c_client *client) { - i_assert(client->state == POP3C_CLIENT_STATE_DONE); + ssize_t ret; + + while ((ret = i_stream_read(client->dot_input)) > 0 || ret == -2) { + i_stream_skip(client->dot_input, + i_stream_get_data_size(client->dot_input)); + } + if (ret == 0) + return 0; + i_assert(ret == -1); - if (client->to != NULL) - timeout_reset(client->to); - client->input_line = i_stream_read_next_line(client->input); - if (client->input_line != NULL) - io_loop_stop(current_ioloop); - else if (client->input->closed || client->input->eof || - client->input->stream_errno != 0) { - /* disconnected */ - i_error("pop3c(%s): Server disconnected unexpectedly", - client->set.host); - pop3c_client_disconnect(client); - io_loop_stop(current_ioloop); + if (client->dot_input->stream_errno == 0) + ret = 1; + client->dot_input = NULL; + + if (ret > 0) { + /* currently we don't actually care about preserving the + +OK reply line for multi-line replies, so just return + it as empty */ + pop3c_client_async_callback(client, POP3C_COMMAND_STATE_OK, ""); + return 1; + } else { + pop3c_client_async_callback_disconnected(client); + return -1; } } static int -pop3c_client_read_line(struct pop3c_client *client, - const char **line_r, const char **error_r) -{ - i_assert(client->io == NULL); - i_assert(client->input_line == NULL); - - client->io = io_add(client->fd, IO_READ, - pop3c_client_input_reply, client); - pop3c_client_input_reply(client); - if (client->input_line == NULL && client->input != NULL) - pop3c_client_run(client); - - if (client->input_line == NULL) { - i_assert(client->io == NULL); - *error_r = "Disconnected"; - return -1; - } - - io_remove(&client->io); - *line_r = t_strdup(client->input_line); - client->input_line = NULL; - return 0; -} - -static int -pop3c_client_flush_asyncs(struct pop3c_client *client, const char **error_r) +pop3c_client_input_next_reply(struct pop3c_client *client) { const char *line; + enum pop3c_client_state state; - if (client->state != POP3C_CLIENT_STATE_DONE) { - i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED); - *error_r = "Disconnected"; - return -1; - } + line = i_stream_read_next_line(client->input); + if (line == NULL) + return client->input->eof ? -1 : 0; - while (client->async_commands > 0) { - if (pop3c_client_read_line(client, &line, error_r) < 0) - return -1; - client->async_commands--; + if (strncasecmp(line, "+OK", 3) == 0) { + line += 3; + state = POP3C_COMMAND_STATE_OK; + } else if (strncasecmp(line, "-ERR", 4) == 0) { + line += 4; + state = POP3C_COMMAND_STATE_ERR; + } else { + i_error("pop3c(%s): Server sent unrecognized line: %s", + client->set.host, line); + state = POP3C_COMMAND_STATE_ERR; } - return 0; + if (line[0] == ' ') + line++; + if (array_count(&client->commands) == 0) { + i_error("pop3c(%s): Server sent line when no command was running: %s", + client->set.host, line); + } else { + pop3c_client_async_callback(client, state, line); + } + return 1; } -int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmd, - const char **reply_r) +static void pop3c_client_input(struct pop3c_client *client) { - const char *line; int ret; - if (pop3c_client_flush_asyncs(client, reply_r) < 0) - return -1; - o_stream_nsend_str(client->output, cmd); - if (pop3c_client_read_line(client, &line, reply_r) < 0) - return -1; - if (strncasecmp(line, "+OK", 3) == 0) { - *reply_r = line + 3; - ret = 0; - } else if (strncasecmp(line, "-ERR", 4) == 0) { - *reply_r = line + 4; - ret = -1; - } else { - *reply_r = line; - ret = -1; + if (client->to != NULL) + timeout_reset(client->to); + do { + if (client->dot_input != NULL) { + /* continue reading the current multiline reply */ + if ((ret = pop3c_client_dot_input(client)) == 0) + return; + } else { + ret = pop3c_client_input_next_reply(client); + } + } while (ret > 0); + + if (ret < 0) { + i_error("pop3c(%s): Server disconnected unexpectedly", + client->set.host); + pop3c_client_disconnect(client); } - if (**reply_r == ' ') - *reply_r += 1; - return ret; +} + +static void pop3c_client_cmd_reply(enum pop3c_command_state state, + const char *reply, void *context) +{ + struct pop3c_client_sync_cmd_ctx *ctx = context; + + i_assert(ctx->reply == NULL); + + ctx->state = state; + ctx->reply = i_strdup(reply); } -void pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmd) +int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmdline, + const char **reply_r) { - const char *error; + struct pop3c_client_sync_cmd_ctx ctx; - if (client->state != POP3C_CLIENT_STATE_DONE) { - i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED); - return; - } + memset(&ctx, 0, sizeof(ctx)); + pop3c_client_cmd_line_async(client, cmdline, pop3c_client_cmd_reply, &ctx); + while (ctx.reply == NULL) + pop3c_client_wait_one(client); + *reply_r = t_strdup(ctx.reply); + i_free(ctx.reply); + return ctx.state == POP3C_COMMAND_STATE_OK ? 0 : -1; +} + +struct pop3c_client_cmd * +pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmdline, + pop3c_cmd_callback_t *callback, void *context) +{ + struct pop3c_client_cmd *cmd; if ((client->capabilities & POP3C_CAPABILITY_PIPELINING) == 0) { - if (pop3c_client_flush_asyncs(client, &error) < 0) - return; + while (array_count(&client->commands) > 0) + pop3c_client_wait_one(client); } - o_stream_nsend_str(client->output, cmd); - client->async_commands++; + i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED || + client->state == POP3C_CLIENT_STATE_DONE); + if (client->state == POP3C_CLIENT_STATE_DONE) + o_stream_nsend_str(client->output, cmdline); + + cmd = array_append_space(&client->commands); + cmd->callback = callback; + cmd->context = context; + return cmd; +} + +void pop3c_client_cmd_line_async_nocb(struct pop3c_client *client, + const char *cmdline) +{ + pop3c_client_cmd_line_async(client, cmdline, NULL, NULL); } static int seekable_fd_callback(const char **path_r, void *context) @@ -766,67 +865,44 @@ return fd; } -static void pop3c_client_dot_input(struct pop3c_client *client) -{ - ssize_t ret; - - if (client->to != NULL) - timeout_reset(client->to); - while ((ret = i_stream_read(client->dot_input)) > 0 || ret == -2) { - i_stream_skip(client->dot_input, - i_stream_get_data_size(client->dot_input)); - } - if (ret != 0) { - i_assert(ret == -1); - if (client->dot_input->stream_errno != 0) { - i_error("pop3c(%s): Server disconnected unexpectedly", - client->set.host); - pop3c_client_disconnect(client); - } - if (client->running) - io_loop_stop(current_ioloop); - } -} - -int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmd, +int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmdline, struct istream **input_r, const char **error_r) { - struct istream *inputs[2]; + struct pop3c_client_sync_cmd_ctx ctx; + const char *reply; - *input_r = NULL; + memset(&ctx, 0, sizeof(ctx)); + *input_r = pop3c_client_cmd_stream_async(client, cmdline, + pop3c_client_cmd_reply, &ctx); + while (ctx.reply == NULL) + pop3c_client_wait_one(client); + reply = t_strdup(ctx.reply); + i_free(ctx.reply); - /* read the +OK / -ERR */ - if (pop3c_client_cmd_line(client, cmd, error_r) < 0) - return -1; - /* read the stream */ - inputs[0] = i_stream_create_dot(client->input, TRUE); + if (ctx.state == POP3C_COMMAND_STATE_OK) + return 0; + i_stream_unref(input_r); + *error_r = reply; + return -1; +} + +struct istream * +pop3c_client_cmd_stream_async(struct pop3c_client *client, const char *cmdline, + pop3c_cmd_callback_t *callback, void *context) +{ + struct istream *input, *inputs[2]; + struct pop3c_client_cmd *cmd; + + cmd = pop3c_client_cmd_line_async(client, cmdline, callback, context); + + input = i_stream_create_chain(&cmd->chain); + inputs[0] = i_stream_create_dot(input, TRUE); inputs[1] = NULL; - client->dot_input = - i_stream_create_seekable(inputs, POP3C_MAX_INBUF_SIZE, - seekable_fd_callback, client); + cmd->input = i_stream_create_seekable(inputs, POP3C_MAX_INBUF_SIZE, + seekable_fd_callback, client); + i_stream_unref(&input); i_stream_unref(&inputs[0]); - i_assert(client->io == NULL); - client->io = io_add(client->fd, IO_READ, - pop3c_client_dot_input, client); - /* read any pending data from the stream */ - pop3c_client_dot_input(client); - if (!client->dot_input->eof) - pop3c_client_run(client); - - if (client->input == NULL) { - i_assert(client->io == NULL); - i_stream_destroy(&client->dot_input); - *error_r = "Disconnected"; - return -1; - } - io_remove(&client->io); - i_stream_seek(client->dot_input, 0); - /* if this stream is used by some filter stream, make the filter - stream blocking */ - client->dot_input->blocking = TRUE; - - *input_r = client->dot_input; - client->dot_input = NULL; - return 0; + i_stream_ref(cmd->input); + return cmd->input; }
--- a/src/lib-storage/index/pop3c/pop3c-client.h Tue Jan 26 15:38:13 2016 +0200 +++ b/src/lib-storage/index/pop3c/pop3c-client.h Tue Jan 26 15:40:09 2016 +0200 @@ -43,13 +43,13 @@ typedef void pop3c_login_callback_t(enum pop3c_command_state state, const char *reply, void *context); +typedef void pop3c_cmd_callback_t(enum pop3c_command_state state, + const char *reply, void *context); struct pop3c_client * pop3c_client_init(const struct pop3c_client_settings *set); void pop3c_client_deinit(struct pop3c_client **client); -void pop3c_client_run(struct pop3c_client *client); - void pop3c_client_login(struct pop3c_client *client, pop3c_login_callback_t *callback, void *context); @@ -59,13 +59,25 @@ /* Returns 0 if received +OK reply, reply contains the text without the +OK. Returns -1 if received -ERR reply or disconnected. */ -int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmd, +int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmdline, const char **reply_r); +/* Start the command asynchronously. Call the callback when finished. */ +struct pop3c_client_cmd * +pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmdline, + pop3c_cmd_callback_t *callback, void *context); /* Send a command, don't care if it succeeds or not. */ -void pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmd); +void pop3c_client_cmd_line_async_nocb(struct pop3c_client *client, + const char *cmdline); /* Returns 0 and stream if succeeded, -1 and error if received -ERR reply or disconnected. */ -int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmd, +int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmdline, struct istream **input_r, const char **error_r); +/* Start the command asynchronously. Call the callback when finished. */ +struct istream * +pop3c_client_cmd_stream_async(struct pop3c_client *client, const char *cmdline, + pop3c_cmd_callback_t *callback, void *context); +/* Wait for the next async command to finish. It's an error to call this when + there are no pending async commands. */ +void pop3c_client_wait_one(struct pop3c_client *client); #endif
--- a/src/lib-storage/index/pop3c/pop3c-mail.c Tue Jan 26 15:38:13 2016 +0200 +++ b/src/lib-storage/index/pop3c/pop3c-mail.c Tue Jan 26 15:40:09 2016 +0200 @@ -151,6 +151,9 @@ if (get_body) pop3c_mail_cache_size(mail); } + /* if this stream is used by some filter stream, make the + filter stream blocking */ + mail->data.stream->blocking = TRUE; return index_mail_init_stream(mail, hdr_size, body_size, stream_r); }
--- a/src/lib-storage/index/pop3c/pop3c-storage.c Tue Jan 26 15:38:13 2016 +0200 +++ b/src/lib-storage/index/pop3c/pop3c-storage.c Tue Jan 26 15:40:09 2016 +0200 @@ -176,7 +176,7 @@ mbox->client = pop3c_client_create_from_set(box->storage, mbox->storage->set); pop3c_client_login(mbox->client, pop3c_login_callback, mbox); - pop3c_client_run(mbox->client); + pop3c_client_wait_one(mbox->client); return mbox->logged_in ? 0 : -1; }
--- a/src/lib-storage/index/pop3c/pop3c-sync.c Tue Jan 26 15:38:13 2016 +0200 +++ b/src/lib-storage/index/pop3c/pop3c-sync.c Tue Jan 26 15:40:09 2016 +0200 @@ -319,7 +319,7 @@ str_truncate(str, 0); str_printfa(str, "DELE %u\r\n", idx+1); - pop3c_client_cmd_line_async(mbox->client, str_c(str)); + pop3c_client_cmd_line_async_nocb(mbox->client, str_c(str)); deletions = TRUE; } }