changeset 9900:22d27318bb18 HEAD

lmtp client, proxy: Several bugfixes.
author Timo Sirainen <tss@iki.fi>
date Tue, 08 Sep 2009 13:28:40 -0400
parents 6e5cc5d4cec4
children 987d244a7a3e
files src/lib-lda/lmtp-client.c src/lib-lda/lmtp-client.h src/lmtp/commands.c src/lmtp/lmtp-proxy.c src/lmtp/lmtp-proxy.h
diffstat 5 files changed, 124 insertions(+), 59 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-lda/lmtp-client.c	Mon Sep 07 12:06:15 2009 -0400
+++ b/src/lib-lda/lmtp-client.c	Tue Sep 08 13:28:40 2009 -0400
@@ -53,6 +53,8 @@
 	unsigned int rcpt_next_send_idx;
 	struct istream *data_input;
 	unsigned char output_last;
+
+	unsigned int output_finished:1;
 };
 
 static void lmtp_client_send_rcpts(struct lmtp_client *client);
@@ -124,36 +126,52 @@
 static bool
 lmtp_client_rcpt_next(struct lmtp_client *client, const char *line)
 {
-	struct lmtp_rcpt *recipients;
-	unsigned int i, count;
+	struct lmtp_rcpt *rcpt;
 	bool success, all_sent;
 
 	success = line[0] == '2';
 
-	recipients = array_get_modifiable(&client->recipients, &count);
-	for (i = client->rcpt_next_receive_idx; i < count; i++) {
-		recipients[i].failed = !success;
-		recipients[i].rcpt_to_callback(success, line,
-					       recipients[i].context);
-	}
-	all_sent = i == client->rcpt_next_receive_idx;
-	client->rcpt_next_receive_idx = i;
+	rcpt = array_idx_modifiable(&client->recipients,
+				    client->rcpt_next_receive_idx);
+	rcpt->failed = !success;
+	rcpt->rcpt_to_callback(success, line, rcpt->context);
+
+	all_sent = ++client->rcpt_next_receive_idx ==
+		array_count(&client->recipients);
 	return all_sent && client->data_input != NULL;
 }
 
-static int
+static bool
 lmtp_client_data_next(struct lmtp_client *client, const char *line)
 {
 	struct lmtp_rcpt *rcpt;
+	unsigned int i, count;
 	bool last;
 
-	rcpt = array_idx_modifiable(&client->recipients,
-				    client->rcpt_next_data_idx);
-	rcpt->failed = line[0] != '2';
-	last = ++client->rcpt_next_data_idx == array_count(&client->recipients);
+	switch (client->protocol) {
+	case LMTP_CLIENT_PROTOCOL_SMTP:
+		i_assert(client->rcpt_next_data_idx == 0);
 
-	rcpt->data_callback(!rcpt->failed, line, rcpt->context);
-	return last ? -1 : 0;
+		rcpt = array_get_modifiable(&client->recipients, &count);
+		for (i = 0; i < count; i++) {
+			rcpt[i].failed = line[0] != '2';
+			rcpt[i].data_callback(!rcpt->failed, line,
+					      rcpt[i].context);
+		}
+		client->rcpt_next_data_idx = count;
+		last = TRUE;
+		break;
+	case LMTP_CLIENT_PROTOCOL_LMTP:
+		rcpt = array_idx_modifiable(&client->recipients,
+					    client->rcpt_next_data_idx);
+		rcpt->failed = line[0] != '2';
+		last = ++client->rcpt_next_data_idx ==
+			array_count(&client->recipients);
+
+		rcpt->data_callback(!rcpt->failed, line, rcpt->context);
+		break;
+	}
+	return !last;
 }
 
 static void lmtp_client_send_data(struct lmtp_client *client)
@@ -163,6 +181,9 @@
 	size_t i, size;
 	int ret;
 
+	if (client->output_finished)
+		return;
+
 	while ((ret = i_stream_read_data(client->data_input,
 					 &data, &size, 0)) > 0) {
 		add = '\0';
@@ -216,6 +237,7 @@
 		(void)o_stream_send(client->output, "\r\n", 2);
 	}
 	(void)o_stream_send(client->output, ".\r\n", 3);
+	client->output_finished = TRUE;
 }
 
 static void lmtp_client_send_handshake(struct lmtp_client *client)
@@ -289,8 +311,9 @@
 	case LMTP_INPUT_STATE_RCPT_TO:
 		if (!lmtp_client_rcpt_next(client, line))
 			break;
-		/* fall through */
 		client->input_state++;
