changeset 10328:b63fd6156663 HEAD

dsync: Added support for subscription syncing.
author Timo Sirainen <tss@iki.fi>
date Fri, 13 Nov 2009 20:55:05 -0500
parents 4267c30ded97
children 32a754b2d79b
files src/dsync/dsync-brain-private.h src/dsync/dsync-brain.c src/dsync/dsync-data.c src/dsync/dsync-data.h src/dsync/dsync-proxy-client.c src/dsync/dsync-proxy-server-cmd.c src/dsync/dsync-proxy-server.h src/dsync/dsync-worker-local.c src/dsync/dsync-worker-private.h src/dsync/dsync-worker.c src/dsync/dsync-worker.h src/dsync/test-dsync-brain.c src/dsync/test-dsync-proxy-server-cmd.c src/dsync/test-dsync-worker.c src/dsync/test-dsync-worker.h
diffstat 15 files changed, 894 insertions(+), 26 deletions(-) [+]
line wrap: on
line diff
--- a/src/dsync/dsync-brain-private.h	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-brain-private.h	Fri Nov 13 20:55:05 2009 -0500
@@ -6,7 +6,9 @@
 
 enum dsync_state {
 	DSYNC_STATE_GET_MAILBOXES = 0,
+	DSYNC_STATE_GET_SUBSCRIPTIONS,
 	DSYNC_STATE_SYNC_MAILBOXES,
+	DSYNC_STATE_SYNC_SUBSCRIPTIONS,
 	DSYNC_STATE_SYNC_MSGS,
 	DSYNC_STATE_SYNC_MSGS_FLUSH,
 	DSYNC_STATE_SYNC_MSGS_FLUSH2,
@@ -25,6 +27,24 @@
 	ARRAY_TYPE(dsync_mailbox) dirs;
 };
 
+struct dsync_brain_subscription {
+	const char *name;
+	time_t last_change;
+};
+struct dsync_brain_unsubscription {
+	mailbox_guid_t name_sha1;
+	time_t last_change;
+};
+
+struct dsync_brain_subs_list {
+	pool_t pool;
+	struct dsync_brain *brain;
+	struct dsync_worker *worker;
+	struct dsync_worker_subs_iter *iter;
+	ARRAY_DEFINE(subscriptions, struct dsync_brain_subscription);
+	ARRAY_DEFINE(unsubscriptions, struct dsync_brain_unsubscription);
+};
+
 struct dsync_brain_guid_instance {
 	struct dsync_brain_guid_instance *next;
 	uint32_t uid;
@@ -103,6 +123,9 @@
 	struct dsync_brain_mailbox_list *src_mailbox_list;
 	struct dsync_brain_mailbox_list *dest_mailbox_list;
 
+	struct dsync_brain_subs_list *src_subs_list;
+	struct dsync_brain_subs_list *dest_subs_list;
+
 	struct dsync_brain_mailbox_sync *mailbox_sync;
 
 	unsigned int failed:1;
--- a/src/dsync/dsync-brain.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-brain.c	Fri Nov 13 20:55:05 2009 -0500
@@ -9,6 +9,8 @@
 
 static void
 dsync_brain_mailbox_list_deinit(struct dsync_brain_mailbox_list **list);
+static void
+dsync_brain_subs_list_deinit(struct dsync_brain_subs_list **list);
 
 struct dsync_brain *
 dsync_brain_init(struct dsync_worker *src_worker,
@@ -49,11 +51,17 @@
 
 	if (brain->mailbox_sync != NULL)
 		dsync_brain_msg_sync_deinit(&brain->mailbox_sync);
+
 	if (brain->src_mailbox_list != NULL)
 		dsync_brain_mailbox_list_deinit(&brain->src_mailbox_list);
 	if (brain->dest_mailbox_list != NULL)
 		dsync_brain_mailbox_list_deinit(&brain->dest_mailbox_list);
 
+	if (brain->src_subs_list != NULL)
+		dsync_brain_subs_list_deinit(&brain->src_subs_list);
+	if (brain->dest_subs_list != NULL)
+		dsync_brain_subs_list_deinit(&brain->dest_subs_list);
+
 	*_brain = NULL;
 	i_free(brain->mailbox);
 	i_free(brain);
@@ -131,6 +139,96 @@
 	pool_unref(&list->pool);
 }
 
+static void dsync_brain_subs_list_finished(struct dsync_brain *brain)
+{
+	if (brain->src_subs_list->iter != NULL ||
+	    brain->dest_subs_list->iter != NULL)
+		return;
+
+	/* both lists are finished */
+	brain->state++;
+	dsync_brain_sync(brain);
+}
+
+static int
+dsync_brain_subscription_cmp(const struct dsync_brain_subscription *s1,
+			     const struct dsync_brain_subscription *s2)
+{
+	return strcmp(s1->name, s2->name);
+}
+
+static int
+dsync_brain_unsubscription_cmp(const struct dsync_brain_unsubscription *u1,
+			       const struct dsync_brain_unsubscription *u2)
+{
+	return dsync_guid_cmp(&u1->name_sha1, &u2->name_sha1);
+}
+
+static void dsync_worker_subs_input(void *context)
+{
+	struct dsync_brain_subs_list *list = context;
+	struct dsync_brain_subscription subs;
+	struct dsync_brain_unsubscription unsubs;
+	int ret;
+
+	memset(&subs, 0, sizeof(subs));
+	while ((ret = dsync_worker_subs_iter_next(list->iter, &subs.name,
+						  &subs.last_change)) > 0) {
+		subs.name = p_strdup(list->pool, subs.name);
+		array_append(&list->subscriptions, &subs, 1);
+	}
+	if (ret == 0)
+		return;
+
+	memset(&unsubs, 0, sizeof(unsubs));
+	while ((ret = dsync_worker_subs_iter_next_un(list->iter,
+						     &unsubs.name_sha1,
+						     &unsubs.last_change)) > 0)
+		array_append(&list->unsubscriptions, &unsubs, 1);
+
+	if (ret < 0) {
+		/* finished listing subscriptions */
+		if (dsync_worker_subs_iter_deinit(&list->iter) < 0)
+			dsync_brain_fail(list->brain);
+		array_sort(&list->subscriptions,
+			   dsync_brain_subscription_cmp);
+		array_sort(&list->unsubscriptions,
+			   dsync_brain_unsubscription_cmp);
+		dsync_brain_subs_list_finished(list->brain);
+	}
+}
+
+static struct dsync_brain_subs_list *
+dsync_brain_subs_list_init(struct dsync_brain *brain,
+			      struct dsync_worker *worker)
+{
+	struct dsync_brain_subs_list *list;
+	pool_t pool;
+
+	pool = pool_alloconly_create("dsync brain subs list", 1024*4);
+	list = p_new(pool, struct dsync_brain_subs_list, 1);
+	list->pool = pool;
+	list->brain = brain;
+	list->worker = worker;
+	list->iter = dsync_worker_subs_iter_init(worker);
+	p_array_init(&list->subscriptions, pool, 128);
+	p_array_init(&list->unsubscriptions, pool, 64);
+	dsync_worker_set_input_callback(worker, dsync_worker_subs_input, list);
+	return list;
+}
+
+static void
+dsync_brain_subs_list_deinit(struct dsync_brain_subs_list **_list)
+{
+	struct dsync_brain_subs_list *list = *_list;
+
+	*_list = NULL;
+
+	if (list->iter != NULL)
+		(void)dsync_worker_subs_iter_deinit(&list->iter);
+	pool_unref(&list->pool);
+}
+
 static void dsync_brain_sync_mailboxes(struct dsync_brain *brain)
 {
 	struct dsync_mailbox *const *src_boxes, *const *dest_boxes, new_box;
@@ -209,6 +307,77 @@
 	}
 }
 
