Mercurial > dovecot > core-2.2
changeset 10328:b63fd6156663 HEAD
dsync: Added support for subscription syncing.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Fri, 13 Nov 2009 20:55:05 -0500 |
parents | 4267c30ded97 |
children | 32a754b2d79b |
files | src/dsync/dsync-brain-private.h src/dsync/dsync-brain.c src/dsync/dsync-data.c src/dsync/dsync-data.h src/dsync/dsync-proxy-client.c src/dsync/dsync-proxy-server-cmd.c src/dsync/dsync-proxy-server.h src/dsync/dsync-worker-local.c src/dsync/dsync-worker-private.h src/dsync/dsync-worker.c src/dsync/dsync-worker.h src/dsync/test-dsync-brain.c src/dsync/test-dsync-proxy-server-cmd.c src/dsync/test-dsync-worker.c src/dsync/test-dsync-worker.h |
diffstat | 15 files changed, 894 insertions(+), 26 deletions(-) [+] |
line wrap: on
line diff
--- a/src/dsync/dsync-brain-private.h Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-brain-private.h Fri Nov 13 20:55:05 2009 -0500 @@ -6,7 +6,9 @@ enum dsync_state { DSYNC_STATE_GET_MAILBOXES = 0, + DSYNC_STATE_GET_SUBSCRIPTIONS, DSYNC_STATE_SYNC_MAILBOXES, + DSYNC_STATE_SYNC_SUBSCRIPTIONS, DSYNC_STATE_SYNC_MSGS, DSYNC_STATE_SYNC_MSGS_FLUSH, DSYNC_STATE_SYNC_MSGS_FLUSH2, @@ -25,6 +27,24 @@ ARRAY_TYPE(dsync_mailbox) dirs; }; +struct dsync_brain_subscription { + const char *name; + time_t last_change; +}; +struct dsync_brain_unsubscription { + mailbox_guid_t name_sha1; + time_t last_change; +}; + +struct dsync_brain_subs_list { + pool_t pool; + struct dsync_brain *brain; + struct dsync_worker *worker; + struct dsync_worker_subs_iter *iter; + ARRAY_DEFINE(subscriptions, struct dsync_brain_subscription); + ARRAY_DEFINE(unsubscriptions, struct dsync_brain_unsubscription); +}; + struct dsync_brain_guid_instance { struct dsync_brain_guid_instance *next; uint32_t uid; @@ -103,6 +123,9 @@ struct dsync_brain_mailbox_list *src_mailbox_list; struct dsync_brain_mailbox_list *dest_mailbox_list; + struct dsync_brain_subs_list *src_subs_list; + struct dsync_brain_subs_list *dest_subs_list; + struct dsync_brain_mailbox_sync *mailbox_sync; unsigned int failed:1;
--- a/src/dsync/dsync-brain.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-brain.c Fri Nov 13 20:55:05 2009 -0500 @@ -9,6 +9,8 @@ static void dsync_brain_mailbox_list_deinit(struct dsync_brain_mailbox_list **list); +static void +dsync_brain_subs_list_deinit(struct dsync_brain_subs_list **list); struct dsync_brain * dsync_brain_init(struct dsync_worker *src_worker, @@ -49,11 +51,17 @@ if (brain->mailbox_sync != NULL) dsync_brain_msg_sync_deinit(&brain->mailbox_sync); + if (brain->src_mailbox_list != NULL) dsync_brain_mailbox_list_deinit(&brain->src_mailbox_list); if (brain->dest_mailbox_list != NULL) dsync_brain_mailbox_list_deinit(&brain->dest_mailbox_list); + if (brain->src_subs_list != NULL) + dsync_brain_subs_list_deinit(&brain->src_subs_list); + if (brain->dest_subs_list != NULL) + dsync_brain_subs_list_deinit(&brain->dest_subs_list); + *_brain = NULL; i_free(brain->mailbox); i_free(brain); @@ -131,6 +139,96 @@ pool_unref(&list->pool); } +static void dsync_brain_subs_list_finished(struct dsync_brain *brain) +{ + if (brain->src_subs_list->iter != NULL || + brain->dest_subs_list->iter != NULL) + return; + + /* both lists are finished */ + brain->state++; + dsync_brain_sync(brain); +} + +static int +dsync_brain_subscription_cmp(const struct dsync_brain_subscription *s1, + const struct dsync_brain_subscription *s2) +{ + return strcmp(s1->name, s2->name); +} + +static int +dsync_brain_unsubscription_cmp(const struct dsync_brain_unsubscription *u1, + const struct dsync_brain_unsubscription *u2) +{ + return dsync_guid_cmp(&u1->name_sha1, &u2->name_sha1); +} + +static void dsync_worker_subs_input(void *context) +{ + struct dsync_brain_subs_list *list = context; + struct dsync_brain_subscription subs; + struct dsync_brain_unsubscription unsubs; + int ret; + + memset(&subs, 0, sizeof(subs)); + while ((ret = dsync_worker_subs_iter_next(list->iter, &subs.name, + &subs.last_change)) > 0) { + subs.name = p_strdup(list->pool, subs.name); + array_append(&list->subscriptions, &subs, 1); + } + if (ret == 0) + return; + + memset(&unsubs, 0, sizeof(unsubs)); + while ((ret = dsync_worker_subs_iter_next_un(list->iter, + &unsubs.name_sha1, + &unsubs.last_change)) > 0) + array_append(&list->unsubscriptions, &unsubs, 1); + + if (ret < 0) { + /* finished listing subscriptions */ + if (dsync_worker_subs_iter_deinit(&list->iter) < 0) + dsync_brain_fail(list->brain); + array_sort(&list->subscriptions, + dsync_brain_subscription_cmp); + array_sort(&list->unsubscriptions, + dsync_brain_unsubscription_cmp); + dsync_brain_subs_list_finished(list->brain); + } +} + +static struct dsync_brain_subs_list * +dsync_brain_subs_list_init(struct dsync_brain *brain, + struct dsync_worker *worker) +{ + struct dsync_brain_subs_list *list; + pool_t pool; + + pool = pool_alloconly_create("dsync brain subs list", 1024*4); + list = p_new(pool, struct dsync_brain_subs_list, 1); + list->pool = pool; + list->brain = brain; + list->worker = worker; + list->iter = dsync_worker_subs_iter_init(worker); + p_array_init(&list->subscriptions, pool, 128); + p_array_init(&list->unsubscriptions, pool, 64); + dsync_worker_set_input_callback(worker, dsync_worker_subs_input, list); + return list; +} + +static void +dsync_brain_subs_list_deinit(struct dsync_brain_subs_list **_list) +{ + struct dsync_brain_subs_list *list = *_list; + + *_list = NULL; + + if (list->iter != NULL) + (void)dsync_worker_subs_iter_deinit(&list->iter); + pool_unref(&list->pool); +} + static void dsync_brain_sync_mailboxes(struct dsync_brain *brain) { struct dsync_mailbox *const *src_boxes, *const *dest_boxes, new_box; @@ -209,6 +307,77 @@ } } +static bool +dsync_brain_is_unsubscribed(struct dsync_brain_subs_list *list, + const struct dsync_brain_subscription *subs) +{ + const struct dsync_brain_unsubscription *unsubs; + struct dsync_brain_unsubscription lookup; + + /* FIXME: doesn't work with namespace prefixes */ + dsync_str_sha_to_guid(subs->name, &lookup.name_sha1); + unsubs = array_bsearch(&list->unsubscriptions, &lookup, + dsync_brain_unsubscription_cmp); + if (unsubs == NULL) + return FALSE; + else + return unsubs->last_change > subs->last_change; +} + +static void dsync_brain_sync_subscriptions(struct dsync_brain *brain) +{ + const struct dsync_brain_subscription *src_subs, *dest_subs; + unsigned int src, dest, src_count, dest_count; + int ret; + + /* subscriptions are sorted by name. */ + src_subs = array_get(&brain->src_subs_list->subscriptions, &src_count); + dest_subs = array_get(&brain->dest_subs_list->subscriptions, &dest_count); + for (src = dest = 0;; ) { + if (src == src_count) { + if (dest == dest_count) + break; + ret = 1; + } else if (dest == dest_count) { + ret = -1; + } else { + ret = strcmp(src_subs[src].name, dest_subs[dest].name); + if (ret == 0) { + src++; dest++; + continue; + } + } + + if (ret < 0) { + /* subscribed only in source */ + if (dsync_brain_is_unsubscribed(brain->dest_subs_list, + &src_subs[src])) { + dsync_worker_set_subscribed(brain->src_worker, + src_subs[src].name, + FALSE); + } else { + dsync_worker_set_subscribed(brain->dest_worker, + src_subs[src].name, + TRUE); + } + src++; + } else { + /* subscribed only in dest */ + if (dsync_brain_is_unsubscribed(brain->src_subs_list, + &dest_subs[dest])) { + dsync_worker_set_subscribed(brain->dest_worker, + dest_subs[dest].name, + FALSE); + } else { + dsync_worker_set_subscribed(brain->src_worker, + dest_subs[dest].name, + TRUE); + } + dest++; + } + } +} + static bool dsync_mailbox_has_changed_msgs(struct dsync_brain *brain, const struct dsync_mailbox *box1, const struct dsync_mailbox *box2) @@ -380,10 +549,23 @@ dsync_worker_mailbox_input(brain->src_mailbox_list); dsync_worker_mailbox_input(brain->dest_mailbox_list); break; + case DSYNC_STATE_GET_SUBSCRIPTIONS: + i_assert(brain->src_subs_list == NULL); + brain->src_subs_list = + dsync_brain_subs_list_init(brain, brain->src_worker); + brain->dest_subs_list = + dsync_brain_subs_list_init(brain, brain->dest_worker); + dsync_worker_subs_input(brain->src_subs_list); + dsync_worker_subs_input(brain->dest_subs_list); + break; case DSYNC_STATE_SYNC_MAILBOXES: dsync_brain_sync_mailboxes(brain); brain->state++; /* fall through */ + case DSYNC_STATE_SYNC_SUBSCRIPTIONS: + dsync_brain_sync_subscriptions(brain); + brain->state++; + /* fall through */ case DSYNC_STATE_SYNC_MSGS: dsync_brain_sync_msgs(brain); break;
--- a/src/dsync/dsync-data.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-data.c Fri Nov 13 20:55:05 2009 -0500 @@ -3,6 +3,7 @@ #include "lib.h" #include "buffer.h" #include "hex-binary.h" +#include "sha1.h" #include "dsync-data.h" struct dsync_mailbox * @@ -88,6 +89,11 @@ return memcmp(guid1->guid, guid2->guid, sizeof(guid1->guid)) == 0; } +int dsync_guid_cmp(const mailbox_guid_t *guid1, const mailbox_guid_t *guid2) +{ + return memcmp(guid1->guid, guid2->guid, sizeof(guid1->guid)); +} + const char *dsync_guid_to_str(const mailbox_guid_t *guid) { return binary_to_hex(guid->guid, sizeof(guid->guid)); @@ -108,3 +114,11 @@ buffer_append_c(&guid_128_buf, '\0'); return guid_128_buf.data; } + +void dsync_str_sha_to_guid(const char *str, mailbox_guid_t *guid) +{ + unsigned char sha[SHA1_RESULTLEN]; + + sha1_get_digest(str, strlen(str), sha); + memcpy(guid->guid, sha, I_MIN(sizeof(guid->guid), sizeof(sha))); +}
--- a/src/dsync/dsync-data.h Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-data.h Fri Nov 13 20:55:05 2009 -0500 @@ -69,8 +69,10 @@ bool dsync_guid_equals(const mailbox_guid_t *guid1, const mailbox_guid_t *guid2); +int dsync_guid_cmp(const mailbox_guid_t *guid1, const mailbox_guid_t *guid2); const char *dsync_guid_to_str(const mailbox_guid_t *guid); const char *dsync_get_guid_128_str(const char *guid, unsigned char *dest, unsigned int dest_len); +void dsync_str_sha_to_guid(const char *str, mailbox_guid_t *guid); #endif
--- a/src/dsync/dsync-proxy-client.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-proxy-client.c Fri Nov 13 20:55:05 2009 -0500 @@ -40,6 +40,11 @@ pool_t pool; }; +struct proxy_client_dsync_worker_subs_iter { + struct dsync_worker_subs_iter iter; + pool_t pool; +}; + struct proxy_client_dsync_worker { struct dsync_worker worker; int fd_in, fd_out; @@ -397,6 +402,121 @@ return ret; } +static struct dsync_worker_subs_iter * +proxy_client_worker_subs_iter_init(struct dsync_worker *_worker) +{ + struct proxy_client_dsync_worker *worker = + (struct proxy_client_dsync_worker *)_worker; + struct proxy_client_dsync_worker_subs_iter *iter; + + iter = i_new(struct proxy_client_dsync_worker_subs_iter, 1); + iter->iter.worker = _worker; + iter->pool = pool_alloconly_create("proxy subscription iter", 1024); + o_stream_send_str(worker->output, "SUBS-LIST\n"); + proxy_client_worker_output_flush(_worker); + return &iter->iter; +} + +static int +proxy_client_worker_subs_iter_next_line(struct proxy_client_dsync_worker_subs_iter *iter, + const char **name_r, + time_t *last_change_r) +{ + struct proxy_client_dsync_worker *worker = + (struct proxy_client_dsync_worker *)iter->iter.worker; + const char *line; + char **args; + int ret; + + if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) { + if (ret < 0) + iter->iter.failed = TRUE; + return ret; + } + + if (*line == '\t') { + /* end of subscribed subscriptions */ + if (line[1] != '0') + iter->iter.failed = TRUE; + return -1; + } + + p_clear(iter->pool); + args = p_strsplit(iter->pool, line, "\t"); + if (args[0] == NULL || args[1] == NULL) { + i_error("Invalid subscription input from worker server"); + iter->iter.failed = TRUE; + return -1; + } + *name_r = args[0]; + *last_change_r = strtoul(args[1], NULL, 10); + return 1; +} + +static int +proxy_client_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter, + const char **name_r, time_t *last_change_r) +{ + struct proxy_client_dsync_worker_subs_iter *iter = + (struct proxy_client_dsync_worker_subs_iter *)_iter; + + return proxy_client_worker_subs_iter_next_line(iter, name_r, + last_change_r); +} + +static int +proxy_client_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter, + mailbox_guid_t *name_sha1_r, + time_t *last_change_r) +{ + struct proxy_client_dsync_worker_subs_iter *iter = + (struct proxy_client_dsync_worker_subs_iter *)_iter; + const char *name; + int ret; + + ret = proxy_client_worker_subs_iter_next_line(iter, &name, + last_change_r); + if (ret <= 0) + return ret; + + if (dsync_proxy_mailbox_guid_import(name, name_sha1_r) < 0) { + i_error("Invalid subscription input from worker server: " + "Invalid unsubscription mailbox GUID"); + iter->iter.failed = TRUE; + return -1; + } + return 1; +} + +static int +proxy_client_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter) +{ + struct proxy_client_dsync_worker_subs_iter *iter = + (struct proxy_client_dsync_worker_subs_iter *)_iter; + int ret = _iter->failed ? -1 : 0; + + pool_unref(&iter->pool); + i_free(iter); + return ret; +} + +static void +proxy_client_worker_set_subscribed(struct dsync_worker *_worker, + const char *name, bool set) +{ + struct proxy_client_dsync_worker *worker = + (struct proxy_client_dsync_worker *)_worker; + + T_BEGIN { + string_t *str = t_str_new(128); + + str_append(str, "SUBS-SET\t"); + str_tabescape_write(str, name); + str_printfa(str, "\t%d\n", set ? 1 : 0); + o_stream_send(worker->output, str_data(str), str_len(str)); + } T_END; +} + struct proxy_client_dsync_worker_msg_iter { struct dsync_worker_msg_iter iter; pool_t pool; @@ -822,6 +942,12 @@ proxy_client_worker_mailbox_iter_next, proxy_client_worker_mailbox_iter_deinit, + proxy_client_worker_subs_iter_init, + proxy_client_worker_subs_iter_next, + proxy_client_worker_subs_iter_next_un, + proxy_client_worker_subs_iter_deinit, + proxy_client_worker_set_subscribed, + proxy_client_worker_msg_iter_init, proxy_client_worker_msg_iter_next, proxy_client_worker_msg_iter_deinit,
--- a/src/dsync/dsync-proxy-server-cmd.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-proxy-server-cmd.c Fri Nov 13 20:55:05 2009 -0500 @@ -62,6 +62,102 @@ } } +static bool cmd_subs_list_subscriptions(struct dsync_proxy_server *server) +{ + const char *name; + time_t last_change; + string_t *str; + int ret; + + str = t_str_new(256); + while ((ret = dsync_worker_subs_iter_next(server->subs_iter, + &name, &last_change)) > 0) { + str_truncate(str, 0); + str_tabescape_write(str, name); + str_printfa(str, "\t%ld\n", (long)last_change); + o_stream_send(server->output, str_data(str), str_len(str)); + if (proxy_server_is_output_full(server)) + break; + } + if (ret >= 0) { + /* continue later */ + o_stream_set_flush_pending(server->output, TRUE); + return FALSE; + } + return TRUE; +} + +static bool cmd_subs_list_unsubscriptions(struct dsync_proxy_server *server) +{ + mailbox_guid_t name_sha1; + time_t last_change; + string_t *str; + int ret; + + str = t_str_new(256); + while ((ret = dsync_worker_subs_iter_next_un(server->subs_iter, + &name_sha1, + &last_change)) > 0) { + str_truncate(str, 0); + dsync_proxy_mailbox_guid_export(str, &name_sha1); + str_printfa(str, "\t%ld\n", (long)last_change); + o_stream_send(server->output, str_data(str), str_len(str)); + if (proxy_server_is_output_full(server)) + break; + } + if (ret >= 0) { + /* continue later */ + o_stream_set_flush_pending(server->output, TRUE); + return FALSE; + } + return TRUE; +} + +static int +cmd_subs_list(struct dsync_proxy_server *server, + const char *const *args ATTR_UNUSED) +{ + int ret = 1; + + if (server->subs_iter == NULL) { + server->subs_iter = + dsync_worker_subs_iter_init(server->worker); + } + + if (!server->subs_sending_unsubscriptions) { + if (!cmd_subs_list_subscriptions(server)) + return 0; + o_stream_send(server->output, "\t0\n", 3); + server->subs_sending_unsubscriptions = TRUE; + } + if (ret > 0) { + if (!cmd_subs_list_unsubscriptions(server)) + return 0; + } + + server->subs_sending_unsubscriptions = FALSE; + if (dsync_worker_subs_iter_deinit(&server->subs_iter) < 0) { + o_stream_send(server->output, "\t-1\n", 4); + return -1; + } else { + o_stream_send(server->output, "\t0\n", 3); + return 1; + } +} + +static int +cmd_subs_set(struct dsync_proxy_server *server, const char *const *args) +{ + if (args[0] == NULL || args[1] == NULL) { + i_error("subs-set: Missing parameters"); + return -1; + } + + dsync_worker_set_subscribed(server->worker, args[0], + strcmp(args[1], "1") == 0); + return 1; +} + static int cmd_msg_list_init(struct dsync_proxy_server *server, const char *const *args) { @@ -412,6 +508,8 @@ static struct dsync_proxy_server_command commands[] = { { "BOX-LIST", cmd_box_list }, + { "SUBS-LIST", cmd_subs_list }, + { "SUBS-SET", cmd_subs_set }, { "MSG-LIST", cmd_msg_list }, { "BOX-CREATE", cmd_box_create }, { "BOX-DELETE", cmd_box_delete },
--- a/src/dsync/dsync-proxy-server.h Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-proxy-server.h Fri Nov 13 20:55:05 2009 -0500 @@ -22,12 +22,14 @@ const char *const *cur_args; struct dsync_worker_mailbox_iter *mailbox_iter; + struct dsync_worker_subs_iter *subs_iter; struct dsync_worker_msg_iter *msg_iter; struct istream *get_input; bool get_input_last_lf; uint32_t get_uid; + unsigned int subs_sending_unsubscriptions:1; unsigned int finished:1; };
--- a/src/dsync/dsync-worker-local.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-worker-local.c Fri Nov 13 20:55:05 2009 -0500 @@ -20,6 +20,12 @@ struct hash_iterate_context *deleted_iter; }; +struct local_dsync_worker_subs_iter { + struct dsync_worker_subs_iter iter; + struct mailbox_list_iterate_context *list_iter; + struct hash_iterate_context *deleted_iter; +}; + struct local_dsync_worker_msg_iter { struct dsync_worker_msg_iter iter; mailbox_guid_t *mailboxes; @@ -45,6 +51,11 @@ time_t last_renamed; unsigned int deleted_mailbox:1; unsigned int deleted_dir:1; +}; +struct local_dsync_subscription_change { + mailbox_guid_t name_sha1; + struct mailbox_list *list; + time_t last_change; unsigned int unsubscribed:1; }; @@ -57,6 +68,8 @@ struct hash_table *mailbox_hash; /* mailbox_guid_t -> struct local_dsync_mailbox_change* */ struct hash_table *mailbox_changes_hash; + /* mailbox_guid_t -> struct local_dsync_subscription_change */ + struct hash_table *subscription_changes_hash; mailbox_guid_t selected_box_guid; struct mailbox *selected_box; @@ -131,6 +144,8 @@ hash_table_destroy(&worker->mailbox_hash); if (worker->mailbox_changes_hash != NULL) hash_table_destroy(&worker->mailbox_changes_hash); + if (worker->subscription_changes_hash != NULL) + hash_table_destroy(&worker->subscription_changes_hash); pool_unref(&worker->pool); } @@ -144,6 +159,79 @@ return 1; } +static void +dsync_worker_save_mailbox_change(struct local_dsync_worker *worker, + const struct mailbox_log_record *rec) +{ + struct local_dsync_mailbox_change *change; + + change = hash_table_lookup(worker->mailbox_changes_hash, + rec->mailbox_guid); + if (change == NULL) { + change = i_new(struct local_dsync_mailbox_change, 1); + memcpy(change->guid.guid, rec->mailbox_guid, + sizeof(change->guid.guid)); + hash_table_insert(worker->mailbox_changes_hash, + change->guid.guid, change); + } + switch (rec->type) { + case MAILBOX_LOG_RECORD_DELETE_MAILBOX: + change->deleted_mailbox = TRUE; + break; + case MAILBOX_LOG_RECORD_DELETE_DIR: + change->deleted_dir = TRUE; + break; + case MAILBOX_LOG_RECORD_RENAME: + change->last_renamed = + mailbox_log_record_get_timestamp(rec); + break; + case MAILBOX_LOG_RECORD_SUBSCRIBE: + case MAILBOX_LOG_RECORD_UNSUBSCRIBE: + i_unreached(); + } + if (change->deleted_dir && change->deleted_mailbox) { + /* same GUID shouldn't be both. something's already + broken, but change this so we don't get into more + problems later. */ + change->deleted_dir = FALSE; + } +} + +static void +dsync_worker_save_subscription_change(struct local_dsync_worker *worker, + struct mailbox_list *list, + const struct mailbox_log_record *rec) +{ + struct local_dsync_subscription_change *change, new_change; + + memset(&new_change, 0, sizeof(new_change)); + new_change.list = list; + memcpy(new_change.name_sha1.guid, rec->mailbox_guid, + sizeof(new_change.name_sha1.guid)); + + change = hash_table_lookup(worker->subscription_changes_hash, + &new_change); + if (change == NULL) { + change = i_new(struct local_dsync_subscription_change, 1); + *change = new_change; + hash_table_insert(worker->subscription_changes_hash, + change, change); + } + switch (rec->type) { + case MAILBOX_LOG_RECORD_DELETE_MAILBOX: + case MAILBOX_LOG_RECORD_DELETE_DIR: + case MAILBOX_LOG_RECORD_RENAME: + i_unreached(); + case MAILBOX_LOG_RECORD_SUBSCRIBE: + change->unsubscribed = FALSE; + break; + case MAILBOX_LOG_RECORD_UNSUBSCRIBE: + change->unsubscribed = TRUE; + break; + } + change->last_change = mailbox_log_record_get_timestamp(rec); +} + static int dsync_worker_get_list_mailbox_log(struct local_dsync_worker *worker, struct mailbox_list *list) @@ -151,44 +239,22 @@ struct mailbox_log *log; struct mailbox_log_iter *iter; const struct mailbox_log_record *rec; - struct local_dsync_mailbox_change *change; log = mailbox_list_get_changelog(list); iter = mailbox_log_iter_init(log); while ((rec = mailbox_log_iter_next(iter)) != NULL) { - change = hash_table_lookup(worker->mailbox_changes_hash, - rec->mailbox_guid); - if (change == NULL) { - change = i_new(struct local_dsync_mailbox_change, 1); - memcpy(change->guid.guid, rec->mailbox_guid, - sizeof(change->guid.guid)); - hash_table_insert(worker->mailbox_changes_hash, - change->guid.guid, change); - } switch (rec->type) { case MAILBOX_LOG_RECORD_DELETE_MAILBOX: - change->deleted_mailbox = TRUE; - break; case MAILBOX_LOG_RECORD_DELETE_DIR: - change->deleted_dir = TRUE; - break; case MAILBOX_LOG_RECORD_RENAME: - change->last_renamed = - mailbox_log_record_get_timestamp(rec); + dsync_worker_save_mailbox_change(worker, rec); break; case MAILBOX_LOG_RECORD_SUBSCRIBE: - change->unsubscribed = FALSE; - break; case MAILBOX_LOG_RECORD_UNSUBSCRIBE: - change->unsubscribed = TRUE; + dsync_worker_save_subscription_change(worker, + list, rec); break; } - if (change->deleted_dir && change->deleted_mailbox) { - /* same GUID shouldn't be both. something's already - broken, but change this so we don't get into more - problems later. */ - change->deleted_dir = FALSE; - } } return mailbox_log_iter_deinit(&iter); } @@ -208,6 +274,25 @@ return memcmp(p1, p2, MAIL_GUID_128_SIZE); } +static unsigned int subscription_change_hash(const void *p) +{ + const struct local_dsync_subscription_change *change = p; + + return mailbox_log_record_hash(change->name_sha1.guid) ^ + (unsigned int)change->list; +} + +static int subscription_change_cmp(const void *p1, const void *p2) +{ + const struct local_dsync_subscription_change *c1 = p1, *c2 = p2; + + if (c1->list != c2->list) + return 1; + + return memcmp(c1->name_sha1.guid, c2->name_sha1.guid, + MAIL_GUID_128_SIZE); +} + static int dsync_worker_get_mailbox_log(struct local_dsync_worker *worker) { struct mail_namespace *ns; @@ -220,6 +305,10 @@ hash_table_create(default_pool, worker->pool, 0, mailbox_log_record_hash, mailbox_log_record_cmp); + worker->subscription_changes_hash = + hash_table_create(default_pool, worker->pool, 0, + subscription_change_hash, + subscription_change_cmp); for (ns = worker->user->namespaces; ns != NULL; ns = ns->next) { if (ns->alias_for != NULL) continue; @@ -395,6 +484,125 @@ return ret; } +static struct dsync_worker_subs_iter * +local_worker_subs_iter_init(struct dsync_worker *_worker) +{ + struct local_dsync_worker *worker = + (struct local_dsync_worker *)_worker; + struct local_dsync_worker_subs_iter *iter; + enum mailbox_list_iter_flags list_flags = + MAILBOX_LIST_ITER_VIRTUAL_NAMES | + MAILBOX_LIST_ITER_SELECT_SUBSCRIBED; + static const char *patterns[] = { "*", NULL }; + + iter = i_new(struct local_dsync_worker_subs_iter, 1); + iter->iter.worker = _worker; + iter->list_iter = + mailbox_list_iter_init_namespaces(worker->user->namespaces, + patterns, list_flags); + (void)dsync_worker_get_mailbox_log(worker); + return &iter->iter; +} + +static int +local_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter, + const char **name_r, time_t *last_change_r) +{ + struct local_dsync_worker_subs_iter *iter = + (struct local_dsync_worker_subs_iter *)_iter; + struct local_dsync_worker *worker = + (struct local_dsync_worker *)_iter->worker; + struct local_dsync_subscription_change *change, change_lookup; + const struct mailbox_info *info; + const char *storage_name; + + info = mailbox_list_iter_next(iter->list_iter); + if (info == NULL) + return -1; + + storage_name = mail_namespace_get_storage_name(info->ns, info->name); + dsync_str_sha_to_guid(storage_name, &change_lookup.name_sha1); + change_lookup.list = info->ns->list; + + change = hash_table_lookup(worker->subscription_changes_hash, + &change_lookup); + if (change != NULL) { + /* it shouldn't be marked as unsubscribed, but drop it to + be sure */ + change->unsubscribed = FALSE; + *last_change_r = change->last_change; + } else { + *last_change_r = 0; + } + *name_r = info->name; + return 1; +} + +static int +local_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter, + mailbox_guid_t *name_sha1_r, + time_t *last_change_r) +{ + struct local_dsync_worker_subs_iter *iter = + (struct local_dsync_worker_subs_iter *)_iter; + struct local_dsync_worker *worker = + (struct local_dsync_worker *)_iter->worker; + void *key, *value; + + if (iter->deleted_iter == NULL) { + iter->deleted_iter = + hash_table_iterate_init(worker->subscription_changes_hash); + } + while (hash_table_iterate(iter->deleted_iter, &key, &value)) { + const struct local_dsync_subscription_change *change = value; + + if (change->unsubscribed) { + /* the name doesn't matter */ + *name_sha1_r = change->name_sha1; + *last_change_r = change->last_change; + return 1; + } + } + hash_table_iterate_deinit(&iter->deleted_iter); + return -1; +} + +static int +local_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter) +{ + struct local_dsync_worker_subs_iter *iter = + (struct local_dsync_worker_subs_iter *)_iter; + int ret = _iter->failed ? -1 : 0; + + if (mailbox_list_iter_deinit(&iter->list_iter) < 0) + ret = -1; + i_free(iter); + return ret; +} + +static void +local_worker_set_subscribed(struct dsync_worker *_worker, + const char *name, bool set) +{ + struct local_dsync_worker *worker = + (struct local_dsync_worker *)_worker; + struct mail_namespace *ns; + const char *storage_name; + + storage_name = name; + ns = mail_namespace_find(worker->user->namespaces, &storage_name); + if (ns == NULL) { + i_error("Can't find namespace for mailbox %s", name); + return; + } + + if (mailbox_list_set_subscribed(ns->list, storage_name, set) < 0) { + dsync_worker_set_failure(_worker); + i_error("Can't update subscription %s: %s", name, + mailbox_list_get_last_error(ns->list, NULL)); + } +} + static int local_mailbox_open(struct local_dsync_worker *worker, const mailbox_guid_t *guid, struct mailbox **box_r) @@ -1151,6 +1359,12 @@ local_worker_mailbox_iter_next, local_worker_mailbox_iter_deinit, + local_worker_subs_iter_init, + local_worker_subs_iter_next, + local_worker_subs_iter_next_un, + local_worker_subs_iter_deinit, + local_worker_set_subscribed, + local_worker_msg_iter_init, local_worker_msg_iter_next, local_worker_msg_iter_deinit,
--- a/src/dsync/dsync-worker-private.h Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-worker-private.h Fri Nov 13 20:55:05 2009 -0500 @@ -17,6 +17,17 @@ struct dsync_mailbox *dsync_box_r); int (*mailbox_iter_deinit)(struct dsync_worker_mailbox_iter *iter); + struct dsync_worker_subs_iter * + (*subs_iter_init)(struct dsync_worker *worker); + int (*subs_iter_next)(struct dsync_worker_subs_iter *iter, + const char **name_r, time_t *last_change_r); + int (*subs_iter_next_un)(struct dsync_worker_subs_iter *iter, + mailbox_guid_t *name_sha1_r, + time_t *last_change_r); + int (*subs_iter_deinit)(struct dsync_worker_subs_iter *iter); + void (*set_subscribed)(struct dsync_worker *worker, + const char *name, bool set); + struct dsync_worker_msg_iter * (*msg_iter_init)(struct dsync_worker *worker, const mailbox_guid_t mailboxes[], @@ -72,6 +83,11 @@ bool failed; }; +struct dsync_worker_subs_iter { + struct dsync_worker *worker; + bool failed; +}; + struct dsync_worker_msg_iter { struct dsync_worker *worker; bool failed;
--- a/src/dsync/dsync-worker.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-worker.c Fri Nov 13 20:55:05 2009 -0500 @@ -61,6 +61,40 @@ return iter->worker->v.mailbox_iter_deinit(iter); } +struct dsync_worker_subs_iter * +dsync_worker_subs_iter_init(struct dsync_worker *worker) +{ + return worker->v.subs_iter_init(worker); +} + +int dsync_worker_subs_iter_next(struct dsync_worker_subs_iter *iter, + const char **name_r, time_t *last_change_r) +{ + return iter->worker->v.subs_iter_next(iter, name_r, last_change_r); +} + +int dsync_worker_subs_iter_next_un(struct dsync_worker_subs_iter *iter, + mailbox_guid_t *sha1_name_r, + time_t *last_change_r) +{ + return iter->worker->v.subs_iter_next_un(iter, sha1_name_r, + last_change_r); +} + +int dsync_worker_subs_iter_deinit(struct dsync_worker_subs_iter **_iter) +{ + struct dsync_worker_subs_iter *iter = *_iter; + + *_iter = NULL; + return iter->worker->v.subs_iter_deinit(iter); +} + +void dsync_worker_set_subscribed(struct dsync_worker *worker, + const char *name, bool set) +{ + worker->v.set_subscribed(worker, name, set); +} + struct dsync_worker_msg_iter * dsync_worker_msg_iter_init(struct dsync_worker *worker, const mailbox_guid_t mailboxes[],
--- a/src/dsync/dsync-worker.h Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/dsync-worker.h Fri Nov 13 20:55:05 2009 -0500 @@ -48,6 +48,23 @@ /* Finish mailbox iteration. Returns 0 if ok, -1 if iteration failed. */ int dsync_worker_mailbox_iter_deinit(struct dsync_worker_mailbox_iter **iter); +/* Iterate though all subscriptions */ +struct dsync_worker_subs_iter * +dsync_worker_subs_iter_init(struct dsync_worker *worker); +/* Get the next subscription. Returns 1 if ok, 0 if waiting for more data, + -1 if there are no more subscriptions. */ +int dsync_worker_subs_iter_next(struct dsync_worker_subs_iter *iter, + const char **name_r, time_t *last_change_r); +/* Like _iter_next(), but list known recent unsubscriptions. */ +int dsync_worker_subs_iter_next_un(struct dsync_worker_subs_iter *iter, + mailbox_guid_t *name_sha1_r, + time_t *last_change_r); +/* Finish subscription iteration. Returns 0 if ok, -1 if iteration failed. */ +int dsync_worker_subs_iter_deinit(struct dsync_worker_subs_iter **iter); +/* Subscribe/unsubscribe mailbox */ +void dsync_worker_set_subscribed(struct dsync_worker *worker, + const char *name, bool set); + /* Iterate through all messages in given mailboxes. The mailboxes are iterated in the given order. */ struct dsync_worker_msg_iter *
--- a/src/dsync/test-dsync-brain.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/test-dsync-brain.c Fri Nov 13 20:55:05 2009 -0500 @@ -67,6 +67,13 @@ test_worker->worker.input_callback(test_worker->worker.input_context); } +static void subscriptions_send_to_worker(struct test_dsync_worker *test_worker) +{ + test_worker->subs_iter.last_subs = TRUE; + test_worker->subs_iter.last_unsubs = TRUE; + test_worker->worker.input_callback(test_worker->worker.input_context); +} + static bool test_dsync_mailbox_create_equals(const struct dsync_mailbox *cbox, const struct dsync_mailbox *obox) @@ -160,6 +167,11 @@ mailboxes_send_to_worker(src_test_worker, src_boxes); mailboxes_send_to_worker(dest_test_worker, dest_boxes); + subscriptions_send_to_worker(src_test_worker); + subscriptions_send_to_worker(dest_test_worker); + + test_assert(brain->state == DSYNC_STATE_SYNC_MSGS); + /* check that it created/deleted missing mailboxes */ test_assert(test_dsync_worker_next_box_event(dest_test_worker, &box_event)); test_assert(box_event.type == LAST_BOX_TYPE_CREATE); @@ -245,6 +257,11 @@ mailboxes_send_to_worker(src_test_worker, boxes); mailboxes_send_to_worker(dest_test_worker, boxes); + subscriptions_send_to_worker(src_test_worker); + subscriptions_send_to_worker(dest_test_worker); + + test_assert(brain->state == DSYNC_STATE_SYNC_MSGS); + test_assert(!test_dsync_worker_next_box_event(src_test_worker, &box_event)); test_assert(!test_dsync_worker_next_box_event(dest_test_worker, &box_event));
--- a/src/dsync/test-dsync-proxy-server-cmd.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/test-dsync-proxy-server-cmd.c Fri Nov 13 20:55:05 2009 -0500 @@ -112,6 +112,44 @@ test_end(); } +static void test_dsync_proxy_subs_list(void) +{ + const char *name; + mailbox_guid_t name_sha1; + + test_begin("proxy server subs list"); + + test_assert(run_cmd("SUBS-LIST", NULL) == 0); + + /* subscription */ + name = "\t\001\r\nname\t\001\n\r"; + test_worker->subs_iter.next_name = name; + test_worker->subs_iter.next_last_change = 1234567890; + test_assert(run_more() == 0); + test_assert(strcmp(str_c(out), t_strconcat( + str_tabescape(name), "\t1234567890\n", NULL)) == 0); + out_clear(); + + test_worker->subs_iter.last_subs = TRUE; + test_assert(run_more() == 0); + test_assert(strcmp(str_c(out), "\t0\n") == 0); + out_clear(); + + /* unsubscription */ + memcpy(name_sha1.guid, test_mailbox_guid1, sizeof(name_sha1.guid)); + test_worker->subs_iter.next_unsubscription = &name_sha1; + test_assert(run_more() == 0); + test_assert(strcmp(str_c(out), TEST_MAILBOX_GUID1"\t1234567890\n") == 0); + out_clear(); + + test_worker->subs_iter.last_unsubs = TRUE; + test_assert(run_more() == 1); + test_assert(strcmp(str_c(out), "\t0\n") == 0); + out_clear(); + + test_end(); +} + static void test_dsync_proxy_msg_list(void) { static const char *test_keywords[] = { @@ -399,6 +437,7 @@ { static void (*test_functions[])(void) = { test_dsync_proxy_box_list, + test_dsync_proxy_subs_list, test_dsync_proxy_msg_list, test_dsync_proxy_box_create, test_dsync_proxy_box_delete,
--- a/src/dsync/test-dsync-worker.c Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/test-dsync-worker.c Fri Nov 13 20:55:05 2009 -0500 @@ -80,6 +80,60 @@ return 0; } +static struct dsync_worker_subs_iter * +test_worker_subs_iter_init(struct dsync_worker *_worker) +{ + struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; + + i_assert(worker->subs_iter.iter.worker == NULL); + + worker->subs_iter.iter.worker = _worker; + return &worker->subs_iter.iter; +} + +static int +test_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter, + const char **name_r, time_t *last_change_r) +{ + struct test_dsync_worker_subs_iter *iter = + (struct test_dsync_worker_subs_iter *)_iter; + + if (iter->next_name == NULL) + return iter->last_subs ? -1 : 0; + + *name_r = iter->next_name; + *last_change_r = iter->next_last_change; + iter->next_name = NULL; + return 1; +} + +static int +test_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter, + mailbox_guid_t *name_sha1_r, + time_t *last_change_r) +{ + struct test_dsync_worker_subs_iter *iter = + (struct test_dsync_worker_subs_iter *)_iter; + + if (iter->next_unsubscription == NULL) + return iter->last_unsubs ? -1 : 0; + + *name_sha1_r = *iter->next_unsubscription; + *last_change_r = iter->next_last_change; + iter->next_unsubscription = NULL; + return 1; +} + +static int +test_worker_subs_iter_deinit(struct dsync_worker_subs_iter *iter) +{ + struct test_dsync_worker *worker = + (struct test_dsync_worker *)iter->worker; + + memset(&worker->subs_iter, 0, sizeof(worker->subs_iter)); + return 0; +} + static struct dsync_worker_msg_iter * test_worker_msg_iter_init(struct dsync_worker *_worker, const mailbox_guid_t mailboxes[], @@ -160,6 +214,19 @@ } static void +test_worker_set_subscribed(struct dsync_worker *_worker, + const char *name, bool set) +{ + struct dsync_mailbox dsync_box; + + memset(&dsync_box, 0, sizeof(dsync_box)); + dsync_box.name = name; + test_worker_set_last_box(_worker, &dsync_box, + set ? LAST_BOX_TYPE_SUBSCRIBE : + LAST_BOX_TYPE_UNSUBSCRIBE); +} + +static void test_worker_create_mailbox(struct dsync_worker *_worker, const struct dsync_mailbox *dsync_box) { @@ -368,6 +435,12 @@ test_worker_mailbox_iter_next, test_worker_mailbox_iter_deinit, + test_worker_subs_iter_init, + test_worker_subs_iter_next, + test_worker_subs_iter_next_un, + test_worker_subs_iter_deinit, + test_worker_set_subscribed, + test_worker_msg_iter_init, test_worker_msg_iter_next, test_worker_msg_iter_deinit,
--- a/src/dsync/test-dsync-worker.h Fri Nov 13 20:54:50 2009 -0500 +++ b/src/dsync/test-dsync-worker.h Fri Nov 13 20:55:05 2009 -0500 @@ -7,7 +7,9 @@ LAST_BOX_TYPE_CREATE, LAST_BOX_TYPE_DELETE, LAST_BOX_TYPE_RENAME, - LAST_BOX_TYPE_UPDATE + LAST_BOX_TYPE_UPDATE, + LAST_BOX_TYPE_SUBSCRIBE, + LAST_BOX_TYPE_UNSUBSCRIBE }; enum test_dsync_last_msg_type { @@ -24,6 +26,14 @@ bool last; }; +struct test_dsync_worker_subs_iter { + struct dsync_worker_subs_iter iter; + const char *next_name; + mailbox_guid_t *next_unsubscription; + time_t next_last_change; + bool last_subs, last_unsubs; +}; + struct test_dsync_worker_msg { struct dsync_message msg; unsigned int mailbox_idx; @@ -61,6 +71,7 @@ struct istream *body_stream; struct test_dsync_worker_mailbox_iter box_iter; + struct test_dsync_worker_subs_iter subs_iter; struct test_dsync_worker_msg_iter msg_iter; ARRAY_DEFINE(results, struct test_dsync_worker_result);