changeset 19642:234364260d8d

pop3c: Added full support for running commands asynchronously (with PIPELINING)
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Tue, 26 Jan 2016 15:40:09 +0200
parents b7e2d981519c
children e7fe7db04f3a
files src/lib-storage/index/pop3c/pop3c-client.c src/lib-storage/index/pop3c/pop3c-client.h src/lib-storage/index/pop3c/pop3c-mail.c src/lib-storage/index/pop3c/pop3c-storage.c src/lib-storage/index/pop3c/pop3c-sync.c
diffstat 5 files changed, 237 insertions(+), 146 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-storage/index/pop3c/pop3c-client.c	Tue Jan 26 15:38:13 2016 +0200
+++ b/src/lib-storage/index/pop3c/pop3c-client.c	Tue Jan 26 15:40:09 2016 +0200
@@ -1,9 +1,11 @@
 /* Copyright (c) 2011-2016 Dovecot authors, see the included COPYING file */
 
 #include "lib.h"
+#include "array.h"
 #include "ioloop.h"
 #include "net.h"
 #include "istream.h"
+#include "istream-chain.h"
 #include "istream-dot.h"
 #include "istream-seekable.h"
 #include "ostream.h"
@@ -38,6 +40,20 @@
 	POP3C_CLIENT_STATE_DONE
 };
 
+struct pop3c_client_sync_cmd_ctx {
+	enum pop3c_command_state state;
+	char *reply;
+};
+
+struct pop3c_client_cmd {
+	struct istream *input;
+	struct istream_chain *chain;
+	bool reading_dot;
+
+	pop3c_cmd_callback_t *callback;
+	void *context;
+};
+
 struct pop3c_client {
 	pool_t pool;
 	struct pop3c_client_settings set;
@@ -59,7 +75,7 @@
 	pop3c_login_callback_t *login_callback;
 	void *login_context;
 
-	unsigned int async_commands;
+	ARRAY(struct pop3c_client_cmd) commands;
 	const char *input_line;
 	struct istream *dot_input;
 
@@ -71,6 +87,7 @@
 		   struct pop3c_client *client);
 static void pop3c_client_connect_ip(struct pop3c_client *client);
 static int pop3c_client_ssl_init(struct pop3c_client *client);
+static void pop3c_client_input(struct pop3c_client *client);
 
 struct pop3c_client *
 pop3c_client_init(const struct pop3c_client_settings *set)
@@ -84,6 +101,7 @@
 	client = p_new(pool, struct pop3c_client, 1);
 	client->pool = pool;
 	client->fd = -1;
+	p_array_init(&client->commands, pool, 16);
 
 	client->set.debug = set->debug;
 	client->set.host = p_strdup(pool, set->host);
@@ -131,10 +149,50 @@
 	}
 }
 
+static void
+pop3c_client_async_callback(struct pop3c_client *client,
+			    enum pop3c_command_state state, const char *reply)
+{
+	struct pop3c_client_cmd *cmd, cmd_copy;
+	bool running = client->running;
+
+	i_assert(reply != NULL);
+	i_assert(array_count(&client->commands) > 0);
+
+	cmd = array_idx_modifiable(&client->commands, 0);
+	if (cmd->input != NULL && state == POP3C_COMMAND_STATE_OK &&
+	    !cmd->reading_dot) {
+		/* read the full input into seekable-istream before calling
+		   the callback */
+		i_assert(client->dot_input == NULL);
+		i_stream_chain_append(cmd->chain, client->input);
+		client->dot_input = cmd->input;
+		cmd->reading_dot = TRUE;
+		return;
+	}
+	cmd_copy = *cmd;
+	array_delete(&client->commands, 0, 1);
+
+	if (cmd_copy.input != NULL) {
+		i_stream_seek(cmd_copy.input, 0);
+		i_stream_unref(&cmd_copy.input);
+	}
+	if (cmd_copy.callback != NULL)
+		cmd_copy.callback(state, reply, cmd_copy.context);
+	if (running)
+		io_loop_stop(current_ioloop);
+}
+
+static void
+pop3c_client_async_callback_disconnected(struct pop3c_client *client)
+{
+	pop3c_client_async_callback(client, POP3C_COMMAND_STATE_DISCONNECTED,
+				    "Disconnected");
+}
+
 static void pop3c_client_disconnect(struct pop3c_client *client)
 {
 	client->state = POP3C_CLIENT_STATE_DISCONNECTED;
-	client->async_commands = 0;
 
 	if (client->running)
 		io_loop_stop(current_ioloop);
@@ -156,6 +214,8 @@
 			i_error("close(pop3c) failed: %m");
 		client->fd = -1;
 	}