+static bool
+dsync_brain_is_unsubscribed(struct dsync_brain_subs_list *list,
+			    const struct dsync_brain_subscription *subs)
+{
+	const struct dsync_brain_unsubscription *unsubs;
+	struct dsync_brain_unsubscription lookup;
+
+	/* FIXME: doesn't work with namespace prefixes */
+	dsync_str_sha_to_guid(subs->name, &lookup.name_sha1);
+	unsubs = array_bsearch(&list->unsubscriptions, &lookup,
+			       dsync_brain_unsubscription_cmp);
+	if (unsubs == NULL)
+		return FALSE;
+	else
+		return unsubs->last_change > subs->last_change;
+}
+
+static void dsync_brain_sync_subscriptions(struct dsync_brain *brain)
+{
+	const struct dsync_brain_subscription *src_subs, *dest_subs;
+	unsigned int src, dest, src_count, dest_count;
+	int ret;
+
+	/* subscriptions are sorted by name. */
+	src_subs = array_get(&brain->src_subs_list->subscriptions, &src_count);
+	dest_subs = array_get(&brain->dest_subs_list->subscriptions, &dest_count);
+	for (src = dest = 0;; ) {
+		if (src == src_count) {
+			if (dest == dest_count)
+				break;
+			ret = 1;
+		} else if (dest == dest_count) {
+			ret = -1;
+		} else {
+			ret = strcmp(src_subs[src].name, dest_subs[dest].name);
+			if (ret == 0) {
+				src++; dest++;
+				continue;
+			}
+		}
+
+		if (ret < 0) {
+			/* subscribed only in source */
+			if (dsync_brain_is_unsubscribed(brain->dest_subs_list,
+							&src_subs[src])) {
+				dsync_worker_set_subscribed(brain->src_worker,
+							    src_subs[src].name,
+							    FALSE);
+			} else {
+				dsync_worker_set_subscribed(brain->dest_worker,
+							    src_subs[src].name,
+							    TRUE);
+			}
+			src++;
+		} else {
+			/* subscribed only in dest */
+			if (dsync_brain_is_unsubscribed(brain->src_subs_list,
+							&dest_subs[dest])) {
+				dsync_worker_set_subscribed(brain->dest_worker,
+							    dest_subs[dest].name,
+							    FALSE);
+			} else {
+				dsync_worker_set_subscribed(brain->src_worker,
+							    dest_subs[dest].name,
+							    TRUE);
+			}
+			dest++;
+		}
+	}
+}
+
 static bool dsync_mailbox_has_changed_msgs(struct dsync_brain *brain,
 					   const struct dsync_mailbox *box1,
 					   const struct dsync_mailbox *box2)
@@ -380,10 +549,23 @@
 		dsync_worker_mailbox_input(brain->src_mailbox_list);
 		dsync_worker_mailbox_input(brain->dest_mailbox_list);
 		break;
+	case DSYNC_STATE_GET_SUBSCRIPTIONS:
+		i_assert(brain->src_subs_list == NULL);
+		brain->src_subs_list =
+			dsync_brain_subs_list_init(brain, brain->src_worker);
+		brain->dest_subs_list =
+			dsync_brain_subs_list_init(brain, brain->dest_worker);
+		dsync_worker_subs_input(brain->src_subs_list);
+		dsync_worker_subs_input(brain->dest_subs_list);
+		break;
 	case DSYNC_STATE_SYNC_MAILBOXES:
 		dsync_brain_sync_mailboxes(brain);
 		brain->state++;
 		/* fall through */
+	case DSYNC_STATE_SYNC_SUBSCRIPTIONS:
+		dsync_brain_sync_subscriptions(brain);
+		brain->state++;
+		/* fall through */
 	case DSYNC_STATE_SYNC_MSGS:
 		dsync_brain_sync_msgs(brain);
 		break;
--- a/src/dsync/dsync-data.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-data.c	Fri Nov 13 20:55:05 2009 -0500
@@ -3,6 +3,7 @@
 #include "lib.h"
 #include "buffer.h"
 #include "hex-binary.h"
+#include "sha1.h"
 #include "dsync-data.h"
 
 struct dsync_mailbox *
@@ -88,6 +89,11 @@
 	return memcmp(guid1->guid, guid2->guid, sizeof(guid1->guid)) == 0;
 }
 
+int dsync_guid_cmp(const mailbox_guid_t *guid1, const mailbox_guid_t *guid2)
+{
+	return memcmp(guid1->guid, guid2->guid, sizeof(guid1->guid));
+}
+
 const char *dsync_guid_to_str(const mailbox_guid_t *guid)
 {
 	return binary_to_hex(guid->guid, sizeof(guid->guid));
@@ -108,3 +114,11 @@
 	buffer_append_c(&guid_128_buf, '\0');
 	return guid_128_buf.data;
 }
