Mercurial > dovecot > core-2.2
changeset 18297:0e657cfb3f98
lmtp: Added lmtp_user_concurrency_limit setting.
This allows limiting the number of concurrent deliveries to a single user.
Useful in case one user is receiving a lot of mails and causing other
deliveries to be delayed. If the limit is reached, Dovecot returns temporary
failure error at DATA stage.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Fri, 06 Mar 2015 02:10:02 +0200 |
parents | c1403c1eb3e9 |
children | 03931c3b43b5 |
files | src/lmtp/client.c src/lmtp/client.h src/lmtp/commands.c src/lmtp/lmtp-settings.c src/lmtp/lmtp-settings.h src/lmtp/main.c src/lmtp/main.h |
diffstat | 7 files changed, 89 insertions(+), 7 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lmtp/client.c Fri Mar 06 02:06:10 2015 +0200 +++ b/src/lmtp/client.c Fri Mar 06 02:10:02 2015 +0200 @@ -11,6 +11,7 @@ #include "process-title.h" #include "var-expand.h" #include "settings-parser.h" +#include "anvil-client.h" #include "master-service.h" #include "master-service-ssl.h" #include "master-service-settings.h" @@ -355,8 +356,11 @@ lmtp_proxy_deinit(&client->proxy); if (array_is_created(&client->state.rcpt_to)) { - array_foreach_modifiable(&client->state.rcpt_to, rcptp) + array_foreach_modifiable(&client->state.rcpt_to, rcptp) { + if ((*rcptp)->anvil_query != NULL) + anvil_client_query_abort(anvil, &(*rcptp)->anvil_query); mail_storage_service_user_free(&(*rcptp)->service_user); + } } if (client->state.raw_mail != NULL) {
--- a/src/lmtp/client.h Fri Mar 06 02:06:10 2015 +0200 +++ b/src/lmtp/client.h Fri Mar 06 02:10:02 2015 +0200 @@ -7,11 +7,15 @@ #define CLIENT_MAIL_DATA_MAX_INMEMORY_SIZE (1024*128) struct mail_recipient { + struct client *client; + const char *address; const char *detail; /* +detail part is also in address */ struct lmtp_recipient_params params; + struct anvil_query *anvil_query; struct mail_storage_service_user *service_user; + unsigned int parallel_count; }; struct client_state { @@ -21,6 +25,9 @@ ARRAY(struct mail_recipient *) rcpt_to; unsigned int rcpt_idx; + unsigned int anvil_queries; + bool anvil_pending_data_write; + unsigned int data_end_idx; /* Initially we start writing to mail_data. If it grows too large,
--- a/src/lmtp/commands.c Fri Mar 06 02:06:10 2015 +0200 +++ b/src/lmtp/commands.c Fri Mar 06 02:10:02 2015 +0200 @@ -15,6 +15,7 @@ #include "var-expand.h" #include "restrict-access.h" #include "settings-parser.h" +#include "anvil-client.h" #include "master-service.h" #include "master-service-ssl.h" #include "iostream-ssl.h" @@ -41,6 +42,8 @@ #define LMTP_PROXY_DEFAULT_TIMEOUT_MSECS (1000*30) +static void client_input_data_write(struct client *client); + int cmd_lhlo(struct client *client, const char *args) { struct rfc822_parser_context parser; @@ -563,6 +566,34 @@ return ret; } +static void rcpt_anvil_lookup_callback(const char *reply, void *context) +{ + struct mail_recipient *rcpt = context; + + i_assert(rcpt->client->state.anvil_queries > 0); + + rcpt->anvil_query = NULL; + if (reply == NULL) { + /* lookup failed */ + } else if (str_to_uint(reply, &rcpt->parallel_count) < 0) { + i_error("Invalid reply from anvil: %s", reply); + } + if (--rcpt->client->state.anvil_queries == 0 && + rcpt->client->state.anvil_pending_data_write) { + /* DATA command was finished, but we were still waiting on + anvil before handling any users */ + client_input_data_write(rcpt->client); + } +} + +static void lmtp_anvil_init(void) +{ + if (anvil == NULL) { + const char *path = t_strdup_printf("%s/anvil", base_dir); + anvil = anvil_client_init(path, NULL, 0); + } +} + int cmd_rcpt(struct client *client, const char *args) { struct mail_recipient *rcpt; @@ -584,6 +615,7 @@ } rcpt = p_new(client->state_pool, struct mail_recipient, 1); + rcpt->client = client; address = lmtp_unescape_address(address); argv = t_strsplit(params, " "); @@ -652,6 +684,17 @@ array_append(&client->state.rcpt_to, &rcpt, 1); client_send_line(client, "250 2.1.5 OK"); } + + if (client->lmtp_set->lmtp_user_concurrency_limit > 0) { + const char *query = t_strconcat("LOOKUP\t", + master_service_get_name(master_service), + "/", str_tabescape(username), NULL); + lmtp_anvil_init(); + rcpt->anvil_query = anvil_client_query(anvil, query, + rcpt_anvil_lookup_callback, rcpt); + if (rcpt->anvil_query != NULL) + client->state.anvil_queries++; + } return 0; } @@ -710,9 +753,18 @@ enum mail_error mail_error; int ret; + i_assert(client->state.anvil_queries == 0); + input = mail_storage_service_user_get_input(rcpt->service_user); username = t_strdup(input->username); + if (rcpt->parallel_count >= client->lmtp_set->lmtp_user_concurrency_limit) { + client_send_line(client, ERRSTR_TEMP_USERDB_FAIL_PREFIX + "Too many concurrent deliveries for user", + rcpt->address); + return -1; + } + mail_set = mail_storage_service_user_get_mail_set(rcpt->service_user); set_parser = mail_storage_service_user_get_settings_parser(rcpt->service_user); if (client->proxy_timeout_secs > 0 && @@ -776,6 +828,11 @@ dctx.save_dest_mail = array_count(&client->state.rcpt_to) > 1 && client->state.first_saved_mail == NULL; + if (client->lmtp_set->lmtp_user_concurrency_limit > 0) { + master_service_anvil_send(master_service, t_strconcat( + "CONNECT\t", my_pid, "\t", master_service_get_name(master_service), + "/", username, "\n", NULL)); + } if (mail_deliver(&dctx, &storage) == 0) { if (dctx.dest_mail != NULL) { i_assert(client->state.first_saved_mail == NULL); @@ -807,6 +864,11 @@ rcpt->address); ret = -1; } + if (client->lmtp_set->lmtp_user_concurrency_limit > 0) { + master_service_anvil_send(master_service, t_strconcat( + "DISCONNECT\t", my_pid, "\t", master_service_get_name(master_service), + "/", username, "\n", NULL)); + } return ret; } @@ -1019,10 +1081,9 @@ return str_c(str); } -static bool client_input_data_write(struct client *client) +static void client_input_data_write(struct client *client) { struct istream *input; - bool ret = TRUE; /* stop handling client input until saving/proxying is finished */ if (client->to_idle != NULL) @@ -1037,10 +1098,10 @@ client_state_set(client, "DATA", "proxying"); lmtp_proxy_start(client->proxy, input, client_proxy_finish, client); - ret = FALSE; + } else { + client_input_data_finish(client); } i_stream_unref(&input); - return ret; } static int client_input_add_file(struct client *client, @@ -1127,8 +1188,10 @@ return; } - if (client_input_data_write(client)) - client_input_data_finish(client); + if (client->state.anvil_queries == 0) + client_input_data_write(client); + else + client->state.anvil_pending_data_write = TRUE; } static void client_input_data(struct client *client)
--- a/src/lmtp/lmtp-settings.c Fri Mar 06 02:06:10 2015 +0200 +++ b/src/lmtp/lmtp-settings.c Fri Mar 06 02:10:02 2015 +0200 @@ -60,6 +60,7 @@ DEF(SET_BOOL, lmtp_proxy), DEF(SET_BOOL, lmtp_save_to_detail_mailbox), DEF(SET_BOOL, lmtp_rcpt_check_quota), + DEF(SET_UINT, lmtp_user_concurrency_limit), DEF(SET_STR, lmtp_address_translate), DEF(SET_STR_VARS, login_greeting), DEF(SET_STR, login_trusted_networks), @@ -71,6 +72,7 @@ .lmtp_proxy = FALSE, .lmtp_save_to_detail_mailbox = FALSE, .lmtp_rcpt_check_quota = FALSE, + .lmtp_user_concurrency_limit = 0, .lmtp_address_translate = "", .login_greeting = PACKAGE_NAME" ready.", .login_trusted_networks = ""
--- a/src/lmtp/lmtp-settings.h Fri Mar 06 02:06:10 2015 +0200 +++ b/src/lmtp/lmtp-settings.h Fri Mar 06 02:10:02 2015 +0200 @@ -8,6 +8,7 @@ bool lmtp_proxy; bool lmtp_save_to_detail_mailbox; bool lmtp_rcpt_check_quota; + unsigned int lmtp_user_concurrency_limit; const char *lmtp_address_translate; const char *login_greeting; const char *login_trusted_networks;
--- a/src/lmtp/main.c Fri Mar 06 02:06:10 2015 +0200 +++ b/src/lmtp/main.c Fri Mar 06 02:10:02 2015 +0200 @@ -7,6 +7,7 @@ #include "abspath.h" #include "restrict-access.h" #include "fd-close-on-exec.h" +#include "anvil-client.h" #include "master-service.h" #include "master-service-settings.h" #include "master-interface.h" @@ -27,6 +28,7 @@ const char *dns_client_socket_path, *base_dir; struct mail_storage_service_ctx *storage_service; +struct anvil_client *anvil; static void client_connected(struct master_service_connection *conn) { @@ -69,6 +71,8 @@ static void main_deinit(void) { clients_destroy(); + if (anvil != NULL) + anvil_client_deinit(&anvil); } int main(int argc, char *argv[])
--- a/src/lmtp/main.h Fri Mar 06 02:06:10 2015 +0200 +++ b/src/lmtp/main.h Fri Mar 06 02:10:02 2015 +0200 @@ -3,6 +3,7 @@ extern const char *dns_client_socket_path, *base_dir; extern struct mail_storage_service_ctx *storage_service; +extern struct anvil_client *anvil; void listener_client_destroyed(void);