+		o_stream_send_str(client->output, "DATA\r\n");
+		break;
 	case LMTP_INPUT_STATE_DATA_CONTINUE:
 		/* Start sending DATA */
 		if (strncmp(line, "354", 3) != 0) {
@@ -304,7 +327,7 @@
 		break;
 	case LMTP_INPUT_STATE_DATA:
 		/* DATA replies */
-		if (lmtp_client_data_next(client, line) < 0)
+		if (!lmtp_client_data_next(client, line))
 			return -1;
 		break;
 	}
@@ -334,7 +357,8 @@
 	if (err != 0) {
 		i_error("lmtp client: connect(%s, %u) failed: %s",
 			client->host, client->port, strerror(err));
-		lmtp_client_fail(client, NULL);
+		lmtp_client_fail(client, ERRSTR_TEMP_REMOTE_FAILURE
+				 " (connect)");
 		return;
 	}
 	io_remove(&client->io);
@@ -348,7 +372,8 @@
 
 	o_stream_cork(client->output);
 	if ((ret = o_stream_flush(client->output)) < 0)
-		lmtp_client_fail(client, NULL);
+		lmtp_client_fail(client, ERRSTR_TEMP_REMOTE_FAILURE
+				 " (disconnected in output)");
 	else if (client->input_state == LMTP_INPUT_STATE_DATA)
 		lmtp_client_send_data(client);
 	o_stream_uncork(client->output);
@@ -415,8 +440,13 @@
 
 void lmtp_client_send(struct lmtp_client *client, struct istream *data_input)
 {
+	i_stream_ref(data_input);
 	client->data_input = data_input;
-	o_stream_send_str(client->output, "DATA\r\n");
+
+	if (client->rcpt_next_receive_idx == array_count(&client->recipients)) {
+		client->input_state++;
+		o_stream_send_str(client->output, "DATA\r\n");
+	}
 }
 
 void lmtp_client_send_more(struct lmtp_client *client)
--- a/src/lib-lda/lmtp-client.h	Mon Sep 07 12:06:15 2009 -0400
+++ b/src/lib-lda/lmtp-client.h	Tue Sep 08 13:28:40 2009 -0400
@@ -1,6 +1,8 @@
 #ifndef LMTP_CLIENT_H
 #define LMTP_CLIENT_H
 