+
+void dsync_str_sha_to_guid(const char *str, mailbox_guid_t *guid)
+{
+	unsigned char sha[SHA1_RESULTLEN];
+
+	sha1_get_digest(str, strlen(str), sha);
+	memcpy(guid->guid, sha, I_MIN(sizeof(guid->guid), sizeof(sha)));
+}
--- a/src/dsync/dsync-data.h	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-data.h	Fri Nov 13 20:55:05 2009 -0500
@@ -69,8 +69,10 @@
 
 bool dsync_guid_equals(const mailbox_guid_t *guid1,
 		       const mailbox_guid_t *guid2);
+int dsync_guid_cmp(const mailbox_guid_t *guid1, const mailbox_guid_t *guid2);
 const char *dsync_guid_to_str(const mailbox_guid_t *guid);
 const char *dsync_get_guid_128_str(const char *guid, unsigned char *dest,
 				   unsigned int dest_len);
+void dsync_str_sha_to_guid(const char *str, mailbox_guid_t *guid);
 
 #endif
--- a/src/dsync/dsync-proxy-client.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-proxy-client.c	Fri Nov 13 20:55:05 2009 -0500
@@ -40,6 +40,11 @@
 	pool_t pool;
 };
 
+struct proxy_client_dsync_worker_subs_iter {
+	struct dsync_worker_subs_iter iter;
+	pool_t pool;
+};
+
 struct proxy_client_dsync_worker {
 	struct dsync_worker worker;
 	int fd_in, fd_out;
@@ -397,6 +402,121 @@
 	return ret;
 }
 
+static struct dsync_worker_subs_iter *
+proxy_client_worker_subs_iter_init(struct dsync_worker *_worker)
+{
+	struct proxy_client_dsync_worker *worker =
+		(struct proxy_client_dsync_worker *)_worker;
+	struct proxy_client_dsync_worker_subs_iter *iter;
+
+	iter = i_new(struct proxy_client_dsync_worker_subs_iter, 1);
+	iter->iter.worker = _worker;
+	iter->pool = pool_alloconly_create("proxy subscription iter", 1024);
+	o_stream_send_str(worker->output, "SUBS-LIST\n");
+	proxy_client_worker_output_flush(_worker);
+	return &iter->iter;
+}
+
+static int
+proxy_client_worker_subs_iter_next_line(struct proxy_client_dsync_worker_subs_iter *iter,
+					const char **name_r,
+					time_t *last_change_r)
+{
+	struct proxy_client_dsync_worker *worker =
+		(struct proxy_client_dsync_worker *)iter->iter.worker;
+	const char *line;
+	char **args;
+	int ret;
+
+	if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
+		if (ret < 0)
+			iter->iter.failed = TRUE;
+		return ret;
+	}
+
+	if (*line == '\t') {
+		/* end of subscribed subscriptions */
+		if (line[1] != '0')
+			iter->iter.failed = TRUE;
+		return -1;
+	}
+
+	p_clear(iter->pool);
+	args = p_strsplit(iter->pool, line, "\t");
+	if (args[0] == NULL || args[1] == NULL) {
+		i_error("Invalid subscription input from worker server");
+		iter->iter.failed = TRUE;
+		return -1;
+	}
+	*name_r = args[0];
+	*last_change_r = strtoul(args[1], NULL, 10);
+	return 1;
+}
+
+static int
+proxy_client_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter,
+				   const char **name_r, time_t *last_change_r)
+{
+	struct proxy_client_dsync_worker_subs_iter *iter =
+		(struct proxy_client_dsync_worker_subs_iter *)_iter;
+
+	return proxy_client_worker_subs_iter_next_line(iter, name_r,
+						       last_change_r);
+}
+
+static int
+proxy_client_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter,
+				      mailbox_guid_t *name_sha1_r,
+				      time_t *last_change_r)
+{
+	struct proxy_client_dsync_worker_subs_iter *iter =
+		(struct proxy_client_dsync_worker_subs_iter *)_iter;
+	const char *name;
+	int ret;
+
+	ret = proxy_client_worker_subs_iter_next_line(iter, &name,
+						      last_change_r);
+	if (ret <= 0)
+		return ret;
+
+	if (dsync_proxy_mailbox_guid_import(name, name_sha1_r) < 0) {
+		i_error("Invalid subscription input from worker server: "
+			"Invalid unsubscription mailbox GUID");
+		iter->iter.failed = TRUE;
+		return -1;
+	}
+	return 1;
+}
+
+static int
+proxy_client_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter)
+{
+	struct proxy_client_dsync_worker_subs_iter *iter =
+		(struct proxy_client_dsync_worker_subs_iter *)_iter;
+	int ret = _iter->failed ? -1 : 0;
+
+	pool_unref(&iter->pool);
+	i_free(iter);
+	return ret;
+}
+
+static void
+proxy_client_worker_set_subscribed(struct dsync_worker *_worker,
+				   const char *name, bool set)
+{
+	struct proxy_client_dsync_worker *worker =
+		(struct proxy_client_dsync_worker *)_worker;
+
+	T_BEGIN {
+		string_t *str = t_str_new(128);
+
+		str_append(str, "SUBS-SET\t");
+		str_tabescape_write(str, name);
+		str_printfa(str, "\t%d\n", set ? 1 : 0);
+		o_stream_send(worker->output, str_data(str), str_len(str));
+	} T_END;
+}
+
 struct proxy_client_dsync_worker_msg_iter {
 	struct dsync_worker_msg_iter iter;
 	pool_t pool;
@@ -822,6 +942,12 @@
 	proxy_client_worker_mailbox_iter_next,
 	proxy_client_worker_mailbox_iter_deinit,
 
+	proxy_client_worker_subs_iter_init,
+	proxy_client_worker_subs_iter_next,
+	proxy_client_worker_subs_iter_next_un,
+	proxy_client_worker_subs_iter_deinit,
+	proxy_client_worker_set_subscribed,
+
 	proxy_client_worker_msg_iter_init,
 	proxy_client_worker_msg_iter_next,
 	proxy_client_worker_msg_iter_deinit,
--- a/src/dsync/dsync-proxy-server-cmd.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-proxy-server-cmd.c	Fri Nov 13 20:55:05 2009 -0500
@@ -62,6 +62,102 @@
 	}
 }
 
