changeset 18254:22a5eda76490

doveadm: Added support for mail commands to read an input stream (from stdin) This is done by calling doveadm_mail_get_input() from the command's init() function. Currently it reads the entire input into a seekable istream with hardcoded 5 minute timeout. The input stream sending works also through doveadm proxying. This could probably be used by dsync at some point to support proxying over doveadm proxies, but that would require some more work. Especially a flag for commands to specify that they allow non-blocking input streams.
author Timo Sirainen <tss@iki.fi>
date Sun, 15 Feb 2015 10:09:19 +0200
parents ca24e6d34345
children d3bb7541ca5e
files src/doveadm/doveadm-dsync.c src/doveadm/doveadm-mail-server.c src/doveadm/doveadm-mail.c src/doveadm/doveadm-mail.h src/doveadm/server-connection.c src/doveadm/server-connection.h
diffstat 6 files changed, 166 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/src/doveadm/doveadm-dsync.c	Sun Feb 15 10:03:10 2015 +0200
+++ b/src/doveadm/doveadm-dsync.c	Sun Feb 15 10:09:19 2015 +0200
@@ -762,7 +762,7 @@
 	str_append_c(cmd, '\n');
 
 	ctx->tcp_conn = conn;
-	server_connection_cmd(conn, str_c(cmd),
+	server_connection_cmd(conn, str_c(cmd), NULL,
 			      dsync_connected_callback, ctx);
 	io_loop_run(ioloop);
 	ctx->tcp_conn = NULL;
--- a/src/doveadm/doveadm-mail-server.c	Sun Feb 15 10:03:10 2015 +0200
+++ b/src/doveadm/doveadm-mail-server.c	Sun Feb 15 10:09:19 2015 +0200
@@ -155,7 +155,7 @@
 	servercmd = i_new(struct doveadm_mail_server_cmd, 1);
 	servercmd->conn = conn;
 	servercmd->username = i_strdup(username);
-	server_connection_cmd(conn, str_c(cmd),
+	server_connection_cmd(conn, str_c(cmd), cmd_ctx->cmd_input,
 			      doveadm_cmd_callback, servercmd);
 }
 
--- a/src/doveadm/doveadm-mail.c	Sun Feb 15 10:03:10 2015 +0200
+++ b/src/doveadm/doveadm-mail.c	Sun Feb 15 10:09:19 2015 +0200
@@ -4,6 +4,9 @@
 #include "array.h"
 #include "lib-signals.h"
 #include "ioloop.h"
+#include "istream.h"
+#include "istream-dot.h"
+#include "istream-seekable.h"
 #include "str.h"
 #include "unichar.h"
 #include "module-dir.h"
@@ -17,6 +20,7 @@
 #include "mail-search-build.h"
 #include "mail-search-parser.h"
 #include "mailbox-list-iter.h"
+#include "client-connection.h"
 #include "doveadm.h"
 #include "doveadm-settings.h"
 #include "doveadm-print.h"
@@ -26,6 +30,8 @@
 #include <stdio.h>
 #include <stdlib.h>
 
+#define DOVEADM_MAIL_CMD_INPUT_TIMEOUT_MSECS (5*60*1000)
+
 ARRAY_TYPE(doveadm_mail_cmd) doveadm_mail_cmds;
 void (*hook_doveadm_mail_init)(struct doveadm_mail_cmd_context *ctx);
 struct doveadm_mail_cmd_module_register
@@ -144,6 +150,76 @@
 	return ctx;
 }
 
