changeset 20940:8de947fa3b4d

director: Support flush socket This allows specifying an URI to execute on user kill. It can be of form exec:/path/to/bin, unix:/path/to/socket or tcp:ip:port The location is sent FLUSH username-hash per killed user. You can execute some action there, and you are expected to return '+\nOK\n' as reply once you are done.
author Aki Tuomi <aki.tuomi@dovecot.fi>
date Thu, 13 Oct 2016 16:11:48 +0300
parents 153d870c27ab
children 868cf176e7ff
files src/director/Makefile.am src/director/director-settings.c src/director/director-settings.h src/director/director.c
diffstat 4 files changed, 123 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/src/director/Makefile.am	Sat Oct 22 12:58:40 2016 +0300
+++ b/src/director/Makefile.am	Thu Oct 13 16:11:48 2016 +0300
@@ -9,7 +9,8 @@
 	-I$(top_srcdir)/src/lib-imap \
 	-I$(top_srcdir)/src/lib-settings \
 	-I$(top_srcdir)/src/lib-master \
-	-I$(top_srcdir)/src/lib-mail
+	-I$(top_srcdir)/src/lib-mail \
+	-I$(top_srcdir)/src/lib-program-client
 
 director_LDADD = $(LIBDOVECOT)
 director_DEPENDENCIES = $(LIBDOVECOT_DEPS)
--- a/src/director/director-settings.c	Sat Oct 22 12:58:40 2016 +0300
+++ b/src/director/director-settings.c	Thu Oct 13 16:11:48 2016 +0300
@@ -71,6 +71,7 @@
 	DEF(SET_STR, director_servers),
 	DEF(SET_STR, director_mail_servers),
 	DEF(SET_STR, director_username_hash),
+	DEF(SET_STR, director_flush_socket),
 	DEF(SET_TIME, director_user_expire),
 	DEF(SET_TIME, director_user_kick_delay),
 	DEF(SET_IN_PORT, director_doveadm_port),
@@ -85,6 +86,7 @@
 	.director_servers = "",
 	.director_mail_servers = "",
 	.director_username_hash = "%Lu",
+	.director_flush_socket = "",
 	.director_user_expire = 60*15,
 	.director_user_kick_delay = 2,
 	.director_doveadm_port = 0
--- a/src/director/director-settings.h	Sat Oct 22 12:58:40 2016 +0300
+++ b/src/director/director-settings.h	Thu Oct 13 16:11:48 2016 +0300
@@ -9,6 +9,8 @@
 	const char *director_servers;
 	const char *director_mail_servers;
 	const char *director_username_hash;
+	const char *director_flush_socket;
+
 	unsigned int director_user_expire;
 	unsigned int director_user_kick_delay;
 	in_port_t director_doveadm_port;
--- a/src/director/director.c	Sat Oct 22 12:58:40 2016 +0300
+++ b/src/director/director.c	Thu Oct 13 16:11:48 2016 +0300
@@ -7,6 +7,11 @@
 #include "strescape.h"
 #include "log-throttle.h"
 #include "ipc-client.h"
+#include "program-client.h"
+#include "var-expand.h"
+#include "istream.h"
+#include "ostream.h"
+#include "iostream-temp.h"
 #include "user-directory.h"
 #include "mail-host.h"
 #include "director-host.h"
@@ -14,7 +19,6 @@
 #include "director.h"
 
 #define DIRECTOR_IPC_PROXY_PATH "ipc"
-
 #define DIRECTOR_RECONNECT_RETRY_SECS 60
 #define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
 #define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30*1000)
@@ -33,6 +37,10 @@
 	.unthrottle_at_max_per_interval = 2,
 };
 