+static bool cmd_subs_list_subscriptions(struct dsync_proxy_server *server)
+{
+	const char *name;
+	time_t last_change;
+	string_t *str;
+	int ret;
+
+	str = t_str_new(256);
+	while ((ret = dsync_worker_subs_iter_next(server->subs_iter,
+						  &name, &last_change)) > 0) {
+		str_truncate(str, 0);
+		str_tabescape_write(str, name);
+		str_printfa(str, "\t%ld\n", (long)last_change);
+		o_stream_send(server->output, str_data(str), str_len(str));
+		if (proxy_server_is_output_full(server))
+			break;
+	}
+	if (ret >= 0) {
+		/* continue later */
+		o_stream_set_flush_pending(server->output, TRUE);
+		return FALSE;
+	}
+	return TRUE;
+}
+
+static bool cmd_subs_list_unsubscriptions(struct dsync_proxy_server *server)
+{
+	mailbox_guid_t name_sha1;
+	time_t last_change;
+	string_t *str;
+	int ret;
+
+	str = t_str_new(256);
+	while ((ret = dsync_worker_subs_iter_next_un(server->subs_iter,
+						     &name_sha1,
+						     &last_change)) > 0) {
+		str_truncate(str, 0);
+		dsync_proxy_mailbox_guid_export(str, &name_sha1);
+		str_printfa(str, "\t%ld\n", (long)last_change);
+		o_stream_send(server->output, str_data(str), str_len(str));
+		if (proxy_server_is_output_full(server))
+			break;
+	}
+	if (ret >= 0) {
+		/* continue later */
+		o_stream_set_flush_pending(server->output, TRUE);
+		return FALSE;
+	}
+	return TRUE;
+}
+
+static int
+cmd_subs_list(struct dsync_proxy_server *server,
+	      const char *const *args ATTR_UNUSED)
+{
+	int ret = 1;
+
+	if (server->subs_iter == NULL) {
+		server->subs_iter =
+			dsync_worker_subs_iter_init(server->worker);
+	}
+
+	if (!server->subs_sending_unsubscriptions) {
+		if (!cmd_subs_list_subscriptions(server))
+			return 0;
+		o_stream_send(server->output, "\t0\n", 3);
+		server->subs_sending_unsubscriptions = TRUE;
+	}
+	if (ret > 0) {
+		if (!cmd_subs_list_unsubscriptions(server))
+			return 0;
+	}
+
+	server->subs_sending_unsubscriptions = FALSE;
+	if (dsync_worker_subs_iter_deinit(&server->subs_iter) < 0) {
+		o_stream_send(server->output, "\t-1\n", 4);
+		return -1;
+	} else {
+		o_stream_send(server->output, "\t0\n", 3);
+		return 1;
+	}
+}
+
+static int
+cmd_subs_set(struct dsync_proxy_server *server, const char *const *args)
+{
+	if (args[0] == NULL || args[1] == NULL) {
+		i_error("subs-set: Missing parameters");
+		return -1;
+	}
+
+	dsync_worker_set_subscribed(server->worker, args[0],
+				    strcmp(args[1], "1") == 0);
+	return 1;
+}
+
 static int
 cmd_msg_list_init(struct dsync_proxy_server *server, const char *const *args)
 {
@@ -412,6 +508,8 @@
 
 static struct dsync_proxy_server_command commands[] = {
 	{ "BOX-LIST", cmd_box_list },
+	{ "SUBS-LIST", cmd_subs_list },
+	{ "SUBS-SET", cmd_subs_set },
 	{ "MSG-LIST", cmd_msg_list },
 	{ "BOX-CREATE", cmd_box_create },
 	{ "BOX-DELETE", cmd_box_delete },
--- a/src/dsync/dsync-proxy-server.h	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-proxy-server.h	Fri Nov 13 20:55:05 2009 -0500
@@ -22,12 +22,14 @@
 	const char *const *cur_args;
 
 	struct dsync_worker_mailbox_iter *mailbox_iter;
+	struct dsync_worker_subs_iter *subs_iter;
 	struct dsync_worker_msg_iter *msg_iter;
 
 	struct istream *get_input;
 	bool get_input_last_lf;
 	uint32_t get_uid;
 
+	unsigned int subs_sending_unsubscriptions:1;
 	unsigned int finished:1;
 };
 
--- a/src/dsync/dsync-worker-local.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-worker-local.c	Fri Nov 13 20:55:05 2009 -0500
@@ -20,6 +20,12 @@
 	struct hash_iterate_context *deleted_iter;
 };
 
+struct local_dsync_worker_subs_iter {
+	struct dsync_worker_subs_iter iter;
+	struct mailbox_list_iterate_context *list_iter;
+	struct hash_iterate_context *deleted_iter;
+};
+
 struct local_dsync_worker_msg_iter {
 	struct dsync_worker_msg_iter iter;
 	mailbox_guid_t *mailboxes;
@@ -45,6 +51,11 @@
 	time_t last_renamed;
 	unsigned int deleted_mailbox:1;
 	unsigned int deleted_dir:1;
+};
+struct local_dsync_subscription_change {
+	mailbox_guid_t name_sha1;
+	struct mailbox_list *list;
+	time_t last_change;
 	unsigned int unsubscribed:1;
 };
 
@@ -57,6 +68,8 @@
 	struct hash_table *mailbox_hash;
 	/* mailbox_guid_t -> struct local_dsync_mailbox_change* */
 	struct hash_table *mailbox_changes_hash;
+	/* mailbox_guid_t -> struct local_dsync_subscription_change */
+	struct hash_table *subscription_changes_hash;
 
 	mailbox_guid_t selected_box_guid;
 	struct mailbox *selected_box;
@@ -131,6 +144,8 @@
 	hash_table_destroy(&worker->mailbox_hash);
 	if (worker->mailbox_changes_hash != NULL)
 		hash_table_destroy(&worker->mailbox_changes_hash);