+static void doveadm_mail_cmd_input_input(struct doveadm_mail_cmd_context *ctx)
+{
+	while (i_stream_read(ctx->cmd_input) > 0)
+		i_stream_skip(ctx->cmd_input, i_stream_get_data_size(ctx->cmd_input));
+	if (!ctx->cmd_input->eof)
+		return;
+
+	if (ctx->cmd_input->stream_errno != 0) {
+		i_error("read(%s) failed: %s",
+			i_stream_get_name(ctx->cmd_input),
+			i_stream_get_error(ctx->cmd_input));
+	}
+	io_loop_stop(current_ioloop);
+}
+
+static void doveadm_mail_cmd_input_timeout(struct doveadm_mail_cmd_context *ctx)
+{
+	struct istream *input;
+
+	input = i_stream_create_error_str(ETIMEDOUT, "Timed out in %u secs",
+			DOVEADM_MAIL_CMD_INPUT_TIMEOUT_MSECS/1000);
+	i_stream_set_name(input, i_stream_get_name(ctx->cmd_input));
+	i_stream_destroy(&ctx->cmd_input);
+	ctx->cmd_input = input;
+	io_loop_stop(current_ioloop);
+}
+
+static void doveadm_mail_cmd_input_read(struct doveadm_mail_cmd_context *ctx)
+{
+	struct ioloop *ioloop;
+	struct io *io;
+	struct timeout *to;
+
+	ioloop = io_loop_create();
+	io = io_add(ctx->cmd_input_fd, IO_READ,
+		    doveadm_mail_cmd_input_input, ctx);
+	to = timeout_add(DOVEADM_MAIL_CMD_INPUT_TIMEOUT_MSECS,
+			 doveadm_mail_cmd_input_timeout, ctx);
+	io_loop_run(ioloop);
+	io_remove(&io);
+	timeout_remove(&to);
+	io_loop_destroy(&ioloop);
+
+	i_assert(ctx->cmd_input->eof);
+	i_stream_seek(ctx->cmd_input, 0);
+}
+
+void doveadm_mail_get_input(struct doveadm_mail_cmd_context *ctx)
+{
+	struct istream *inputs[2];
+
+	if (ctx->cmd_input != NULL)
+		return;
+
+	if (ctx->conn != NULL)
+		inputs[0] = i_stream_create_dot(ctx->conn->input, FALSE);
+	else {
+		inputs[0] = i_stream_create_fd(STDIN_FILENO, 1024*1024, FALSE);
+		i_stream_set_name(inputs[0], "stdin");
+	}
+	inputs[1] = NULL;
+	ctx->cmd_input_fd = i_stream_get_fd(inputs[0]);
+	ctx->cmd_input = i_stream_create_seekable_path(inputs, 1024*256,
+						       "/tmp/doveadm.");
+	i_stream_set_name(ctx->cmd_input, i_stream_get_name(inputs[0]));
+	i_stream_unref(&inputs[0]);
+
+	doveadm_mail_cmd_input_read(ctx);
+}
+
 struct mailbox *
 doveadm_mailbox_find(struct mail_user *user, const char *mailbox)
 {
@@ -322,6 +398,8 @@
 		return ret;
 	}
 
+	if (ctx->cmd_input != NULL)
+		i_stream_seek(ctx->cmd_input, 0);
 	if (ctx->v.run(ctx, ctx->cur_mail_user) < 0) {
 		i_assert(ctx->exit_code != 0);
 	}
@@ -551,6 +629,8 @@
 	/* service deinit unloads mail plugins, so do it late */
 	mail_storage_service_deinit(&ctx->storage_service);
 
+	if (ctx->cmd_input != NULL)
+		i_stream_unref(&ctx->cmd_input);
 	if (ctx->exit_code != 0)
 		doveadm_exit_code = ctx->exit_code;
 	pool_unref(&ctx->pool);
--- a/src/doveadm/doveadm-mail.h	Sun Feb 15 10:03:10 2015 +0200
+++ b/src/doveadm/doveadm-mail.h	Sun Feb 15 10:09:19 2015 +0200
@@ -81,6 +81,9 @@
 	struct mail_user *cur_mail_user;
 	struct doveadm_mail_cmd_vfuncs v;
 
+	struct istream *cmd_input;
+	int cmd_input_fd;
+
 	ARRAY(union doveadm_mail_cmd_module_context *) module_contexts;
 
 	/* if non-zero, exit with this code */
@@ -133,6 +136,10 @@
 			     const char **error_r);
 void doveadm_mail_server_flush(void);
 
+/* Request input stream to be read (from stdin). This must be called from
+   the command's init() function. */
+void doveadm_mail_get_input(struct doveadm_mail_cmd_context *ctx);
+
 struct mailbox *
 doveadm_mailbox_find(struct mail_user *user, const char *mailbox);
 int doveadm_mailbox_find_and_sync(struct mail_user *user, const char *mailbox,
--- a/src/doveadm/server-connection.c	Sun Feb 15 10:03:10 2015 +0200
+++ b/src/doveadm/server-connection.c	Sun Feb 15 10:09:19 2015 +0200
@@ -7,6 +7,7 @@
 #include "net.h"
 #include "istream.h"
 #include "ostream.h"
+#include "ostream-dot.h"
 #include "str.h"
 #include "strescape.h"
 #include "iostream-ssl.h"
@@ -42,6 +43,8 @@
 	struct ostream *output;
 	struct ssl_iostream *ssl_iostream;
 
+	struct istream *cmd_input;
+	struct ostream *cmd_output;
 	const char *delayed_cmd;
 	server_cmd_callback_t *callback;
 	void *context;
@@ -78,6 +81,58 @@
 	}
 }
 