+	while (array_count(&client->commands) > 0)
+		pop3c_client_async_callback_disconnected(client);
 	client_login_callback(client, POP3C_COMMAND_STATE_DISCONNECTED,
 			      "Disconnected");
 }
@@ -233,13 +293,21 @@
 	return 0;
 }
 
-void pop3c_client_run(struct pop3c_client *client)
+void pop3c_client_wait_one(struct pop3c_client *client)
 {
 	struct ioloop *ioloop, *prev_ioloop = current_ioloop;
 	bool timeout_added = FALSE, failed = FALSE;
 
+	if (client->state == POP3C_CLIENT_STATE_DISCONNECTED &&
+	    array_count(&client->commands) > 0) {
+		while (array_count(&client->commands) > 0)
+			pop3c_client_async_callback_disconnected(client);
+	}
+
 	i_assert(client->fd != -1 ||
 		 client->state == POP3C_CLIENT_STATE_CONNECTING);
+	i_assert(array_count(&client->commands) > 0 ||
+		 client->state == POP3C_CLIENT_STATE_CONNECTING);
 
 	ioloop = io_loop_create();
 	pop3c_client_ioloop_changed(client);
@@ -328,6 +396,8 @@
 static void pop3c_client_login_finished(struct pop3c_client *client)
 {
 	io_remove(&client->io);
+	client->io = io_add(client->fd, IO_READ, pop3c_client_input, client);
+
 	timeout_remove(&client->to);
 	client->state = POP3C_CLIENT_STATE_DONE;
 
@@ -635,110 +705,139 @@
 	return client->capabilities;
 }
 
-static void pop3c_client_input_reply(struct pop3c_client *client)
+static int pop3c_client_dot_input(struct pop3c_client *client)
 {
-	i_assert(client->state == POP3C_CLIENT_STATE_DONE);
+	ssize_t ret;
+
+	while ((ret = i_stream_read(client->dot_input)) > 0 || ret == -2) {
+		i_stream_skip(client->dot_input,
+			      i_stream_get_data_size(client->dot_input));
+	}
+	if (ret == 0)
+		return 0;
+	i_assert(ret == -1);
 
-	if (client->to != NULL)
-		timeout_reset(client->to);
-	client->input_line = i_stream_read_next_line(client->input);
-	if (client->input_line != NULL)
-		io_loop_stop(current_ioloop);
-	else if (client->input->closed || client->input->eof ||
-		 client->input->stream_errno != 0) {
-		/* disconnected */
-		i_error("pop3c(%s): Server disconnected unexpectedly",
-			client->set.host);
-		pop3c_client_disconnect(client);
-		io_loop_stop(current_ioloop);
+	if (client->dot_input->stream_errno == 0)
+		ret = 1;
+	client->dot_input = NULL;
+
+	if (ret > 0) {
+		/* currently we don't actually care about preserving the
+		   +OK reply line for multi-line replies, so just return
+		   it as empty */
+		pop3c_client_async_callback(client, POP3C_COMMAND_STATE_OK, "");
+		return 1;
+	} else {
+		pop3c_client_async_callback_disconnected(client);
+		return -1;
 	}
 }
 
 static int
-pop3c_client_read_line(struct pop3c_client *client,
-		       const char **line_r, const char **error_r)
-{
-	i_assert(client->io == NULL);
-	i_assert(client->input_line == NULL);
-
-	client->io = io_add(client->fd, IO_READ,
-			    pop3c_client_input_reply, client);
-	pop3c_client_input_reply(client);
-	if (client->input_line == NULL && client->input != NULL)
-		pop3c_client_run(client);
-
-	if (client->input_line == NULL) {
-		i_assert(client->io == NULL);
-		*error_r = "Disconnected";
-		return -1;
-	}
-
-	io_remove(&client->io);
-	*line_r = t_strdup(client->input_line);
-	client->input_line = NULL;
-	return 0;
-}
-
-static int
-pop3c_client_flush_asyncs(struct pop3c_client *client, const char **error_r)
+pop3c_client_input_next_reply(struct pop3c_client *client)
 {
 	const char *line;
+	enum pop3c_client_state state;
 
-	if (client->state != POP3C_CLIENT_STATE_DONE) {
-		i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED);
-		*error_r = "Disconnected";
-		return -1;
-	}
+	line = i_stream_read_next_line(client->input);
+	if (line == NULL)
+		return client->input->eof ? -1 : 0;
 
-	while (client->async_commands > 0) {
-		if (pop3c_client_read_line(client, &line, error_r) < 0)
-			return -1;
-		client->async_commands--;
+	if (strncasecmp(line, "+OK", 3) == 0) {
+		line += 3;
+		state = POP3C_COMMAND_STATE_OK;
+	} else if (strncasecmp(line, "-ERR", 4) == 0) {
+		line += 4;
+		state = POP3C_COMMAND_STATE_ERR;
+	} else {
+		i_error("pop3c(%s): Server sent unrecognized line: %s",
+			client->set.host, line);
+		state = POP3C_COMMAND_STATE_ERR;
 	}
-	return 0;
+	if (line[0] == ' ')
+		line++;
+	if (array_count(&client->commands) == 0) {
+		i_error("pop3c(%s): Server sent line when no command was running: %s",
+			client->set.host, line);
+	} else {
+		pop3c_client_async_callback(client, state, line);
+	}
+	return 1;
 }
 
-int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmd,
-			  const char **reply_r)
+static void pop3c_client_input(struct pop3c_client *client)
 {
-	const char *line;
 	int ret;
 
-	if (pop3c_client_flush_asyncs(client, reply_r) < 0)
-		return -1;
-	o_stream_nsend_str(client->output, cmd);
-	if (pop3c_client_read_line(client, &line, reply_r) < 0)
-		return -1;
-	if (strncasecmp(line, "+OK", 3) == 0) {
-		*reply_r = line + 3;
-		ret = 0;
-	} else if (strncasecmp(line, "-ERR", 4) == 0) {
-		*reply_r = line + 4;
-		ret = -1;
-	} else {
-		*reply_r = line;
-		ret = -1;
+	if (client->to != NULL)
+		timeout_reset(client->to);
+	do {
+		if (client->dot_input != NULL) {
+			/* continue reading the current multiline reply */
+			if ((ret = pop3c_client_dot_input(client)) == 0)
+				return;
+		} else {
+			ret = pop3c_client_input_next_reply(client);
+		}
+	} while (ret > 0);
+
+	if (ret < 0) {
+		i_error("pop3c(%s): Server disconnected unexpectedly",
+			client->set.host);
+		pop3c_client_disconnect(client);
 	}
-	if (**reply_r == ' ')
-		*reply_r += 1;
-	return ret;
+}
+
+static void pop3c_client_cmd_reply(enum pop3c_command_state state,
+				   const char *reply, void *context)
+{
+	struct pop3c_client_sync_cmd_ctx *ctx = context;
+
+	i_assert(ctx->reply == NULL);
+
+	ctx->state = state;
+	ctx->reply = i_strdup(reply);
 }
 
-void pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmd)
+int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmdline,
+			  const char **reply_r)
 {
-	const char *error;
+	struct pop3c_client_sync_cmd_ctx ctx;
 
-	if (client->state != POP3C_CLIENT_STATE_DONE) {
-		i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED);
-		return;
-	}
+	memset(&ctx, 0, sizeof(ctx));
+	pop3c_client_cmd_line_async(client, cmdline, pop3c_client_cmd_reply, &ctx);
+	while (ctx.reply == NULL)
+		pop3c_client_wait_one(client);
+	*reply_r = t_strdup(ctx.reply);
+	i_free(ctx.reply);
+	return ctx.state == POP3C_COMMAND_STATE_OK ? 0 : -1;
+}
+
+struct pop3c_client_cmd *
+pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmdline,
+			    pop3c_cmd_callback_t *callback, void *context)
+{
+	struct pop3c_client_cmd *cmd;
 
 	if ((client->capabilities & POP3C_CAPABILITY_PIPELINING) == 0) {
-		if (pop3c_client_flush_asyncs(client, &error) < 0)
-			return;
+		while (array_count(&client->commands) > 0)
+			pop3c_client_wait_one(client);
 	}
-	o_stream_nsend_str(client->output, cmd);
-	client->async_commands++;
+	i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED ||
+		 client->state == POP3C_CLIENT_STATE_DONE);
+	if (client->state == POP3C_CLIENT_STATE_DONE)
+		o_stream_nsend_str(client->output, cmdline);
+
+	cmd = array_append_space(&client->commands);
+	cmd->callback = callback;
+	cmd->context = context;
+	return cmd;
+}
+
+void pop3c_client_cmd_line_async_nocb(struct pop3c_client *client,
+				      const char *cmdline)
+{
+	pop3c_client_cmd_line_async(client, cmdline, NULL, NULL);
 }
 
 static int seekable_fd_callback(const char **path_r, void *context)