+	if (worker->subscription_changes_hash != NULL)
+		hash_table_destroy(&worker->subscription_changes_hash);
 	pool_unref(&worker->pool);
 }
 
@@ -144,6 +159,79 @@
 	return 1;
 }
 
+static void
+dsync_worker_save_mailbox_change(struct local_dsync_worker *worker,
+				 const struct mailbox_log_record *rec)
+{
+	struct local_dsync_mailbox_change *change;
+
+	change = hash_table_lookup(worker->mailbox_changes_hash,
+				   rec->mailbox_guid);
+	if (change == NULL) {
+		change = i_new(struct local_dsync_mailbox_change, 1);
+		memcpy(change->guid.guid, rec->mailbox_guid,
+		       sizeof(change->guid.guid));
+		hash_table_insert(worker->mailbox_changes_hash,
+				  change->guid.guid, change);
+	}
+	switch (rec->type) {
+	case MAILBOX_LOG_RECORD_DELETE_MAILBOX:
+		change->deleted_mailbox = TRUE;
+		break;
+	case MAILBOX_LOG_RECORD_DELETE_DIR:
+		change->deleted_dir = TRUE;
+		break;
+	case MAILBOX_LOG_RECORD_RENAME:
+		change->last_renamed =
+			mailbox_log_record_get_timestamp(rec);
+		break;
+	case MAILBOX_LOG_RECORD_SUBSCRIBE:
+	case MAILBOX_LOG_RECORD_UNSUBSCRIBE:
+		i_unreached();
+	}
+	if (change->deleted_dir && change->deleted_mailbox) {
+		/* same GUID shouldn't be both. something's already
+		   broken, but change this so we don't get into more
+		   problems later. */
+		change->deleted_dir = FALSE;
+	}
+}
+
+static void
+dsync_worker_save_subscription_change(struct local_dsync_worker *worker,
+				      struct mailbox_list *list,
+				      const struct mailbox_log_record *rec)
+{
+	struct local_dsync_subscription_change *change, new_change;
+
+	memset(&new_change, 0, sizeof(new_change));
+	new_change.list = list;
+	memcpy(new_change.name_sha1.guid, rec->mailbox_guid,
+	       sizeof(new_change.name_sha1.guid));
+
+	change = hash_table_lookup(worker->subscription_changes_hash,
+				   &new_change);
+	if (change == NULL) {
+		change = i_new(struct local_dsync_subscription_change, 1);
+		*change = new_change;
+		hash_table_insert(worker->subscription_changes_hash,
+				  change, change);
+	}
+	switch (rec->type) {
+	case MAILBOX_LOG_RECORD_DELETE_MAILBOX:
+	case MAILBOX_LOG_RECORD_DELETE_DIR:
+	case MAILBOX_LOG_RECORD_RENAME:
+		i_unreached();
+	case MAILBOX_LOG_RECORD_SUBSCRIBE:
+		change->unsubscribed = FALSE;
+		break;
+	case MAILBOX_LOG_RECORD_UNSUBSCRIBE:
+		change->unsubscribed = TRUE;
+		break;
+	}
+	change->last_change = mailbox_log_record_get_timestamp(rec);
+}
+
 static int
 dsync_worker_get_list_mailbox_log(struct local_dsync_worker *worker,
 				  struct mailbox_list *list)
@@ -151,44 +239,22 @@
 	struct mailbox_log *log;
 	struct mailbox_log_iter *iter;
 	const struct mailbox_log_record *rec;
-	struct local_dsync_mailbox_change *change;
 
 	log = mailbox_list_get_changelog(list);
 	iter = mailbox_log_iter_init(log);
 	while ((rec = mailbox_log_iter_next(iter)) != NULL) {
-		change = hash_table_lookup(worker->mailbox_changes_hash,
-					   rec->mailbox_guid);
-		if (change == NULL) {
-			change = i_new(struct local_dsync_mailbox_change, 1);
-			memcpy(change->guid.guid, rec->mailbox_guid,
-			       sizeof(change->guid.guid));
-			hash_table_insert(worker->mailbox_changes_hash,
-					  change->guid.guid, change);
-		}
 		switch (rec->type) {
 		case MAILBOX_LOG_RECORD_DELETE_MAILBOX:
-			change->deleted_mailbox = TRUE;
-			break;
 		case MAILBOX_LOG_RECORD_DELETE_DIR:
-			change->deleted_dir = TRUE;
-			break;
 		case MAILBOX_LOG_RECORD_RENAME:
-			change->last_renamed =
-				mailbox_log_record_get_timestamp(rec);
+			dsync_worker_save_mailbox_change(worker, rec);
 			break;
 		case MAILBOX_LOG_RECORD_SUBSCRIBE:
-			change->unsubscribed = FALSE;
-			break;
 		case MAILBOX_LOG_RECORD_UNSUBSCRIBE:
-			change->unsubscribed = TRUE;
+			dsync_worker_save_subscription_change(worker,
+							      list, rec);
 			break;
 		}
-		if (change->deleted_dir && change->deleted_mailbox) {
-			/* same GUID shouldn't be both. something's already
-			   broken, but change this so we don't get into more
-			   problems later. */
-			change->deleted_dir = FALSE;
-		}
 	}
 	return mailbox_log_iter_deinit(&iter);
 }
@@ -208,6 +274,25 @@
 	return memcmp(p1, p2, MAIL_GUID_128_SIZE);
 }
 
