changeset 17217:0ec7e1e1db48

replicator: Added "doveadm replicator dsync-status" command.
author Timo Sirainen <tss@iki.fi>
date Tue, 15 Apr 2014 18:24:31 +0200
parents 3f3d4c64d7b4
children 2ef62e1f44f2
files src/doveadm/doveadm-replicator.c src/replication/replicator/doveadm-connection.c src/replication/replicator/doveadm-connection.h src/replication/replicator/dsync-client.c src/replication/replicator/dsync-client.h src/replication/replicator/replicator-brain.c src/replication/replicator/replicator-brain.h src/replication/replicator/replicator.c
diffstat 8 files changed, 178 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/src/doveadm/doveadm-replicator.c	Tue Apr 15 17:42:31 2014 +0200
+++ b/src/doveadm/doveadm-replicator.c	Tue Apr 15 18:24:31 2014 +0200
@@ -177,6 +177,39 @@
 	replicator_disconnect(ctx);
 }
 
+static void cmd_replicator_dsync_status(int argc, char *argv[])
+{
+	struct replicator_context *ctx;
+	const char *line;
+	unsigned int i;
+
+	ctx = cmd_replicator_init(argc, argv, "a:", cmd_replicator_dsync_status);
+
+	doveadm_print_init(DOVEADM_PRINT_TYPE_TABLE);
+	doveadm_print_header("username", "username",
+			     DOVEADM_PRINT_HEADER_FLAG_EXPAND);
+	doveadm_print_header_simple("type");
+	doveadm_print_header_simple("status");
+
+	replicator_send(ctx, "STATUS-DSYNC\n");
+	while ((line = i_stream_read_next_line(ctx->input)) != NULL) {
+		if (*line == '\0')
+			break;
+		T_BEGIN {
+			const char *const *args = t_strsplit_tab(line);
+
+			for (i = 0; i < 3; i++) {
+				if (args[i] == NULL)
+					break;
+				doveadm_print(args[i]);
+			}
+			for (; i < 3; i++)
+				doveadm_print("");
+		} T_END;
+	}
+	replicator_disconnect(ctx);
+}
+
 static void cmd_replicator_replicate(int argc, char *argv[])
 {
 	struct replicator_context *ctx;
@@ -247,6 +280,8 @@
 struct doveadm_cmd doveadm_cmd_replicator[] = {
 	{ cmd_replicator_status, "replicator status",
 	  "[-a <replicator socket path>] [<user mask>]" },
+	{ cmd_replicator_dsync_status, "replicator dsync-status",
+	  "[-a <replicator socket path>]" },
 	{ cmd_replicator_replicate, "replicator replicate",
 	  "[-a <replicator socket path>] [-p <priority>] <user mask>" },
 	{ cmd_replicator_remove, "replicator remove",
--- a/src/replication/replicator/doveadm-connection.c	Tue Apr 15 17:42:31 2014 +0200
+++ b/src/replication/replicator/doveadm-connection.c	Tue Apr 15 18:24:31 2014 +0200
@@ -1,13 +1,16 @@
 /* Copyright (c) 2013-2014 Dovecot authors, see the included COPYING file */
 
 #include "lib.h"
+#include "array.h"
 #include "connection.h"
 #include "ostream.h"
 #include "str.h"
 #include "strescape.h"
 #include "wildcard-match.h"
 #include "master-service.h"
+#include "replicator-brain.h"
 #include "replicator-queue.h"
+#include "dsync-client.h"
 #include "doveadm-connection.h"
 
 #include <unistd.h>
@@ -17,12 +20,14 @@
 
 struct doveadm_connection {
 	struct connection conn;
-	struct replicator_queue *queue;
+	struct replicator_brain *brain;
 };
 static struct connection_list *doveadm_connections;
 
 static int client_input_status_overview(struct doveadm_connection *client)
 {
+	struct replicator_queue *queue =
+		replicator_brain_get_queue(client->brain);
 	struct replicator_queue_iter *iter;
 	struct replicator_user *user;
 	enum replication_priority priority;
@@ -36,12 +41,11 @@
 	pending_full_resync_count = 0;
 
 	user_count = 0;
-	iter = replicator_queue_iter_init(client->queue);
+	iter = replicator_queue_iter_init(queue);
 	while ((user = replicator_queue_iter_next(iter)) != NULL) {
 		if (user->priority != REPLICATION_PRIORITY_NONE)
 			pending_counts[user->priority]++;
-		else if (replicator_queue_want_sync_now(client->queue,
-							user, &next_secs)) {
+		else if (replicator_queue_want_sync_now(queue, user, &next_secs)) {
 			if (user->last_sync_failed)
 				pending_failed_count++;
 			else
@@ -74,6 +78,8 @@
 static int
 client_input_status(struct doveadm_connection *client, const char *const *args)
 {
+	struct replicator_queue *queue =
+		replicator_brain_get_queue(client->brain);
 	struct replicator_queue_iter *iter;
 	struct replicator_user *user;
 	const char *mask = args[0];
@@ -82,7 +88,7 @@
 	if (mask == NULL)
 		return client_input_status_overview(client);
 
-	iter = replicator_queue_iter_init(client->queue);
+	iter = replicator_queue_iter_init(queue);
 	while ((user = replicator_queue_iter_next(iter)) != NULL) {
 		if (!wildcard_match(user->username, mask))
 			continue;
@@ -103,8 +109,42 @@
 }
 
 static int
+client_input_status_dsyncs(struct doveadm_connection *client)
+{
+	string_t *str = t_str_new(256);
+	const ARRAY_TYPE(dsync_client) *clients;
+	struct dsync_client *const *clientp;
+
+	clients = replicator_brain_get_dsync_clients(client->brain);
+	array_foreach(clients, clientp) {
+		str_append_tabescaped(str, dsync_client_get_username(*clientp));
+		str_append_c(str, '\t');
+		switch (dsync_client_get_type(*clientp)) {
+		case DSYNC_TYPE_FULL:
+			str_append(str, "full");
+			break;
+		case DSYNC_TYPE_NORMAL:
+			str_append(str, "normal");
+			break;
+		case DSYNC_TYPE_INCREMENTAL:
+			str_append(str, "incremental");
+			break;
+		}
+		str_append_c(str, '\t');
+		str_append_tabescaped(str, dsync_client_get_state(*clientp));
+		str_append_c(str, '\n');
+	}
+
+	str_append_c(str, '\n');
+	o_stream_send(client->conn.output, str_data(str), str_len(str));
+	return 0;
+}
+
+static int
 client_input_replicate(struct doveadm_connection *client, const char *const *args)
 {
+	struct replicator_queue *queue =
+		replicator_brain_get_queue(client->brain);
 	struct replicator_queue_iter *iter;
 	struct replicator_user *user;
 	const char *usermask;
@@ -122,17 +162,17 @@
 	}
 	usermask = args[1];
 	if (strchr(usermask, '*') == NULL && strchr(usermask, '?') == NULL) {
-		replicator_queue_add(client->queue, usermask, priority);
+		replicator_queue_add(queue, usermask, priority);
 		o_stream_send_str(client->conn.output, "+1\n");
 		return 0;
 	}
 
 	match_count = 0;
-	iter = replicator_queue_iter_init(client->queue);
+	iter = replicator_queue_iter_init(queue);
 	while ((user = replicator_queue_iter_next(iter)) != NULL) {
 		if (!wildcard_match(user->username, usermask))
 			continue;
-		replicator_queue_add(client->queue, user->username, priority);
+		replicator_queue_add(queue, user->username, priority);
 		match_count++;
 	}
 	replicator_queue_iter_deinit(&iter);
@@ -144,6 +184,8 @@
 static int
 client_input_remove(struct doveadm_connection *client, const char *const *args)
 {
+	struct replicator_queue *queue =
+		replicator_brain_get_queue(client->brain);
 	struct replicator_user *user;
 
 	/* <username> */
@@ -151,11 +193,11 @@
 		i_error("%s: REMOVE: Invalid parameters", client->conn.name);
 		return -1;
 	}
-	user = replicator_queue_lookup(client->queue, args[0]);
+	user = replicator_queue_lookup(queue, args[0]);
 	if (user == NULL)
 		o_stream_send_str(client->conn.output, "-User not found\n");
 	else {
-		replicator_queue_remove(client->queue, &user);
+		replicator_queue_remove(queue, &user);
 		o_stream_send_str(client->conn.output, "+\n");
 	}
 	return 0;
@@ -164,6 +206,8 @@
 static int
 client_input_notify(struct doveadm_connection *client, const char *const *args)
 {
+	struct replicator_queue *queue =
+		replicator_brain_get_queue(client->brain);
 	struct replicator_user *user;
 
 	/* <username> <flags> <state> */
@@ -172,8 +216,7 @@
 		return -1;
 	}
 
-	user = replicator_queue_add(client->queue, args[0],
-				    REPLICATION_PRIORITY_NONE);
+	user = replicator_queue_add(queue, args[0], REPLICATION_PRIORITY_NONE);
 	if (args[1][0] == 'f')
 		user->last_full_sync = ioloop_time;
 	user->last_fast_sync = ioloop_time;
@@ -200,6 +243,8 @@
 
 	if (strcmp(cmd, "STATUS") == 0)
 		return client_input_status(client, args);
+	else if (strcmp(cmd, "STATUS-DSYNC") == 0)
+		return client_input_status_dsyncs(client);
 	else if (strcmp(cmd, "REPLICATE") == 0)
 		return client_input_replicate(client, args);
 	else if (strcmp(cmd, "REMOVE") == 0)
@@ -220,12 +265,12 @@
 	master_service_client_connection_destroyed(master_service);
 }
 
-void doveadm_connection_create(struct replicator_queue *queue, int fd)
+void doveadm_connection_create(struct replicator_brain *brain, int fd)
 {
 	struct doveadm_connection *client;
 
 	client = i_new(struct doveadm_connection, 1);
-	client->queue = queue;
+	client->brain = brain;
 	connection_init_server(doveadm_connections, &client->conn,
 			       "(doveadm client)", fd, fd);
 }
--- a/src/replication/replicator/doveadm-connection.h	Tue Apr 15 17:42:31 2014 +0200
+++ b/src/replication/replicator/doveadm-connection.h	Tue Apr 15 18:24:31 2014 +0200
@@ -1,7 +1,9 @@
 #ifndef DOVEADM_CONNECTION_H
 #define DOVEADM_CONNECTION_H
 
-void doveadm_connection_create(struct replicator_queue *queue, int fd);
+struct replicator_brain;
+
+void doveadm_connection_create(struct replicator_brain *brain, int fd);
 
 void doveadm_connections_init(void);
 void doveadm_connections_deinit(void);
--- a/src/replication/replicator/dsync-client.c	Tue Apr 15 17:42:31 2014 +0200
+++ b/src/replication/replicator/dsync-client.c	Tue Apr 15 18:24:31 2014 +0200
@@ -23,7 +23,9 @@
 	struct timeout *to;
 
 	char *dsync_params;
+	char *username;
 	char *state;
+	enum dsync_type sync_type;
 	dsync_callback_t *callback;
 	void *context;
 
@@ -69,6 +71,7 @@
 	client->cmd_sent = FALSE;
 	client->handshaked = FALSE;
 	i_free_and_null(client->state);
+	i_free_and_null(client->username);
 
 	if (client->fd == -1)
 		return;
@@ -195,9 +198,16 @@
 	i_assert(callback != NULL);
 	i_assert(!dsync_client_is_busy(client));
 
+	client->username = i_strdup(username);
 	client->cmd_sent = TRUE;
 	client->callback = callback;
 	client->context = context;
+	if (full)
+		client->sync_type = DSYNC_TYPE_FULL;
+	else if (state != NULL && state[0] != '\0')
+		client->sync_type = DSYNC_TYPE_INCREMENTAL;
+	else
+		client->sync_type = DSYNC_TYPE_NORMAL;
 
 	if (dsync_connect(client) < 0) {
 		i_assert(client->to == NULL);
@@ -233,3 +243,31 @@
 {
 	return client->cmd_sent;
 }
+
+const char *dsync_client_get_username(struct dsync_client *conn)
+{
+	return conn->username;
+}
+
+enum dsync_type dsync_client_get_type(struct dsync_client *conn)
+{
+	return conn->sync_type;
+}
+
+const char *dsync_client_get_state(struct dsync_client *conn)
+{
+	if (conn->fd == -1) {
+		if (conn->last_connect_failure == 0)
+			return "Not connected";
+		return t_strdup_printf("Failed to connect to '%s' - last attempt %ld secs ago", conn->path,
+				       (long)(ioloop_time - conn->last_connect_failure));
+	}
+	if (!dsync_client_is_busy(conn))
+		return "Idle";
+	if (!conn->handshaked)
+		return "Waiting for handshake";
+	if (conn->state == NULL)
+		return "Waiting for dsync to finish";
+	else
+		return "Waiting for dsync to finish (second line)";
+}
--- a/src/replication/replicator/dsync-client.h	Tue Apr 15 17:42:31 2014 +0200
+++ b/src/replication/replicator/dsync-client.h	Tue Apr 15 18:24:31 2014 +0200
@@ -1,12 +1,22 @@
 #ifndef DSYNC_CLIENT_H
 #define DSYNC_CLIENT_H
 
+struct dsync_client;
+
 enum dsync_reply {
 	DSYNC_REPLY_OK,
 	DSYNC_REPLY_FAIL,
 	DSYNC_REPLY_NOUSER
 };
 
+enum dsync_type {
+	DSYNC_TYPE_FULL,
+	DSYNC_TYPE_NORMAL,
+	DSYNC_TYPE_INCREMENTAL
+};
+
+ARRAY_DEFINE_TYPE(dsync_client, struct dsync_client *);
+
 typedef void dsync_callback_t(enum dsync_reply reply,
 			      const char *state, void *context);
 
@@ -19,4 +29,8 @@
 		       dsync_callback_t *callback, void *context);
 bool dsync_client_is_busy(struct dsync_client *conn);
 
+const char *dsync_client_get_username(struct dsync_client *conn);
+enum dsync_type dsync_client_get_type(struct dsync_client *conn);
+const char *dsync_client_get_state(struct dsync_client *conn);
+
 #endif
--- a/src/replication/replicator/replicator-brain.c	Tue Apr 15 17:42:31 2014 +0200
+++ b/src/replication/replicator/replicator-brain.c	Tue Apr 15 18:24:31 2014 +0200
@@ -19,7 +19,7 @@
 	const struct replicator_settings *set;
 	struct timeout *to;
 
-	ARRAY(struct dsync_client *) dsync_clients;
+	ARRAY_TYPE(dsync_client) dsync_clients;
 
 	unsigned int deinitializing:1;
 };
@@ -67,6 +67,24 @@
 	pool_unref(&brain->pool);
 }
 
+struct replicator_queue *
+replicator_brain_get_queue(struct replicator_brain *brain)
+{
+	return brain->queue;
+}
+
+const struct replicator_settings *
+replicator_brain_get_settings(struct replicator_brain *brain)
+{
+	return brain->set;
+}
+
+const ARRAY_TYPE(dsync_client) *
+replicator_brain_get_dsync_clients(struct replicator_brain *brain)
+{
+	return &brain->dsync_clients;
+}
+
 static struct dsync_client *
 get_dsync_client(struct replicator_brain *brain)
 {
--- a/src/replication/replicator/replicator-brain.h	Tue Apr 15 17:42:31 2014 +0200
+++ b/src/replication/replicator/replicator-brain.h	Tue Apr 15 18:24:31 2014 +0200
@@ -2,10 +2,19 @@
 #define REPLICATOR_BRAIN_H
 
 struct replicator_settings;
+struct replicator_queue;
 
 struct replicator_brain *
 replicator_brain_init(struct replicator_queue *queue,
 		      const struct replicator_settings *set);
 void replicator_brain_deinit(struct replicator_brain **brain);
 
+struct replicator_queue *
+replicator_brain_get_queue(struct replicator_brain *brain);
+const struct replicator_settings *
+replicator_brain_get_settings(struct replicator_brain *brain);
+
+const ARRAY_TYPE(dsync_client) *
+replicator_brain_get_dsync_clients(struct replicator_brain *brain);
+
 #endif
--- a/src/replication/replicator/replicator.c	Tue Apr 15 17:42:31 2014 +0200
+++ b/src/replication/replicator/replicator.c	Tue Apr 15 18:24:31 2014 +0200
@@ -28,7 +28,7 @@
 {
 	master_service_client_connection_accept(conn);
 	if (strcmp(conn->name, "replicator-doveadm") == 0)
-		doveadm_connection_create(queue, conn->fd);
+		doveadm_connection_create(brain, conn->fd);
 	else
 		(void)notify_connection_create(conn->fd, queue);
 }