@@ -766,67 +865,44 @@
 	return fd;
 }
 
-static void pop3c_client_dot_input(struct pop3c_client *client)
-{
-	ssize_t ret;
-
-	if (client->to != NULL)
-		timeout_reset(client->to);
-	while ((ret = i_stream_read(client->dot_input)) > 0 || ret == -2) {
-		i_stream_skip(client->dot_input,
-			      i_stream_get_data_size(client->dot_input));
-	}
-	if (ret != 0) {
-		i_assert(ret == -1);
-		if (client->dot_input->stream_errno != 0) {
-			i_error("pop3c(%s): Server disconnected unexpectedly",
-				client->set.host);
-			pop3c_client_disconnect(client);
-		}
-		if (client->running)
-			io_loop_stop(current_ioloop);
-	}
-}
-
-int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmd,
+int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmdline,
 			    struct istream **input_r, const char **error_r)
 {
-	struct istream *inputs[2];
+	struct pop3c_client_sync_cmd_ctx ctx;
+	const char *reply;
 
-	*input_r = NULL;
+	memset(&ctx, 0, sizeof(ctx));
+	*input_r = pop3c_client_cmd_stream_async(client, cmdline,
+						 pop3c_client_cmd_reply, &ctx);
+	while (ctx.reply == NULL)
+		pop3c_client_wait_one(client);
+	reply = t_strdup(ctx.reply);
+	i_free(ctx.reply);
 
-	/* read the +OK / -ERR */
-	if (pop3c_client_cmd_line(client, cmd, error_r) < 0)
-		return -1;
-	/* read the stream */
-	inputs[0] = i_stream_create_dot(client->input, TRUE);
+	if (ctx.state == POP3C_COMMAND_STATE_OK)
+		return 0;
+	i_stream_unref(input_r);
+	*error_r = reply;
+	return -1;
+}
+
+struct istream *
+pop3c_client_cmd_stream_async(struct pop3c_client *client, const char *cmdline,
+			      pop3c_cmd_callback_t *callback, void *context)
+{
+	struct istream *input, *inputs[2];
+	struct pop3c_client_cmd *cmd;
+
+	cmd = pop3c_client_cmd_line_async(client, cmdline, callback, context);
+
+	input = i_stream_create_chain(&cmd->chain);
+	inputs[0] = i_stream_create_dot(input, TRUE);
 	inputs[1] = NULL;
-	client->dot_input =
-		i_stream_create_seekable(inputs, POP3C_MAX_INBUF_SIZE,
-					 seekable_fd_callback, client);
+	cmd->input = i_stream_create_seekable(inputs, POP3C_MAX_INBUF_SIZE,
+					      seekable_fd_callback, client);
+	i_stream_unref(&input);
 	i_stream_unref(&inputs[0]);
 
-	i_assert(client->io == NULL);
-	client->io = io_add(client->fd, IO_READ,
-			    pop3c_client_dot_input, client);
-	/* read any pending data from the stream */
-	pop3c_client_dot_input(client);
-	if (!client->dot_input->eof)
-		pop3c_client_run(client);
-
-	if (client->input == NULL) {
-		i_assert(client->io == NULL);
-		i_stream_destroy(&client->dot_input);
-		*error_r = "Disconnected";
-		return -1;
-	}
-	io_remove(&client->io);
-	i_stream_seek(client->dot_input, 0);
-	/* if this stream is used by some filter stream, make the filter
-	   stream blocking */
-	client->dot_input->blocking = TRUE;
-
-	*input_r = client->dot_input;
-	client->dot_input = NULL;
-	return 0;
+	i_stream_ref(cmd->input);
+	return cmd->input;
 }