+static unsigned int subscription_change_hash(const void *p)
+{
+	const struct local_dsync_subscription_change *change = p;
+
+	return mailbox_log_record_hash(change->name_sha1.guid) ^
+		(unsigned int)change->list;
+}
+
+static int subscription_change_cmp(const void *p1, const void *p2)
+{
+	const struct local_dsync_subscription_change *c1 = p1, *c2 = p2;
+
+	if (c1->list != c2->list)
+		return 1;
+
+	return memcmp(c1->name_sha1.guid, c2->name_sha1.guid,
+		      MAIL_GUID_128_SIZE);
+}
+
 static int dsync_worker_get_mailbox_log(struct local_dsync_worker *worker)
 {
 	struct mail_namespace *ns;
@@ -220,6 +305,10 @@
 		hash_table_create(default_pool, worker->pool, 0,
 				  mailbox_log_record_hash,
 				  mailbox_log_record_cmp);
+	worker->subscription_changes_hash =
+		hash_table_create(default_pool, worker->pool, 0,
+				  subscription_change_hash,
+				  subscription_change_cmp);
 	for (ns = worker->user->namespaces; ns != NULL; ns = ns->next) {
 		if (ns->alias_for != NULL)
 			continue;
@@ -395,6 +484,125 @@
 	return ret;
 }
 
+static struct dsync_worker_subs_iter *
+local_worker_subs_iter_init(struct dsync_worker *_worker)
+{
+	struct local_dsync_worker *worker =
+		(struct local_dsync_worker *)_worker;
+	struct local_dsync_worker_subs_iter *iter;
+	enum mailbox_list_iter_flags list_flags =
+		MAILBOX_LIST_ITER_VIRTUAL_NAMES |
+		MAILBOX_LIST_ITER_SELECT_SUBSCRIBED;
+	static const char *patterns[] = { "*", NULL };
+
+	iter = i_new(struct local_dsync_worker_subs_iter, 1);
+	iter->iter.worker = _worker;
+	iter->list_iter =
+		mailbox_list_iter_init_namespaces(worker->user->namespaces,
+						  patterns, list_flags);
+	(void)dsync_worker_get_mailbox_log(worker);
+	return &iter->iter;
+}
+
+static int
+local_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter,
+			    const char **name_r, time_t *last_change_r)
+{
+	struct local_dsync_worker_subs_iter *iter =
+		(struct local_dsync_worker_subs_iter *)_iter;
+	struct local_dsync_worker *worker =
+		(struct local_dsync_worker *)_iter->worker;
+	struct local_dsync_subscription_change *change, change_lookup;
+	const struct mailbox_info *info;
+	const char *storage_name;
+
+	info = mailbox_list_iter_next(iter->list_iter);
+	if (info == NULL)
+		return -1;
+
+	storage_name = mail_namespace_get_storage_name(info->ns, info->name);
+	dsync_str_sha_to_guid(storage_name, &change_lookup.name_sha1);
+	change_lookup.list = info->ns->list;
+
+	change = hash_table_lookup(worker->subscription_changes_hash,
+				   &change_lookup);
+	if (change != NULL) {
+		/* it shouldn't be marked as unsubscribed, but drop it to
+		   be sure */
+		change->unsubscribed = FALSE;
+		*last_change_r = change->last_change;
+	} else {
+		*last_change_r = 0;
+	}
+	*name_r = info->name;
+	return 1;
+}
+
+static int
+local_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter,
+			       mailbox_guid_t *name_sha1_r,
+			       time_t *last_change_r)
+{
+	struct local_dsync_worker_subs_iter *iter =
+		(struct local_dsync_worker_subs_iter *)_iter;
+	struct local_dsync_worker *worker =
+		(struct local_dsync_worker *)_iter->worker;
+	void *key, *value;
+
+	if (iter->deleted_iter == NULL) {
+		iter->deleted_iter =
+			hash_table_iterate_init(worker->subscription_changes_hash);
+	}
+	while (hash_table_iterate(iter->deleted_iter, &key, &value)) {
+		const struct local_dsync_subscription_change *change = value;
+
+		if (change->unsubscribed) {
+			/* the name doesn't matter */
+			*name_sha1_r = change->name_sha1;
+			*last_change_r = change->last_change;
+			return 1;
+		}
+	}
+	hash_table_iterate_deinit(&iter->deleted_iter);
+	return -1;
+}
+
+static int
+local_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter)
+{
+	struct local_dsync_worker_subs_iter *iter =
+		(struct local_dsync_worker_subs_iter *)_iter;
+	int ret = _iter->failed ? -1 : 0;
+
+	if (mailbox_list_iter_deinit(&iter->list_iter) < 0)
+		ret = -1;
+	i_free(iter);
+	return ret;
+}
+
+static void
+local_worker_set_subscribed(struct dsync_worker *_worker,
+			    const char *name, bool set)
+{
+	struct local_dsync_worker *worker =
+		(struct local_dsync_worker *)_worker;
+	struct mail_namespace *ns;
+	const char *storage_name;
+
+	storage_name = name;
+	ns = mail_namespace_find(worker->user->namespaces, &storage_name);
+	if (ns == NULL) {
+		i_error("Can't find namespace for mailbox %s", name);
+		return;
+	}
+
+	if (mailbox_list_set_subscribed(ns->list, storage_name, set) < 0) {
+		dsync_worker_set_failure(_worker);
+		i_error("Can't update subscription %s: %s", name,
+			mailbox_list_get_last_error(ns->list, NULL));
+	}
+}
+
 static int local_mailbox_open(struct local_dsync_worker *worker,
 			      const mailbox_guid_t *guid,
 			      struct mailbox **box_r)
@@ -1151,6 +1359,12 @@
 	local_worker_mailbox_iter_next,
 	local_worker_mailbox_iter_deinit,
 
+	local_worker_subs_iter_init,
+	local_worker_subs_iter_next,
+	local_worker_subs_iter_next_un,
+	local_worker_subs_iter_deinit,
+	local_worker_set_subscribed,
+
 	local_worker_msg_iter_init,
 	local_worker_msg_iter_next,
 	local_worker_msg_iter_deinit,
--- a/src/dsync/dsync-worker-private.h	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-worker-private.h	Fri Nov 13 20:55:05 2009 -0500
@@ -17,6 +17,17 @@
 				 struct dsync_mailbox *dsync_box_r);
 	int (*mailbox_iter_deinit)(struct dsync_worker_mailbox_iter *iter);
 
+	struct dsync_worker_subs_iter *
+		(*subs_iter_init)(struct dsync_worker *worker);
+	int (*subs_iter_next)(struct dsync_worker_subs_iter *iter,
+			      const char **name_r, time_t *last_change_r);
+	int (*subs_iter_next_un)(struct dsync_worker_subs_iter *iter,
+				 mailbox_guid_t *name_sha1_r,
+				 time_t *last_change_r);
+	int (*subs_iter_deinit)(struct dsync_worker_subs_iter *iter);
+	void (*set_subscribed)(struct dsync_worker *worker,
+			       const char *name, bool set);
+
 	struct dsync_worker_msg_iter *
 		(*msg_iter_init)(struct dsync_worker *worker,
 				 const mailbox_guid_t mailboxes[],
@@ -72,6 +83,11 @@
 	bool failed;
 };
 