+static void
+director_user_kill_finish_delayed(struct director *dir, struct user *user,
+				  bool skip_delay);
+
 static bool director_is_self_ip_set(struct director *dir)
 {
 	struct ip_addr ip;
@@ -694,32 +702,135 @@
 
 struct director_user_kill_finish_ctx {
 	struct director *dir;
+	unsigned int username_hash;
 	struct user *user;
+	struct program_client *pclient;
+	struct ostream *reply;
+	char *socket_path;
 };
 
 static void
+director_flush_user_continue(int result,
+			     struct director_user_kill_finish_ctx *ctx)
+{
+	struct user *user =
+		user_directory_lookup(ctx->dir->users, ctx->username_hash);
+
+	if (user != NULL)
+		director_user_kill_finish_delayed(ctx->dir, user,
+						  result == 1);
+	if (result == 0) {
+		struct istream *is = iostream_temp_finish(&ctx->reply, (size_t)-1);
+		char *data;
+		i_stream_set_return_partial_line(is, TRUE);
+		data = i_stream_read_next_line(is);
+		i_error("%s: Failed to flush user hash %u in host %s: %s",
+			ctx->socket_path,
+			user->username_hash,
+			net_ip2addr(&user->host->ip),
+			data == NULL ? "(no output to stdout)" : data);
+		while((data = i_stream_read_next_line(is)) != NULL) {
+			i_error("%s: Failed to flush user hash %u in host %s: %s",
+				ctx->socket_path,
+				user->username_hash,
+				net_ip2addr(&user->host->ip),
+				data);
+		}
+		i_stream_unref(&is);
+	} else {
+		o_stream_unref(&ctx->reply);
+	}
+	program_client_destroy(&ctx->pclient);
+	i_free(ctx->socket_path);
+	i_free(ctx);
+}
+
+static void
+director_flush_user(struct director *dir, struct user *user)
+{
+	struct var_expand_table tab[] = {
+		{ 'i', net_ip2addr(&user->host->ip), "ip" },
+		{ 'h', user->host->hostname, "host" },
+		{ '\0', NULL, NULL }
+	};
+
+	/* execute flush script, if set */
+	if (*dir->set->director_flush_socket == '\0') {
+		director_user_kill_finish_delayed(dir, user, FALSE);
+		return;
+	}
+
+	struct director_user_kill_finish_ctx *ctx =
+		i_new(struct director_user_kill_finish_ctx, 1);
+	ctx->username_hash = user->username_hash;
+	ctx->dir = dir;
+
+	string_t *s_sock = str_new(default_pool, 32);
+	var_expand(s_sock, dir->set->director_flush_socket, tab);
+	ctx->socket_path = str_free_without_data(&s_sock);
+
+	const char *error;
+	struct program_client_settings set = {
+		.client_connect_timeout_msecs = 10000,
+	};
+
+	restrict_access_init(&set.restrict_set);
+
+	const char *const args[] = {"FLUSH",
+		t_strdup_printf("%u", user->username_hash), NULL};
+
+	if ((program_client_create(ctx->socket_path, args, &set, FALSE,
+				   &ctx->pclient, &error)) != 0) {
+		i_error("%s: Failed to flush user hash %u in host %s: %s",
+			ctx->socket_path,
+			user->username_hash,
+			net_ip2addr(&user->host->ip),
+			error);
+		director_flush_user_continue(0, ctx);
+		return;
+	}
+
+	ctx->reply =
+		iostream_temp_create_named("/tmp", 0,
+					   t_strdup_printf("flush response from %s",
+							   net_ip2addr(&user->host->ip)));
+	o_stream_set_no_error_handling(ctx->reply, TRUE);
+	program_client_set_output(ctx->pclient, ctx->reply);
+	program_client_run_async(ctx->pclient, director_flush_user_continue, ctx);
+}
+
+static void
 director_user_kill_finish_delayed_to(struct director_user_kill_finish_ctx *ctx)
 {
 	i_assert(ctx->user->kill_state == USER_KILL_STATE_DELAY);
 
 	ctx->user->kill_state = USER_KILL_STATE_NONE;
-	timeout_remove(&ctx->user->to_move);
+	if (ctx->user->to_move != NULL)
+		timeout_remove(&ctx->user->to_move);
 
 	ctx->dir->state_change_callback(ctx->dir);
 	i_free(ctx);
 }
 
 static void
-director_user_kill_finish_delayed(struct director *dir, struct user *user)
+director_user_kill_finish_delayed(struct director *dir, struct user *user,
+				  bool skip_delay)
 {
 	struct director_user_kill_finish_ctx *ctx;
 
+	timeout_remove(&user->to_move);
+
+	if (skip_delay) {
+		user->kill_state = USER_KILL_STATE_NONE;
+		dir->state_change_callback(dir);
+		return;
+	}
+
 	ctx = i_new(struct director_user_kill_finish_ctx, 1);
 	ctx->dir = dir;
 	ctx->user = user;
 
 	user->kill_state = USER_KILL_STATE_DELAY;
-	timeout_remove(&user->to_move);
 
 	/* wait for a while for the kills to finish in the backend server,
 	   so there are no longer any processes running for the user before we
@@ -741,7 +852,7 @@
 
 	if (dir->right == NULL) {
 		/* we're alone */
-		director_user_kill_finish_delayed(dir, user);
+		director_flush_user(dir, user);
 	} else if (self ||
 		   user->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
 		director_connection_send(dir->right, t_strdup_printf(
@@ -993,7 +1104,7 @@
 	    user->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE)
 		return;
 
-	director_user_kill_finish_delayed(dir, user);
+	director_flush_user(dir, user);
 
 	if (orig_src == NULL) {
 		orig_src = dir->self_host;