--- a/src/lib-storage/index/pop3c/pop3c-client.h	Tue Jan 26 15:38:13 2016 +0200
+++ b/src/lib-storage/index/pop3c/pop3c-client.h	Tue Jan 26 15:40:09 2016 +0200
@@ -43,13 +43,13 @@
 
 typedef void pop3c_login_callback_t(enum pop3c_command_state state,
 				    const char *reply, void *context);
+typedef void pop3c_cmd_callback_t(enum pop3c_command_state state,
+				  const char *reply, void *context);
 
 struct pop3c_client *
 pop3c_client_init(const struct pop3c_client_settings *set);
 void pop3c_client_deinit(struct pop3c_client **client);
 
-void pop3c_client_run(struct pop3c_client *client);
-
 void pop3c_client_login(struct pop3c_client *client,
 			pop3c_login_callback_t *callback, void *context);
 
@@ -59,13 +59,25 @@
 
 /* Returns 0 if received +OK reply, reply contains the text without the +OK.
    Returns -1 if received -ERR reply or disconnected. */
-int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmd,
+int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmdline,
 			  const char **reply_r);
+/* Start the command asynchronously. Call the callback when finished. */
+struct pop3c_client_cmd *
+pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmdline,
+			    pop3c_cmd_callback_t *callback, void *context);
 /* Send a command, don't care if it succeeds or not. */