+struct dsync_worker_subs_iter {
+	struct dsync_worker *worker;
+	bool failed;
+};
+
 struct dsync_worker_msg_iter {
 	struct dsync_worker *worker;
 	bool failed;
--- a/src/dsync/dsync-worker.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-worker.c	Fri Nov 13 20:55:05 2009 -0500
@@ -61,6 +61,40 @@
 	return iter->worker->v.mailbox_iter_deinit(iter);
 }
 
+struct dsync_worker_subs_iter *
+dsync_worker_subs_iter_init(struct dsync_worker *worker)
+{
+	return worker->v.subs_iter_init(worker);
+}
+
+int dsync_worker_subs_iter_next(struct dsync_worker_subs_iter *iter,
+				const char **name_r, time_t *last_change_r)
+{
+	return iter->worker->v.subs_iter_next(iter, name_r, last_change_r);
+}
+
+int dsync_worker_subs_iter_next_un(struct dsync_worker_subs_iter *iter,
+				   mailbox_guid_t *sha1_name_r,
+				   time_t *last_change_r)
+{
+	return iter->worker->v.subs_iter_next_un(iter, sha1_name_r,
+						 last_change_r);
+}
+
+int dsync_worker_subs_iter_deinit(struct dsync_worker_subs_iter **_iter)
+{
+	struct dsync_worker_subs_iter *iter = *_iter;
+
+	*_iter = NULL;
+	return iter->worker->v.subs_iter_deinit(iter);
+}
+
+void dsync_worker_set_subscribed(struct dsync_worker *worker,
+				 const char *name, bool set)
+{
+	worker->v.set_subscribed(worker, name, set);
+}
+
 struct dsync_worker_msg_iter *
 dsync_worker_msg_iter_init(struct dsync_worker *worker,
 			   const mailbox_guid_t mailboxes[],
--- a/src/dsync/dsync-worker.h	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/dsync-worker.h	Fri Nov 13 20:55:05 2009 -0500
@@ -48,6 +48,23 @@
 /* Finish mailbox iteration. Returns 0 if ok, -1 if iteration failed. */
 int dsync_worker_mailbox_iter_deinit(struct dsync_worker_mailbox_iter **iter);
 
+/* Iterate though all subscriptions */
+struct dsync_worker_subs_iter *
+dsync_worker_subs_iter_init(struct dsync_worker *worker);
+/* Get the next subscription. Returns 1 if ok, 0 if waiting for more data,
+   -1 if there are no more subscriptions. */
+int dsync_worker_subs_iter_next(struct dsync_worker_subs_iter *iter,
+				const char **name_r, time_t *last_change_r);
+/* Like _iter_next(), but list known recent unsubscriptions. */
+int dsync_worker_subs_iter_next_un(struct dsync_worker_subs_iter *iter,
+				   mailbox_guid_t *name_sha1_r,
+				   time_t *last_change_r);
+/* Finish subscription iteration. Returns 0 if ok, -1 if iteration failed. */
+int dsync_worker_subs_iter_deinit(struct dsync_worker_subs_iter **iter);
+/* Subscribe/unsubscribe mailbox */
+void dsync_worker_set_subscribed(struct dsync_worker *worker,
+				 const char *name, bool set);
+
 /* Iterate through all messages in given mailboxes. The mailboxes are iterated
    in the given order. */
 struct dsync_worker_msg_iter *
--- a/src/dsync/test-dsync-brain.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/test-dsync-brain.c	Fri Nov 13 20:55:05 2009 -0500
@@ -67,6 +67,13 @@
 	test_worker->worker.input_callback(test_worker->worker.input_context);
 }
 
+static void subscriptions_send_to_worker(struct test_dsync_worker *test_worker)
+{
+	test_worker->subs_iter.last_subs = TRUE;
+	test_worker->subs_iter.last_unsubs = TRUE;
+	test_worker->worker.input_callback(test_worker->worker.input_context);
+}
+
 static bool
 test_dsync_mailbox_create_equals(const struct dsync_mailbox *cbox,
 				 const struct dsync_mailbox *obox)
@@ -160,6 +167,11 @@
 	mailboxes_send_to_worker(src_test_worker, src_boxes);
 	mailboxes_send_to_worker(dest_test_worker, dest_boxes);
 
+	subscriptions_send_to_worker(src_test_worker);
+	subscriptions_send_to_worker(dest_test_worker);
+
+	test_assert(brain->state == DSYNC_STATE_SYNC_MSGS);
+
 	/* check that it created/deleted missing mailboxes */
 	test_assert(test_dsync_worker_next_box_event(dest_test_worker, &box_event));
 	test_assert(box_event.type == LAST_BOX_TYPE_CREATE);
@@ -245,6 +257,11 @@
 	mailboxes_send_to_worker(src_test_worker, boxes);
 	mailboxes_send_to_worker(dest_test_worker, boxes);
 
+	subscriptions_send_to_worker(src_test_worker);
+	subscriptions_send_to_worker(dest_test_worker);
+
+	test_assert(brain->state == DSYNC_STATE_SYNC_MSGS);
+
 	test_assert(!test_dsync_worker_next_box_event(src_test_worker, &box_event));
 	test_assert(!test_dsync_worker_next_box_event(dest_test_worker, &box_event));
 
--- a/src/dsync/test-dsync-proxy-server-cmd.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/test-dsync-proxy-server-cmd.c	Fri Nov 13 20:55:05 2009 -0500
@@ -112,6 +112,44 @@
 	test_end();
 }
 