+static int server_connection_send_cmd_input_more(struct server_connection *conn)
+{
+	off_t ret;
+
+	/* ostream-dot writes only up to max buffer size, so keep it non-zero */
+	o_stream_set_max_buffer_size(conn->cmd_output, IO_BLOCK_SIZE);
+	ret = o_stream_send_istream(conn->cmd_output, conn->cmd_input);
+	o_stream_set_max_buffer_size(conn->cmd_output, (size_t)-1);
+
+	if (ret >= 0 && i_stream_have_bytes_left(conn->cmd_input)) {
+		o_stream_set_flush_pending(conn->cmd_output, TRUE);
+		return 0;
+	}
+	if (conn->cmd_input->stream_errno != 0) {
+		i_error("read(%s) failed: %s",
+			i_stream_get_name(conn->cmd_input),
+			i_stream_get_error(conn->cmd_input));
+	} else if (conn->cmd_output->stream_errno != 0 ||
+		   o_stream_flush(conn->cmd_output) < 0) {
+		i_error("write(%s) failed: %s",
+			o_stream_get_name(conn->cmd_output),
+			o_stream_get_error(conn->cmd_output));
+	}
+
+	i_stream_destroy(&conn->cmd_input);
+	o_stream_destroy(&conn->cmd_output);
+	return ret < 0 ? -1 : 1;
+}
+
+static void server_connection_send_cmd_input(struct server_connection *conn)
+{
+	if (conn->cmd_input == NULL)
+		return;
+
+	conn->cmd_output = o_stream_create_dot(conn->output, TRUE);
+	(void)server_connection_send_cmd_input_more(conn);
+}
+
+static int server_connection_output(struct server_connection *conn)
+{
+	int ret;
+
+	o_stream_cork(conn->output);
+	ret = o_stream_flush(conn->output);
+	if (ret > 0 && conn->cmd_input != NULL && conn->delayed_cmd == NULL)
+		ret = server_connection_send_cmd_input_more(conn);
+	if (ret < 0)
+		server_connection_destroy(&conn);
+	o_stream_uncork(conn->output);
+	return ret;
+}
+
 static void
 server_connection_callback(struct server_connection *conn,
 			   int exit_code, const char *error)
@@ -171,6 +226,7 @@
 	if (conn->delayed_cmd != NULL) {
 		o_stream_nsend_str(conn->output, conn->delayed_cmd);
 		conn->delayed_cmd = NULL;
+		server_connection_send_cmd_input(conn);
 	}
 }
 
@@ -401,6 +457,10 @@
 	conn->io = io_add(conn->fd, IO_READ, server_connection_input, conn);
 	conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE, FALSE);
 	conn->output = o_stream_create_fd(conn->fd, (size_t)-1, FALSE);
+	o_stream_set_flush_callback(conn->output, server_connection_output, conn);
+
+	i_stream_set_name(conn->input, server->name);
+	o_stream_set_name(conn->output, server->name);
 
 	array_append(&conn->server->connections, &conn, 1);
 
@@ -452,6 +512,11 @@
 		i_stream_destroy(&conn->input);
 	if (conn->output != NULL)
 		o_stream_destroy(&conn->output);
+	if (conn->cmd_input != NULL)
+		i_stream_destroy(&conn->cmd_input);
+	/* close cmd_output after its parent, so the "." isn't sent */
+	if (conn->cmd_output != NULL)
+		o_stream_destroy(&conn->cmd_output);
 	if (conn->ssl_iostream != NULL)
 		ssl_iostream_unref(&conn->ssl_iostream);
 	if (conn->io != NULL)
@@ -470,15 +535,23 @@
 }
 
 void server_connection_cmd(struct server_connection *conn, const char *line,
+			   struct istream *cmd_input,
 			   server_cmd_callback_t *callback, void *context)
 {
 	i_assert(conn->delayed_cmd == NULL);
 
 	conn->state = SERVER_REPLY_STATE_PRINT;
-	if (conn->authenticated)
+	if (cmd_input != NULL) {
+		i_assert(conn->cmd_input == NULL);
+		i_stream_ref(cmd_input);
+		conn->cmd_input = cmd_input;
+	}
+	if (!conn->authenticated)
+		conn->delayed_cmd = p_strdup(conn->pool, line);
+	else {
 		o_stream_nsend_str(conn->output, line);
-	else
-		conn->delayed_cmd = p_strdup(conn->pool, line);
+		server_connection_send_cmd_input(conn);
+	}
 	conn->callback = callback;
 	conn->context = context;
 }
--- a/src/doveadm/server-connection.h	Sun Feb 15 10:03:10 2015 +0200
+++ b/src/doveadm/server-connection.h	Sun Feb 15 10:09:19 2015 +0200
@@ -19,6 +19,7 @@
 server_connection_get_server(struct server_connection *conn);
 
 void server_connection_cmd(struct server_connection *conn, const char *line,
+			   struct istream *cmd_input,
 			   server_cmd_callback_t *callback, void *context);
 /* Returns TRUE if no command is being processed */
 bool server_connection_is_idle(struct server_connection *conn);