-void pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmd);
+void pop3c_client_cmd_line_async_nocb(struct pop3c_client *client,
+				      const char *cmdline);
 /* Returns 0 and stream if succeeded, -1 and error if received -ERR reply or
    disconnected. */
-int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmd,
+int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmdline,
 			    struct istream **input_r, const char **error_r);
+/* Start the command asynchronously. Call the callback when finished. */
+struct istream *
+pop3c_client_cmd_stream_async(struct pop3c_client *client, const char *cmdline,
+			      pop3c_cmd_callback_t *callback, void *context);
+/* Wait for the next async command to finish. It's an error to call this when
+   there are no pending async commands. */
+void pop3c_client_wait_one(struct pop3c_client *client);
 
 #endif
--- a/src/lib-storage/index/pop3c/pop3c-mail.c	Tue Jan 26 15:38:13 2016 +0200
+++ b/src/lib-storage/index/pop3c/pop3c-mail.c	Tue Jan 26 15:40:09 2016 +0200
@@ -151,6 +151,9 @@
 		if (get_body)
 			pop3c_mail_cache_size(mail);
 	}
+	/* if this stream is used by some filter stream, make the
+	   filter stream blocking */
+	mail->data.stream->blocking = TRUE;
 	return index_mail_init_stream(mail, hdr_size, body_size, stream_r);
 }
 
--- a/src/lib-storage/index/pop3c/pop3c-storage.c	Tue Jan 26 15:38:13 2016 +0200
+++ b/src/lib-storage/index/pop3c/pop3c-storage.c	Tue Jan 26 15:40:09 2016 +0200
@@ -176,7 +176,7 @@
 	mbox->client = pop3c_client_create_from_set(box->storage,
 						    mbox->storage->set);
 	pop3c_client_login(mbox->client, pop3c_login_callback, mbox);
-	pop3c_client_run(mbox->client);
+	pop3c_client_wait_one(mbox->client);
 	return mbox->logged_in ? 0 : -1;
 }
 
--- a/src/lib-storage/index/pop3c/pop3c-sync.c	Tue Jan 26 15:38:13 2016 +0200
+++ b/src/lib-storage/index/pop3c/pop3c-sync.c	Tue Jan 26 15:40:09 2016 +0200
@@ -319,7 +319,7 @@
 
 			str_truncate(str, 0);
 			str_printfa(str, "DELE %u\r\n", idx+1);
-			pop3c_client_cmd_line_async(mbox->client, str_c(str));
+			pop3c_client_cmd_line_async_nocb(mbox->client, str_c(str));
 			deletions = TRUE;
 		}
 	}