+static void test_dsync_proxy_subs_list(void)
+{
+	const char *name;
+	mailbox_guid_t name_sha1;
+
+	test_begin("proxy server subs list");
+
+	test_assert(run_cmd("SUBS-LIST", NULL) == 0);
+
+	/* subscription */
+	name = "\t\001\r\nname\t\001\n\r";
+	test_worker->subs_iter.next_name = name;
+	test_worker->subs_iter.next_last_change = 1234567890;
+	test_assert(run_more() == 0);
+	test_assert(strcmp(str_c(out), t_strconcat(
+		str_tabescape(name), "\t1234567890\n", NULL)) == 0);
+	out_clear();
+
+	test_worker->subs_iter.last_subs = TRUE;
+	test_assert(run_more() == 0);
+	test_assert(strcmp(str_c(out), "\t0\n") == 0);
+	out_clear();
+
+	/* unsubscription */
+	memcpy(name_sha1.guid, test_mailbox_guid1, sizeof(name_sha1.guid));
+	test_worker->subs_iter.next_unsubscription = &name_sha1;
+	test_assert(run_more() == 0);
+	test_assert(strcmp(str_c(out), TEST_MAILBOX_GUID1"\t1234567890\n") == 0);
+	out_clear();
+
+	test_worker->subs_iter.last_unsubs = TRUE;
+	test_assert(run_more() == 1);
+	test_assert(strcmp(str_c(out), "\t0\n") == 0);
+	out_clear();
+
+	test_end();
+}
+
 static void test_dsync_proxy_msg_list(void)
 {
 	static const char *test_keywords[] = {
@@ -399,6 +437,7 @@
 {
 	static void (*test_functions[])(void) = {
 		test_dsync_proxy_box_list,
+		test_dsync_proxy_subs_list,
 		test_dsync_proxy_msg_list,
 		test_dsync_proxy_box_create,
 		test_dsync_proxy_box_delete,
--- a/src/dsync/test-dsync-worker.c	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/test-dsync-worker.c	Fri Nov 13 20:55:05 2009 -0500
@@ -80,6 +80,60 @@
 	return 0;
 }
 
+static struct dsync_worker_subs_iter *
+test_worker_subs_iter_init(struct dsync_worker *_worker)
+{
+	struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker;
+
+	i_assert(worker->subs_iter.iter.worker == NULL);
+
+	worker->subs_iter.iter.worker = _worker;
+	return &worker->subs_iter.iter;
+}
+
+static int
+test_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter,
+			   const char **name_r, time_t *last_change_r)
+{
+	struct test_dsync_worker_subs_iter *iter =
+		(struct test_dsync_worker_subs_iter *)_iter;
+
+	if (iter->next_name == NULL)
+		return iter->last_subs ? -1 : 0;
+
+	*name_r = iter->next_name;
+	*last_change_r = iter->next_last_change;
+	iter->next_name = NULL;
+	return 1;
+}
+
+static int
+test_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter,
+			      mailbox_guid_t *name_sha1_r,
+			      time_t *last_change_r)
+{
+	struct test_dsync_worker_subs_iter *iter =
+		(struct test_dsync_worker_subs_iter *)_iter;
+
+	if (iter->next_unsubscription == NULL)
+		return iter->last_unsubs ? -1 : 0;
+
+	*name_sha1_r = *iter->next_unsubscription;
+	*last_change_r = iter->next_last_change;
+	iter->next_unsubscription = NULL;
+	return 1;
+}
+
+static int
+test_worker_subs_iter_deinit(struct dsync_worker_subs_iter *iter)
+{
+	struct test_dsync_worker *worker =
+		(struct test_dsync_worker *)iter->worker;
+
+	memset(&worker->subs_iter, 0, sizeof(worker->subs_iter));
+	return 0;
+}
+
 static struct dsync_worker_msg_iter *
 test_worker_msg_iter_init(struct dsync_worker *_worker,
 			  const mailbox_guid_t mailboxes[],
@@ -160,6 +214,19 @@
 }
 
 static void
+test_worker_set_subscribed(struct dsync_worker *_worker,
+			   const char *name, bool set)
+{
+	struct dsync_mailbox dsync_box;
+
+	memset(&dsync_box, 0, sizeof(dsync_box));
+	dsync_box.name = name;
+	test_worker_set_last_box(_worker, &dsync_box,
+				 set ? LAST_BOX_TYPE_SUBSCRIBE :
+				 LAST_BOX_TYPE_UNSUBSCRIBE);
+}
+
+static void
 test_worker_create_mailbox(struct dsync_worker *_worker,
 			   const struct dsync_mailbox *dsync_box)
 {
@@ -368,6 +435,12 @@
 	test_worker_mailbox_iter_next,
 	test_worker_mailbox_iter_deinit,
 
+	test_worker_subs_iter_init,
+	test_worker_subs_iter_next,
+	test_worker_subs_iter_next_un,
+	test_worker_subs_iter_deinit,
+	test_worker_set_subscribed,
+
 	test_worker_msg_iter_init,
 	test_worker_msg_iter_next,
 	test_worker_msg_iter_deinit,
--- a/src/dsync/test-dsync-worker.h	Fri Nov 13 20:54:50 2009 -0500
+++ b/src/dsync/test-dsync-worker.h	Fri Nov 13 20:55:05 2009 -0500
@@ -7,7 +7,9 @@
 	LAST_BOX_TYPE_CREATE,
 	LAST_BOX_TYPE_DELETE,
 	LAST_BOX_TYPE_RENAME,
-	LAST_BOX_TYPE_UPDATE
+	LAST_BOX_TYPE_UPDATE,
+	LAST_BOX_TYPE_SUBSCRIBE,
+	LAST_BOX_TYPE_UNSUBSCRIBE
 };
 
 enum test_dsync_last_msg_type {
@@ -24,6 +26,14 @@
 	bool last;
 };
 
+struct test_dsync_worker_subs_iter {
+	struct dsync_worker_subs_iter iter;
+	const char *next_name;
+	mailbox_guid_t *next_unsubscription;
+	time_t next_last_change;
+	bool last_subs, last_unsubs;
+};
+
 struct test_dsync_worker_msg {
 	struct dsync_message msg;
 	unsigned int mailbox_idx;
@@ -61,6 +71,7 @@
 	struct istream *body_stream;
 
 	struct test_dsync_worker_mailbox_iter box_iter;
+	struct test_dsync_worker_subs_iter subs_iter;
 	struct test_dsync_worker_msg_iter msg_iter;
 	ARRAY_DEFINE(results, struct test_dsync_worker_result);