+#define ERRSTR_TEMP_REMOTE_FAILURE "451 4.4.0 Remote server not answering"
+
 /* LMTP/SMTP client code. */
 
 enum lmtp_client_protocol {
--- a/src/lmtp/commands.c	Mon Sep 07 12:06:15 2009 -0400
+++ b/src/lmtp/commands.c	Tue Sep 08 13:28:40 2009 -0400
@@ -24,6 +24,8 @@
 #define ERRSTR_TEMP_MAILBOX_FAIL "451 4.3.0 <%s> Temporary internal error"
 #define ERRSTR_TEMP_USERDB_FAIL "451 4.3.0 <%s> Temporary user lookup failure"
 
+#define LMTP_PROXY_DEFAULT_TIMEOUT_MSECS (1000*30)
+
 int cmd_lhlo(struct client *client, const char *args ATTR_UNUSED)
 {
 	client_state_reset(client);
@@ -94,7 +96,7 @@
 			       const char *const *args, const char **address)
 {
 	const char *p, *key, *value;
-	bool proxying = FALSE;
+	bool proxying = FALSE, port_set = FALSE;
 
 	for (; *args != NULL; args++) {
 		p = strchr(*args, '=');
@@ -110,9 +112,23 @@
 			proxying = TRUE;
 		else if (strcmp(key, "host") == 0)
 			set->host = value;
-		else if (strcmp(key, "port") == 0)
+		else if (strcmp(key, "port") == 0) {
 			set->port = atoi(value);
-		else if (strcmp(key, "user") == 0) {
+			port_set = TRUE;
+		} else if (strcmp(key, "proxy_timeout") == 0)
+			set->timeout_msecs = atoi(value)*1000;
+		else if (strcmp(key, "protocol") == 0) {
+			if (strcmp(value, "lmtp") == 0)
+				set->protocol = LMTP_CLIENT_PROTOCOL_LMTP;
+			else if (strcmp(value, "smtp") == 0) {
+				set->protocol = LMTP_CLIENT_PROTOCOL_SMTP;
+				if (!port_set)
+					set->port = 25;
+			} else {
+				i_error("proxy: Unknown protocol %s", value);
+				return FALSE;
+			}
+		} else if (strcmp(key, "user") == 0) {
 			/* changing the username */
 			*address = value;
 		} else {
@@ -176,6 +192,9 @@
 
 	memset(&set, 0, sizeof(set));
 	set.port = client->local_port;
+	set.protocol = LMTP_CLIENT_PROTOCOL_LMTP;
+	set.timeout_msecs = LMTP_PROXY_DEFAULT_TIMEOUT_MSECS;
+
 	if (!client_proxy_rcpt_parse_fields(&set, fields, &address)) {
 		/* not proxying this user */
 		pool_unref(&pool);
--- a/src/lmtp/lmtp-proxy.c	Mon Sep 07 12:06:15 2009 -0400
+++ b/src/lmtp/lmtp-proxy.c	Tue Sep 08 13:28:40 2009 -0400
@@ -10,7 +10,6 @@
 #include "lmtp-proxy.h"
 
 #define LMTP_MAX_LINE_LEN 1024
-#define LMTP_PROXY_OUTPUT_TIMEOUT_MSECS 1000
 
 struct lmtp_proxy_recipient {
 	struct lmtp_proxy_connection *conn;
@@ -47,13 +46,16 @@
 	struct ostream *client_output;
 	struct tee_istream *tee_data_input;
 
+	unsigned int max_timeout_msecs;
+
 	void (*finish_callback)(void *);
 	void *finish_context;
 
 	unsigned int finished:1;
 };
 
-static void lmtp_proxy_conn_deinit(struct lmtp_proxy_connection *conn);
+static void lmtp_proxy_conn_deinit(struct lmtp_proxy_connection *conn,
+				   const char *reason);
 static void lmtp_proxy_data_input(struct lmtp_proxy *proxy);
 
 struct lmtp_proxy *
@@ -81,7 +83,7 @@
 
 	conns = array_get(&proxy->connections, &count);
 	for (i = 0; i < count; i++)
-		lmtp_proxy_conn_deinit(conns[i]);
+		lmtp_proxy_conn_deinit(conns[i], "451 4.3.0 Aborting");
 }
 
 void lmtp_proxy_deinit(struct lmtp_proxy **_proxy)
@@ -116,6 +118,8 @@
 	struct lmtp_proxy_connection *const *conns, *conn;
 	unsigned int i, count;
 
+	i_assert(set->timeout_msecs > 0);
+
 	conns = array_get(&proxy->connections, &count);
 	for (i = 0; i < count; i++) {
 		if (conns[i]->set.port == set->port &&
@@ -130,13 +134,17 @@
 	conn->set.timeout_msecs = set->timeout_msecs;
 	array_append(&proxy->connections, &conn, 1);
 	conn->client = lmtp_client_init(proxy->mail_from, proxy->my_hostname);
-	if (lmtp_client_connect_tcp(conn->client, LMTP_CLIENT_PROTOCOL_LMTP,
+	if (lmtp_client_connect_tcp(conn->client, set->protocol,
 				    conn->set.host, conn->set.port) < 0)
 		conn->failed = TRUE;
+
+	if (proxy->max_timeout_msecs < set->timeout_msecs)
+		proxy->max_timeout_msecs = set->timeout_msecs;
 	return conn;
 }
 
-static void lmtp_proxy_conn_deinit(struct lmtp_proxy_connection *conn)
+static void lmtp_proxy_conn_deinit(struct lmtp_proxy_connection *conn,
+				   const char *reason)
 {
 	struct lmtp_proxy_recipient *rcpt;
 	unsigned int i, count;
@@ -145,7 +153,7 @@
 	rcpt = array_get_modifiable(&conn->proxy->rcpt_to, &count);
 	for (i = 0; i < count; i++) {
 		if (rcpt[i].conn == conn && !rcpt[i].rcpt_to_failed)
-			rcpt[i].reply = ERRSTR_TEMP_REMOTE_FAILURE;
+			rcpt[i].reply = reason;
 	}
 
 	if (conn->client != NULL)
@@ -176,25 +184,8 @@
 
 static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
 {
-	struct lmtp_proxy_recipient *rcpt;
-	unsigned int i, count;
-	bool ret;
-
 	i_assert(!proxy->finished);
 
-	/* if we haven't sent something yet, they're failures */
-	rcpt = array_get_modifiable(&proxy->rcpt_to, &count);
-	for (i = proxy->rcpt_next_reply_idx; i < count; i++) {
-		if (!rcpt[i].rcpt_to_failed) {
-			i_assert(!rcpt[i].data_reply_received);
-			rcpt[i].reply = ERRSTR_TEMP_REMOTE_FAILURE;
-			rcpt[i].data_reply_received = TRUE;
-		}
-	}
-
-	ret = lmtp_proxy_send_replies(proxy);
-	i_assert(ret);
-
 	proxy->finished = TRUE;
 	proxy->finish_callback(proxy->finish_context);
 }
@@ -205,6 +196,26 @@
 		lmtp_proxy_finish(proxy);
 }
 
+static void lmtp_proxy_data_disconnected(struct lmtp_proxy *proxy)
+{
+	struct lmtp_proxy_recipient *rcpt;
+	unsigned int i, count;
+	bool ret;
+
+	rcpt = array_get_modifiable(&proxy->rcpt_to, &count);
+	for (i = proxy->rcpt_next_reply_idx; i < count; i++) {
+		if (!rcpt[i].rcpt_to_failed) {
+			i_assert(!rcpt[i].data_reply_received);
+			rcpt[i].reply = "451 4.4.0 Client disconnected in DATA";
+			rcpt[i].data_reply_received = TRUE;
+		}
+	}
+	ret = lmtp_proxy_send_replies(proxy);
+	i_assert(ret);
+
+	lmtp_proxy_finish(proxy);
+}
+
 static void
 lmtp_proxy_conn_rcpt_to(bool success, const char *reply, void *context)
 {
@@ -238,9 +249,6 @@
 
 	i_assert(conn->proxy->data_input != NULL);
 
-	if (reply == NULL)
-		reply = ERRSTR_TEMP_REMOTE_FAILURE;
-
 	rcpt = array_get_modifiable(&conn->proxy->rcpt_to, &count);
 	for (i = conn->data_next_reply_low_idx; i < count; i++) {
 		if (rcpt[i].conn == conn && !rcpt[i].rcpt_to_failed) {
@@ -293,7 +301,8 @@
 	}
 	i_assert(max_conn != NULL);
 
-	lmtp_proxy_conn_deinit(max_conn);
+	lmtp_proxy_conn_deinit(max_conn, ERRSTR_TEMP_REMOTE_FAILURE
+			       " (timeout)");
 }
 
 static void lmtp_proxy_wait_for_output(struct lmtp_proxy *proxy)
@@ -303,12 +312,12 @@
 	if (proxy->io != NULL)
 		io_remove(&proxy->io);
 	if (array_count(&proxy->connections) > 1) {
-		proxy->to = timeout_add(LMTP_PROXY_OUTPUT_TIMEOUT_MSECS,
+		proxy->to = timeout_add(proxy->max_timeout_msecs,
 					lmtp_proxy_output_timeout, proxy);
 	}
 }
 
-static bool lmtp_proxy_read_data(struct lmtp_proxy *proxy)
+static bool lmtp_proxy_data_read(struct lmtp_proxy *proxy)
 {
 	size_t size;
 
@@ -318,8 +327,12 @@
 		lmtp_proxy_wait_for_output(proxy);
 		return FALSE;
 	case -1:
-		/* disconnected */
-		lmtp_proxy_finish(proxy);
+		if (proxy->data_input->stream_errno != 0)
+			lmtp_proxy_data_disconnected(proxy);
+		else {
+			/* finished reading data input. now we'll just have to
+			   wait for replies. */
+		}
 		return FALSE;
 	case 0:
 		/* nothing new read */
@@ -346,7 +359,7 @@
 		conns = array_get(&proxy->connections, &count);
 		for (i = 0; i < count; i++)
 			lmtp_client_send_more(conns[i]->client);
-	} while (lmtp_proxy_read_data(proxy));
+	} while (lmtp_proxy_data_read(proxy));
 }
 
 void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input,
@@ -364,6 +377,7 @@
 	for (i = 0; i < count; i++) {
 		conns[i]->data_input =
 			tee_i_stream_create_child(proxy->tee_data_input);
+		lmtp_client_send(conns[i]->client, conns[i]->data_input);
 	}
 
 	lmtp_proxy_data_input(proxy);
--- a/src/lmtp/lmtp-proxy.h	Mon Sep 07 12:06:15 2009 -0400
+++ b/src/lmtp/lmtp-proxy.h	Tue Sep 08 13:28:40 2009 -0400
@@ -2,13 +2,13 @@
 #define LMTP_PROXY_H
 
 #include "network.h"
-
-#define ERRSTR_TEMP_REMOTE_FAILURE "451 4.4.0 Remote server not answering"
+#include "lmtp-client.h"
 
 struct lmtp_proxy_settings {
 	const char *host;
 	unsigned int port;
 	unsigned int timeout_msecs;
+	enum lmtp_client_protocol protocol;
 };
 
 struct lmtp_proxy *