Mercurial > dovecot > core-2.2
changeset 9686:4d5cc6ce68aa HEAD
dsync: Lots of improvements and fixes. Appears to be somewhat working now.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Mon, 27 Jul 2009 19:04:36 -0400 |
parents | be433e394f69 |
children | ce253d167757 |
files | src/dsync/Makefile.am src/dsync/dsync-brain-msgs-new.c src/dsync/dsync-brain-msgs.c src/dsync/dsync-brain-private.h src/dsync/dsync-brain.c src/dsync/dsync-brain.h src/dsync/dsync-data.c src/dsync/dsync-data.h src/dsync/dsync-proxy-client.c src/dsync/dsync-proxy-server-cmd.c src/dsync/dsync-proxy-server.c src/dsync/dsync-proxy-server.h src/dsync/dsync-proxy.c src/dsync/dsync-proxy.h src/dsync/dsync-worker-local.c src/dsync/dsync-worker-private.h src/dsync/dsync-worker.c src/dsync/dsync-worker.h src/dsync/dsync.c src/dsync/test-dsync-brain-msgs.c src/dsync/test-dsync-brain.c src/dsync/test-dsync-common.c src/dsync/test-dsync-common.h src/dsync/test-dsync-proxy-server-cmd.c src/dsync/test-dsync-worker.c src/dsync/test-dsync-worker.h |
diffstat | 26 files changed, 2180 insertions(+), 1379 deletions(-) [+] |
line wrap: on
line diff
--- a/src/dsync/Makefile.am Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/Makefile.am Mon Jul 27 19:04:36 2009 -0400 @@ -16,6 +16,8 @@ dsync_SOURCES = \ dsync.c \ dsync-brain.c \ + dsync-brain-msgs.c \ + dsync-brain-msgs-new.c \ dsync-data.c \ dsync-proxy.c \ dsync-proxy-client.c \ @@ -37,6 +39,7 @@ test_programs = \ test-dsync-brain \ + test-dsync-brain-msgs \ test-dsync-proxy \ test-dsync-proxy-server-cmd @@ -53,6 +56,10 @@ test_dsync_brain_LDADD = test-dsync-worker.o dsync-data.o dsync-brain.o dsync-worker.o $(test_libs) test_dsync_brain_DEPENDENCIES = test-dsync-worker.o dsync-data.o dsync-brain.o dsync-worker.o $(test_libs) +test_dsync_brain_msgs_SOURCES = test-dsync-brain-msgs.c +test_dsync_brain_msgs_LDADD = test-dsync-worker.o dsync-data.o dsync-brain-msgs.o dsync-worker.o $(test_libs) +test_dsync_brain_msgs_DEPENDENCIES = test-dsync-worker.o dsync-data.o dsync-brain-msgs.o dsync-worker.o $(test_libs) + test_dsync_proxy_SOURCES = test-dsync-proxy.c test_dsync_proxy_LDADD = dsync-proxy.o $(test_libs) test_dsync_proxy_DEPENDENCIES = dsync-proxy.o $(test_libs)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/dsync/dsync-brain-msgs-new.c Mon Jul 27 19:04:36 2009 -0400 @@ -0,0 +1,265 @@ +/* Copyright (c) 2009 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "hash.h" +#include "dsync-worker.h" +#include "dsync-brain-private.h" + +struct dsync_brain_msg_copy_context { + struct dsync_brain_msg_iter *iter; + unsigned int msg_idx; +}; + +struct dsync_brain_msg_save_context { + struct dsync_brain_msg_iter *iter; + + mailbox_guid_t mailbox; + const struct dsync_message *msg; +}; + +static void +dsync_brain_msg_sync_retry_copies(struct dsync_brain_mailbox_sync *sync); + +static bool +dsync_brain_msg_sync_is_save_done(struct dsync_brain_mailbox_sync *sync) +{ + return sync->src_msg_iter->copy_results_left == 0 && + sync->dest_msg_iter->copy_results_left == 0 && + sync->src_msg_iter->save_results_left == 0 && + sync->dest_msg_iter->save_results_left == 0; +} + +static void msg_get_callback(enum dsync_msg_get_result result, + struct dsync_msg_static_data *data, + void *context) +{ + struct dsync_brain_msg_save_context *ctx = context; + + switch (result) { + case DSYNC_MSG_GET_RESULT_SUCCESS: + dsync_worker_select_mailbox(ctx->iter->worker, &ctx->mailbox); + dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data); + break; + case DSYNC_MSG_GET_RESULT_EXPUNGED: + /* mail got expunged during sync. just skip this. */ + break; + case DSYNC_MSG_GET_RESULT_FAILED: + dsync_brain_fail(ctx->iter->sync->brain); + break; + } + ctx->iter->save_results_left--; +} + +static void dsync_brain_copy_callback(bool success, void *context) +{ + struct dsync_brain_msg_copy_context *ctx = context; + const struct dsync_brain_new_msg *msg; + struct dsync_brain_guid_instance *inst; + + ctx->iter->copy_results_left--; + if (!success) { + /* mark the guid instance invalid and try again later */ + msg = array_idx(&ctx->iter->new_msgs, ctx->msg_idx); + inst = hash_table_lookup(ctx->iter->guid_hash, msg->msg->guid); + inst->failed = TRUE; + array_append(&ctx->iter->copy_retry_indexes, &ctx->msg_idx, 1); + } + + if (dsync_brain_msg_sync_is_save_done(ctx->iter->sync)) { + ctx->iter->sync->brain->state++; + dsync_brain_sync(ctx->iter->sync->brain); + } +} + +static int +dsync_brain_msg_sync_add_new_msg(struct dsync_brain_msg_iter *dest_iter, + const mailbox_guid_t *src_mailbox, + unsigned int msg_idx, + const struct dsync_message *msg) +{ + struct dsync_brain_msg_save_context *save_ctx; + struct dsync_brain_msg_copy_context *copy_ctx; + struct dsync_brain_msg_iter *src_iter; + const struct dsync_brain_guid_instance *inst; + const struct dsync_brain_mailbox *inst_box; + + inst = hash_table_lookup(dest_iter->guid_hash, msg->guid); + if (inst != NULL) { + /* we can save this by copying an existing message */ + dsync_worker_select_mailbox(dest_iter->worker, src_mailbox); + inst_box = array_idx(&dest_iter->sync->mailboxes, + inst->mailbox_idx); + + copy_ctx = p_new(dest_iter->sync->pool, + struct dsync_brain_msg_copy_context, 1); + copy_ctx->iter = dest_iter; + copy_ctx->msg_idx = msg_idx; + + dsync_worker_msg_copy(dest_iter->worker, &inst_box->box.guid, + inst->uid, msg, dsync_brain_copy_callback, + copy_ctx); + dest_iter->copy_results_left++; + } else { + src_iter = dest_iter == dest_iter->sync->dest_msg_iter ? + dest_iter->sync->src_msg_iter : + dest_iter->sync->dest_msg_iter; + + save_ctx = p_new(src_iter->sync->pool, + struct dsync_brain_msg_save_context, 1); + save_ctx->iter = dest_iter; + save_ctx->mailbox = *src_mailbox; + save_ctx->msg = dsync_message_dup(src_iter->sync->pool, msg); + + dsync_worker_select_mailbox(src_iter->worker, src_mailbox); + dsync_worker_msg_get(src_iter->worker, msg->uid, + msg_get_callback, save_ctx); + dest_iter->save_results_left++; + } + return dsync_worker_is_output_full(dest_iter->worker) ? 0 : 1; +} + +static void +dsync_brain_msg_iter_add_new_msgs(struct dsync_brain_msg_iter *dest_iter) +{ + const struct dsync_brain_mailbox *mailboxes, *mailbox; + const struct dsync_brain_new_msg *msgs; + unsigned int i, mailbox_count, msg_count; + + mailboxes = array_get(&dest_iter->sync->mailboxes, &mailbox_count); + msgs = array_get(&dest_iter->new_msgs, &msg_count); + for (i = dest_iter->next_new_msg; i < msg_count; i++) { + mailbox = &mailboxes[msgs[i].mailbox_idx]; + if (dsync_brain_msg_sync_add_new_msg(dest_iter, + &mailbox->box.guid, i, + msgs[i].msg) <= 0) { + /* failed / continue later */ + dest_iter->next_new_msg = i + 1; + break; + } + } + dest_iter->msgs_sent = TRUE; +} + +static void +dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter) +{ + dsync_brain_msg_iter_add_new_msgs(iter); + + if (iter->sync->dest_msg_iter->msgs_sent && + iter->sync->src_msg_iter->msgs_sent && + dsync_brain_msg_sync_is_save_done(iter->sync)) + dsync_brain_msg_sync_retry_copies(iter->sync); +} + +static void dsync_worker_new_msg_output(void *context) +{ + struct dsync_brain_msg_iter *iter = context; + + dsync_brain_msg_sync_add_new_msgs(iter); +} + +static void +dsync_brain_msg_iter_sync_new_msgs(struct dsync_brain_msg_iter *iter) +{ + dsync_worker_set_input_callback(iter->worker, NULL, iter); + dsync_worker_set_output_callback(iter->worker, + dsync_worker_new_msg_output, iter); + dsync_brain_msg_sync_add_new_msgs(iter); +} + +void dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync) +{ + dsync_brain_msg_iter_sync_new_msgs(sync->src_msg_iter); + dsync_brain_msg_iter_sync_new_msgs(sync->dest_msg_iter); +} + +static void +dsync_brain_msg_iter_sync_retry_copies(struct dsync_brain_msg_iter *iter) +{ + const uint32_t *indexes; + const struct dsync_brain_mailbox *mailboxes, *mailbox; + const struct dsync_brain_new_msg *msgs; + unsigned int i, msg_idx, idx_count, msg_count, mailbox_count; + struct dsync_brain_guid_instance *inst; + const char *guid_str; + void *orig_key, *orig_value; + + /* first remove GUID instances that had failed. */ + msgs = array_get(&iter->new_msgs, &msg_count); + indexes = array_get(&iter->copy_retry_indexes, &idx_count); + for (i = 0; i < idx_count; i++) { + guid_str = msgs[indexes[i]].msg->guid; + if (hash_table_lookup_full(iter->guid_hash, guid_str, + &orig_key, &orig_value)) + inst = orig_value; + else + inst = NULL; + if (inst != NULL && inst->failed) { + inst = inst->next; + if (inst == NULL) + hash_table_remove(iter->guid_hash, guid_str); + else { + hash_table_update(iter->guid_hash, orig_key, + inst); + } + } + } + + /* try saving again. there probably weren't many of them, so don't + worry about filling output buffer. */ + mailboxes = array_get(&iter->sync->mailboxes, &mailbox_count); + for (i = 0; i < idx_count; i++) { + msg_idx = indexes[i]; + mailbox = &mailboxes[msgs[msg_idx].mailbox_idx]; + (void)dsync_brain_msg_sync_add_new_msg(iter, &mailbox->box.guid, + msg_idx, + msgs[msg_idx].msg); + } + + /* if we copied anything, we'll again have to wait for the results */ + array_clear(&iter->copy_retry_indexes); + dsync_worker_set_output_callback(iter->worker, NULL, NULL); +} + +static void +dsync_brain_msg_sync_retry_copies(struct dsync_brain_mailbox_sync *sync) +{ + dsync_brain_msg_iter_sync_retry_copies(sync->dest_msg_iter); + dsync_brain_msg_iter_sync_retry_copies(sync->src_msg_iter); + + if (dsync_brain_msg_sync_is_save_done(sync)) { + dsync_worker_set_input_callback(sync->src_worker, NULL, NULL); + dsync_worker_set_input_callback(sync->dest_worker, NULL, NULL); + sync->brain->state++; + dsync_brain_sync(sync->brain); + } else { + /* temporarily move back the state. once copies have returned + success/failures, we'll get back to this function and see + if we need to retry again */ + sync->brain->state--; + } +} + +static void +sync_iter_resolve_uid_conflicts(struct dsync_brain_msg_iter *iter) +{ + const struct dsync_brain_uid_conflict *conflicts; + const struct dsync_brain_mailbox *mailboxes, *mailbox; + unsigned int i, count, mailbox_count; + + mailboxes = array_get(&iter->sync->mailboxes, &mailbox_count); + conflicts = array_get(&iter->uid_conflicts, &count); + for (i = 0; i < count; i++) { + mailbox = &mailboxes[conflicts[i].mailbox_idx]; + dsync_worker_select_mailbox(iter->worker, &mailbox->box.guid); + dsync_worker_msg_update_uid(iter->worker, conflicts[i].old_uid, + conflicts[i].new_uid); + } +} + +void dsync_brain_msg_sync_resolve_uid_conflicts(struct dsync_brain_mailbox_sync *sync) +{ + sync_iter_resolve_uid_conflicts(sync->src_msg_iter); + sync_iter_resolve_uid_conflicts(sync->dest_msg_iter); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/dsync/dsync-brain-msgs.c Mon Jul 27 19:04:36 2009 -0400 @@ -0,0 +1,386 @@ +/* Copyright (c) 2009 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "hash.h" +#include "dsync-worker.h" +#include "dsync-brain-private.h" + +static void dsync_brain_guid_add(struct dsync_brain_msg_iter *iter) +{ + struct dsync_brain_guid_instance *inst, *prev_inst; + + inst = p_new(iter->sync->pool, struct dsync_brain_guid_instance, 1); + inst->mailbox_idx = iter->mailbox_idx; + inst->uid = iter->msg.uid; + + prev_inst = hash_table_lookup(iter->guid_hash, iter->msg.guid); + if (prev_inst == NULL) { + hash_table_insert(iter->guid_hash, + p_strdup(iter->sync->pool, iter->msg.guid), + inst); + } else { + inst->next = prev_inst->next; + prev_inst->next = inst; + } +} + +static int dsync_brain_msg_iter_next(struct dsync_brain_msg_iter *iter) +{ + int ret = 1; + + if (iter->msg.guid == NULL) { + ret = dsync_worker_msg_iter_next(iter->iter, &iter->mailbox_idx, + &iter->msg); + if (ret > 0) + dsync_brain_guid_add(iter); + } + + if (iter->sync->wanted_mailbox_idx != iter->mailbox_idx) { + /* finished with this mailbox */ + return -1; + } + return ret; +} + +static int dsync_brain_msg_iter_next_pair(struct dsync_brain_mailbox_sync *sync) +{ + int ret; + + if ((ret = dsync_brain_msg_iter_next(sync->src_msg_iter)) <= 0) + return ret; + if ((ret = dsync_brain_msg_iter_next(sync->dest_msg_iter)) <= 0) + return ret; + return 1; +} + +static void +dsync_brain_msg_sync_save(struct dsync_brain_msg_iter *iter, + unsigned int mailbox_idx, + const struct dsync_message *msg) +{ + struct dsync_brain_new_msg *new_msg; + + new_msg = array_append_space(&iter->new_msgs); + new_msg->mailbox_idx = mailbox_idx; + new_msg->msg = dsync_message_dup(iter->sync->pool, msg); +} + +static void +dsync_brain_msg_sync_conflict(struct dsync_brain_msg_iter *conflict_iter, + struct dsync_brain_msg_iter *save_iter, + const struct dsync_message *msg) +{ + struct dsync_brain_uid_conflict *conflict; + struct dsync_brain_new_msg *new_msg; + struct dsync_brain_mailbox *brain_box; + uint32_t new_uid; + + brain_box = array_idx_modifiable(&save_iter->sync->mailboxes, + save_iter->mailbox_idx); + new_uid = brain_box->box.uid_next++; + + conflict = array_append_space(&conflict_iter->uid_conflicts); + conflict->mailbox_idx = conflict_iter->mailbox_idx; + conflict->old_uid = msg->uid; + conflict->new_uid = new_uid; + + new_msg = array_append_space(&save_iter->new_msgs); + new_msg->mailbox_idx = save_iter->mailbox_idx; + new_msg->msg = dsync_message_dup(save_iter->sync->pool, msg); + new_msg->msg->uid = new_uid; +} + +static void dsync_brain_msg_sync_existing(struct dsync_brain_mailbox_sync *sync, + struct dsync_message *src_msg, + struct dsync_message *dest_msg) +{ + if (src_msg->modseq > dest_msg->modseq) + dsync_worker_msg_update_metadata(sync->dest_worker, src_msg); + else if (src_msg->modseq < dest_msg->modseq) + dsync_worker_msg_update_metadata(sync->src_worker, dest_msg); + else if (src_msg->flags != dest_msg->flags || + !dsync_keyword_list_equals(src_msg->keywords, + dest_msg->keywords)) { + /* modseqs match, but flags aren't the same. we can't really + know which one we should use, so just pick one. */ + dsync_worker_msg_update_metadata(sync->dest_worker, src_msg); + } +} + +static int dsync_brain_msg_sync_pair(struct dsync_brain_mailbox_sync *sync) +{ + struct dsync_message *src_msg = &sync->src_msg_iter->msg; + struct dsync_message *dest_msg = &sync->dest_msg_iter->msg; + const char *src_guid, *dest_guid; + unsigned char guid_128_data[MAIL_GUID_128_SIZE * 2 + 1]; + bool src_expunged, dest_expunged; + + i_assert(sync->src_msg_iter->mailbox_idx == + sync->dest_msg_iter->mailbox_idx); + + src_expunged = (src_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0; + dest_expunged = (dest_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0; + + if (src_expunged) { + src_guid = src_msg->guid; + dest_guid = dsync_get_guid_128_str(dest_msg->guid, + guid_128_data, + sizeof(guid_128_data)); + } else if (dest_expunged) { + src_guid = dsync_get_guid_128_str(src_msg->guid, guid_128_data, + sizeof(guid_128_data)); + dest_guid = dest_msg->guid; + } else { + src_guid = src_msg->guid; + dest_guid = dest_msg->guid; + } + + if (src_msg->uid < dest_msg->uid) { + /* message has been expunged from dest. */ + if (src_expunged) { + /* expunged from source already */ + } else if (sync->uid_conflict) { + /* update uid src, copy to dest */ + dsync_brain_msg_sync_conflict(sync->src_msg_iter, + sync->dest_msg_iter, + src_msg); + } else { + /* expunge from source */ + dsync_worker_msg_expunge(sync->src_worker, + src_msg->uid); + } + src_msg->guid = NULL; + return 0; + } else if (src_msg->uid > dest_msg->uid) { + /* message has been expunged from src. */ + if (dest_expunged) { + /* expunged from dest already */ + } else if (sync->uid_conflict) { + /* update uid in dest, copy to src */ + dsync_brain_msg_sync_conflict(sync->dest_msg_iter, + sync->src_msg_iter, + dest_msg); + } else { + /* expunge from dest */ + dsync_worker_msg_expunge(sync->dest_worker, + dest_msg->uid); + } + dest_msg->guid = NULL; + return 0; + } + + /* UIDs match, but do GUIDs? If either of the GUIDs aren't set, it + means that either the storage doesn't support GUIDs or we're + handling an old-style expunge record. In that case just assume + they match. */ + if (strcmp(src_guid, dest_guid) != 0 && + *src_guid != '\0' && *dest_guid != '\0') { + /* UID conflict. give new UIDs to messages in both src and + dest (if they're not expunged already) */ + sync->uid_conflict = TRUE; + if (!dest_expunged) { + dsync_brain_msg_sync_conflict(sync->dest_msg_iter, + sync->src_msg_iter, + dest_msg); + } + if (!src_expunged) { + dsync_brain_msg_sync_conflict(sync->src_msg_iter, + sync->dest_msg_iter, + src_msg); + } + } else if (dest_expunged) { + /* message expunged from destination */ + if (!src_expunged) { + dsync_worker_msg_expunge(sync->src_worker, + src_msg->uid); + } + } else if (src_expunged) { + /* message expunged from source, expunge from destination too */ + dsync_worker_msg_expunge(sync->dest_worker, dest_msg->uid); + } else { + /* message exists in both source and dest, sync metadata */ + dsync_brain_msg_sync_existing(sync, src_msg, dest_msg); + } + src_msg->guid = NULL; + dest_msg->guid = NULL; + return 0; +} + +static bool dsync_brain_msg_sync_mailbox_end(struct dsync_brain_msg_iter *iter1, + struct dsync_brain_msg_iter *iter2) +{ + const struct dsync_brain_mailbox *brain_box; + int ret; + + brain_box = array_idx(&iter1->sync->mailboxes, + iter1->sync->wanted_mailbox_idx); + while ((ret = dsync_brain_msg_iter_next(iter1)) > 0) { + dsync_brain_msg_sync_save(iter2, iter1->mailbox_idx, + &iter1->msg); + iter1->msg.guid = NULL; + } + return ret < 0; +} + +static bool +dsync_brain_msg_sync_mailbox_more(struct dsync_brain_mailbox_sync *sync) +{ + int ret; + + while ((ret = dsync_brain_msg_iter_next_pair(sync)) > 0) { + if (dsync_brain_msg_sync_pair(sync) < 0) + break; + if (dsync_worker_is_output_full(sync->dest_worker)) + return FALSE; + } + if (ret == 0) + return FALSE; + + /* finished syncing messages in this mailbox that exist in both source + and destination. if there are messages left, we can't reliably know + if they should be expunged, so just copy them to the other side. */ + if (!dsync_brain_msg_sync_mailbox_end(sync->dest_msg_iter, + sync->src_msg_iter)) + return FALSE; + if (!dsync_brain_msg_sync_mailbox_end(sync->src_msg_iter, + sync->dest_msg_iter)) + return FALSE; + + /* done with this mailbox. the same iterator is still used for + getting messages from other mailboxes. */ + return TRUE; +} + +static void dsync_brain_msg_sync_finish(struct dsync_brain_mailbox_sync *sync) +{ + /* synced all existing messages. now add the new messages. */ + if (dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter) < 0 || + dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter) < 0) + dsync_brain_fail(sync->brain); + + dsync_brain_msg_sync_new_msgs(sync); +} + +static void dsync_brain_msg_sync_more(struct dsync_brain_mailbox_sync *sync) +{ + const struct dsync_brain_mailbox *mailboxes; + unsigned int count, mailbox_idx; + + mailboxes = array_get(&sync->mailboxes, &count); + while (dsync_brain_msg_sync_mailbox_more(sync)) { + /* sync the next mailbox */ + sync->uid_conflict = FALSE; + mailbox_idx = ++sync->wanted_mailbox_idx; + if (mailbox_idx == count) { + dsync_brain_msg_sync_finish(sync); + return; + } + dsync_worker_select_mailbox(sync->src_worker, + &mailboxes[mailbox_idx].box.guid); + dsync_worker_select_mailbox(sync->dest_worker, + &mailboxes[mailbox_idx].box.guid); + } +} + +static void dsync_worker_msg_callback(void *context) +{ + struct dsync_brain_mailbox_sync *sync = context; + + dsync_brain_msg_sync_more(sync); +} + +static struct dsync_brain_msg_iter * +dsync_brain_msg_iter_init(struct dsync_brain_mailbox_sync *sync, + struct dsync_worker *worker, + const mailbox_guid_t mailboxes[], + unsigned int mailbox_count) +{ + struct dsync_brain_msg_iter *iter; + + iter = p_new(sync->pool, struct dsync_brain_msg_iter, 1); + iter->sync = sync; + iter->worker = worker; + i_array_init(&iter->uid_conflicts, 128); + i_array_init(&iter->new_msgs, 128); + i_array_init(&iter->copy_retry_indexes, 32); + iter->guid_hash = hash_table_create(default_pool, sync->pool, 10000, + strcase_hash, + (hash_cmp_callback_t *)strcasecmp); + + iter->iter = dsync_worker_msg_iter_init(worker, mailboxes, + mailbox_count); + dsync_worker_set_input_callback(worker, + dsync_worker_msg_callback, sync); + dsync_worker_set_output_callback(worker, + dsync_worker_msg_callback, sync); + dsync_worker_select_mailbox(worker, &mailboxes[0]); + return iter; +} + +static void dsync_brain_msg_iter_deinit(struct dsync_brain_msg_iter *iter) +{ + if (iter->iter != NULL) + (void)dsync_worker_msg_iter_deinit(&iter->iter); + + hash_table_destroy(&iter->guid_hash); + array_free(&iter->uid_conflicts); + array_free(&iter->new_msgs); + array_free(&iter->copy_retry_indexes); +} + +static void +get_mailbox_guids(const ARRAY_TYPE(dsync_brain_mailbox) *mailboxes, + ARRAY_TYPE(mailbox_guid) *guids) +{ + const struct dsync_brain_mailbox *brain_boxes; + unsigned int i, count; + + t_array_init(guids, array_count(mailboxes)); + brain_boxes = array_get(mailboxes, &count); + for (i = 0; i < count; i++) + array_append(guids, &brain_boxes[i].box.guid, 1); +} + +struct dsync_brain_mailbox_sync * +dsync_brain_msg_sync_init(struct dsync_brain *brain, + const ARRAY_TYPE(dsync_brain_mailbox) *mailboxes) +{ + struct dsync_brain_mailbox_sync *sync; + ARRAY_TYPE(mailbox_guid) guids; + unsigned int count; + pool_t pool; + + pool = pool_alloconly_create("dsync brain mailbox sync", 1024*256); + sync = p_new(pool, struct dsync_brain_mailbox_sync, 1); + sync->pool = pool; + sync->brain = brain; + sync->src_worker = brain->src_worker; + sync->dest_worker = brain->dest_worker; + + p_array_init(&sync->mailboxes, pool, array_count(mailboxes)); + array_append_array(&sync->mailboxes, mailboxes); + get_mailbox_guids(mailboxes, &guids); + + /* initialize message iteration on both workers */ + count = array_count(&guids); + sync->src_msg_iter = + dsync_brain_msg_iter_init(sync, brain->src_worker, + array_idx(&guids, 0), count); + sync->dest_msg_iter = + dsync_brain_msg_iter_init(sync, brain->dest_worker, + array_idx(&guids, 0), count); + dsync_brain_msg_sync_more(sync); + return sync; +} + +void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **_sync) +{ + struct dsync_brain_mailbox_sync *sync = *_sync; + + *_sync = NULL; + + dsync_brain_msg_iter_deinit(sync->src_msg_iter); + dsync_brain_msg_iter_deinit(sync->dest_msg_iter); + pool_unref(&sync->pool); +}
--- a/src/dsync/dsync-brain-private.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-brain-private.h Mon Jul 27 19:04:36 2009 -0400 @@ -7,9 +7,7 @@ enum dsync_state { DSYNC_STATE_GET_MAILBOXES = 0, DSYNC_STATE_CREATE_MAILBOXES, - DSYNC_STATE_SYNC_EXISTING_MSGS, - DSYNC_STATE_SYNC_NEW_MSGS, - DSYNC_STATE_SYNC_RETRY_COPIES, + DSYNC_STATE_SYNC_MSGS, DSYNC_STATE_SYNC_UPDATE_MAILBOX, DSYNC_STATE_SYNC_RESOLVE_UID_CONFLICTS, DSYNC_STATE_SYNC_FLUSH, @@ -21,7 +19,7 @@ struct dsync_brain *brain; struct dsync_worker *worker; struct dsync_worker_mailbox_iter *iter; - ARRAY_DEFINE(mailboxes, struct dsync_mailbox *); + ARRAY_TYPE(dsync_mailbox) mailboxes; }; struct dsync_brain_guid_instance { @@ -36,35 +34,14 @@ struct dsync_brain_mailbox_sync *sync; struct dsync_worker *worker; - unsigned int wanted_mailbox_idx; - struct dsync_worker_msg_iter *iter; struct dsync_message msg; + unsigned int mailbox_idx; - unsigned int save_guids:1; -}; - -struct dsync_brain_uid_conflict { - uint32_t mailbox_idx; - uint32_t uid; -}; - -struct dsync_brain_new_msg { - uint32_t mailbox_idx; - struct dsync_message *msg; -}; - -struct dsync_brain_mailbox_sync { - struct dsync_brain *brain; - pool_t pool; - /* char *guid -> struct dsync_brain_guid_instance* */ struct hash_table *guid_hash; - struct dsync_brain_msg_iter *src_msg_iter; - struct dsync_brain_msg_iter *dest_msg_iter; - ARRAY_DEFINE(uid_conflicts, struct dsync_brain_uid_conflict); ARRAY_DEFINE(new_msgs, struct dsync_brain_new_msg); unsigned int next_new_msg; @@ -72,6 +49,40 @@ /* copy operations that failed. indexes point to new_msgs array */ ARRAY_TYPE(uint32_t) copy_retry_indexes; unsigned int copy_results_left; + unsigned int save_results_left; + + unsigned int msgs_sent:1; +}; + +struct dsync_brain_uid_conflict { + uint32_t mailbox_idx; + uint32_t old_uid, new_uid; +}; + +struct dsync_brain_new_msg { + uint32_t mailbox_idx; + struct dsync_message *msg; +}; + +struct dsync_brain_mailbox { + struct dsync_mailbox box; + struct dsync_mailbox *src; + struct dsync_mailbox *dest; +}; +ARRAY_DEFINE_TYPE(dsync_brain_mailbox, struct dsync_brain_mailbox); + +struct dsync_brain_mailbox_sync { + struct dsync_brain *brain; + pool_t pool; + + ARRAY_TYPE(dsync_brain_mailbox) mailboxes; + unsigned int wanted_mailbox_idx; + + struct dsync_worker *src_worker; + struct dsync_worker *dest_worker; + + struct dsync_brain_msg_iter *src_msg_iter; + struct dsync_brain_msg_iter *dest_msg_iter; unsigned int uid_conflict:1; }; @@ -79,6 +90,7 @@ struct dsync_brain { struct dsync_worker *src_worker; struct dsync_worker *dest_worker; + enum dsync_brain_flags flags; enum dsync_state state; @@ -90,4 +102,14 @@ unsigned int failed:1; }; +void dsync_brain_fail(struct dsync_brain *brain); + +struct dsync_brain_mailbox_sync * +dsync_brain_msg_sync_init(struct dsync_brain *brain, + const ARRAY_TYPE(dsync_brain_mailbox) *mailboxes); +void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **_sync); + +void dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync); +void dsync_brain_msg_sync_resolve_uid_conflicts(struct dsync_brain_mailbox_sync *sync); + #endif
--- a/src/dsync/dsync-brain.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-brain.c Mon Jul 27 19:04:36 2009 -0400 @@ -3,27 +3,27 @@ #include "lib.h" #include "array.h" #include "hash.h" -#include "hex-binary.h" #include "master-service.h" #include "dsync-worker.h" #include "dsync-brain-private.h" static void dsync_brain_mailbox_list_deinit(struct dsync_brain_mailbox_list **list); -static void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **sync); struct dsync_brain *dsync_brain_init(struct dsync_worker *src_worker, - struct dsync_worker *dest_worker) + struct dsync_worker *dest_worker, + enum dsync_brain_flags flags) { struct dsync_brain *brain; brain = i_new(struct dsync_brain, 1); brain->src_worker = src_worker; brain->dest_worker = dest_worker; + brain->flags = flags; return brain; } -static void dsync_brain_fail(struct dsync_brain *brain) +void dsync_brain_fail(struct dsync_brain *brain) { brain->failed = TRUE; master_service_stop(master_service); @@ -118,7 +118,7 @@ memset(&new_box, 0, sizeof(new_box)); - /* find mailboxes from source whose GUIDs don't exist in dest. + /* find mailboxes from whose GUIDs don't exist. the mailboxes are sorted by GUID, so we can do this quickly. */ src_boxes = array_get(&brain->src_mailbox_list->mailboxes, &src_count); dest_boxes = array_get(&brain->dest_mailbox_list->mailboxes, &dest_count); @@ -136,6 +136,11 @@ src++; } else { /* exists only in dest */ + new_box = *dest_boxes[dest]; + new_box.uid_next = 0; + new_box.highest_modseq = 0; + dsync_worker_create_mailbox(brain->src_worker, + &new_box); dest++; } } @@ -145,271 +150,12 @@ new_box.highest_modseq = 0; dsync_worker_create_mailbox(brain->dest_worker, &new_box); } -} - -static void dsync_brain_guid_add(struct dsync_brain_mailbox_sync *sync, - struct dsync_brain_msg_iter *iter) -{ - struct dsync_brain_guid_instance *inst, *prev_inst; - - inst = p_new(sync->pool, struct dsync_brain_guid_instance, 1); - inst->mailbox_idx = iter->mailbox_idx; - inst->uid = iter->msg.uid; - - prev_inst = hash_table_lookup(sync->guid_hash, iter->msg.guid); - if (prev_inst == NULL) { - hash_table_insert(sync->guid_hash, - p_strdup(sync->pool, iter->msg.guid), inst); - } else { - inst->next = prev_inst->next; - prev_inst->next = inst; - } -} - -static int dsync_brain_msg_iter_next(struct dsync_brain_msg_iter *iter) -{ - int ret = 1; - - if (iter->msg.guid == NULL) { - ret = dsync_worker_msg_iter_next(iter->iter, &iter->mailbox_idx, - &iter->msg); - if (ret > 0) { - if (iter->save_guids) - dsync_brain_guid_add(iter->sync, iter); - } - } - - if (iter->wanted_mailbox_idx != iter->mailbox_idx) { - /* finished with this mailbox */ - return -1; - } - return ret; -} - -static int dsync_brain_msg_iter_next_pair(struct dsync_brain_mailbox_sync *sync) -{ - int ret; - - if ((ret = dsync_brain_msg_iter_next(sync->src_msg_iter)) <= 0) - return ret; - if ((ret = dsync_brain_msg_iter_next(sync->dest_msg_iter)) <= 0) - return ret; - return 1; -} - -static void -dsync_brain_msg_sync_save_source(struct dsync_brain_mailbox_sync *sync) -{ - struct dsync_brain_new_msg *new_msg; - - new_msg = array_append_space(&sync->new_msgs); - new_msg->mailbox_idx = sync->src_msg_iter->mailbox_idx; - new_msg->msg = dsync_message_dup(sync->pool, &sync->src_msg_iter->msg); -} - -static void dsync_brain_msg_sync_existing(struct dsync_brain *brain, - struct dsync_message *src_msg, - struct dsync_message *dest_msg) -{ - if (src_msg->flags != dest_msg->flags || - src_msg->modseq > dest_msg->modseq || - !dsync_keyword_list_equals(src_msg->keywords, dest_msg->keywords)) - dsync_worker_msg_update_metadata(brain->dest_worker, src_msg); -} - -static const char * -get_guid_128_str(const char *guid, unsigned char *dest, unsigned int dest_len) -{ - uint8_t guid_128[MAIL_GUID_128_SIZE]; - buffer_t guid_128_buf; - - buffer_create_data(&guid_128_buf, dest, dest_len); - mail_generate_guid_128_hash(guid, guid_128); - if (mail_guid_128_is_empty(guid_128)) - return ""; - binary_to_hex_append(&guid_128_buf, guid_128, sizeof(guid_128)); - buffer_append_c(&guid_128_buf, '\0'); - return guid_128_buf.data; -} - -static int dsync_brain_msg_sync_pair(struct dsync_brain_mailbox_sync *sync) -{ - struct dsync_message *src_msg = &sync->src_msg_iter->msg; - struct dsync_message *dest_msg = &sync->dest_msg_iter->msg; - struct dsync_mailbox *const *boxp; - struct dsync_brain_uid_conflict *conflict; - const char *src_guid, *dest_guid; - unsigned char guid_128_data[MAIL_GUID_128_SIZE * 2 + 1]; - bool src_expunged, dest_expunged; - - src_expunged = (src_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0; - dest_expunged = (dest_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0; - - if (src_expunged) { - src_guid = src_msg->guid; - dest_guid = get_guid_128_str(dest_msg->guid, guid_128_data, - sizeof(guid_128_data)); - } else if (dest_expunged) { - src_guid = get_guid_128_str(src_msg->guid, guid_128_data, - sizeof(guid_128_data)); - dest_guid = dest_msg->guid; - } else { - src_guid = src_msg->guid; - dest_guid = dest_msg->guid; - } - - if (src_msg->uid < dest_msg->uid) { - /* message has been expunged from dest. ignore it, unless - we're in uid-conflict mode. */ - if (sync->uid_conflict && !src_expunged) - dsync_brain_msg_sync_save_source(sync); - src_msg->guid = NULL; - return 0; - } else if (src_msg->uid > dest_msg->uid) { - /* message has been expunged from src. expunge it from dest - too, unless we're in uid-conflict mode. */ - if (!sync->uid_conflict && !dest_expunged) { - dsync_worker_msg_expunge(sync->brain->dest_worker, - dest_msg->uid); - } - dest_msg->guid = NULL; - return 0; + for (; dest < dest_count; dest++) { + new_box = *dest_boxes[dest]; + new_box.uid_next = 0; + new_box.highest_modseq = 0; + dsync_worker_create_mailbox(brain->src_worker, &new_box); } - - /* UIDs match, but do GUIDs? If either of the GUIDs aren't set, it - means that either the storage doesn't support GUIDs or we're - handling an old-style expunge record. In that case just assume - they match. */ - if (strcmp(src_guid, dest_guid) != 0 && - *src_guid != '\0' && *dest_guid != '\0') { - /* UID conflict. give new UIDs to messages in both src and - dest (if they're not expunged already) */ - sync->uid_conflict = TRUE; - if (!dest_expunged) { - conflict = array_append_space(&sync->uid_conflicts); - conflict->mailbox_idx = sync->src_msg_iter->mailbox_idx; - conflict->uid = dest_msg->uid; - } - if (!src_expunged) { - boxp = array_idx(&sync->brain->src_mailbox_list->mailboxes, - conflict->mailbox_idx); - src_msg->uid = (*boxp)->uid_next++; - dsync_brain_msg_sync_save_source(sync); - } - } else if (dest_expunged) { - /* message expunged from destination, we can skip this. */ - } else if (src_expunged) { - /* message expunged from source, expunge from destination too */ - dsync_worker_msg_expunge(sync->brain->dest_worker, - dest_msg->uid); - } else { - /* message exists in both source and dest, sync metadata */ - dsync_brain_msg_sync_existing(sync->brain, src_msg, dest_msg); - } - src_msg->guid = NULL; - dest_msg->guid = NULL; - return 0; -} - -static bool -dsync_brain_msg_sync_mailbox_more(struct dsync_brain_mailbox_sync *sync) -{ - struct dsync_mailbox *const *boxp; - int ret; - - while ((ret = dsync_brain_msg_iter_next_pair(sync)) > 0) { - if (dsync_brain_msg_sync_pair(sync) < 0) - break; - if (dsync_worker_is_output_full(sync->brain->dest_worker)) - return FALSE; - } - if (ret == 0) - return FALSE; - - /* finished syncing messages in this mailbox that exist in both source - and destination. if there are messages left in destination, - we can't reliably know if they should be expunged, so don't. - Add their GUIDs to hash in any case. */ - - boxp = array_idx(&sync->brain->src_mailbox_list->mailboxes, - sync->src_msg_iter->wanted_mailbox_idx); - while ((ret = dsync_brain_msg_iter_next(sync->dest_msg_iter)) > 0) { - if (sync->dest_msg_iter->msg.uid >= (*boxp)->uid_next) - sync->uid_conflict = TRUE; - sync->dest_msg_iter->msg.guid = NULL; - } - if (ret == 0) - return FALSE; - - /* if there are any messages left in source, we'll copy all of them */ - while ((ret = dsync_brain_msg_iter_next(sync->src_msg_iter)) > 0) { - dsync_brain_msg_sync_save_source(sync); - sync->src_msg_iter->msg.guid = NULL; - } - if (ret == 0) - return FALSE; - /* done with this mailbox. the same iterator is still used for - getting messages from other mailboxes. */ - return TRUE; -} - -static void dsync_brain_msg_sync_finish(struct dsync_brain_mailbox_sync *sync) -{ - /* synced all existing messages. now add the new messages. */ - if (dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter) < 0 || - dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter)) - dsync_brain_fail(sync->brain); - - sync->brain->state++; - dsync_brain_sync(sync->brain); -} - -static void dsync_brain_msg_sync_more(struct dsync_brain_mailbox_sync *sync) -{ - struct dsync_mailbox *const *mailboxes; - unsigned int count, mailbox_idx; - - mailboxes = array_get(&sync->brain->src_mailbox_list->mailboxes, - &count); - while (dsync_brain_msg_sync_mailbox_more(sync)) { - /* sync the next mailbox */ - sync->uid_conflict = FALSE; - mailbox_idx = ++sync->src_msg_iter->wanted_mailbox_idx; - sync->dest_msg_iter->wanted_mailbox_idx++; - if (mailbox_idx == count) { - dsync_brain_msg_sync_finish(sync); - return; - } - dsync_worker_select_mailbox(sync->brain->dest_worker, - &mailboxes[mailbox_idx]->guid); - } -} - -static void dsync_worker_msg_callback(void *context) -{ - struct dsync_brain_mailbox_sync *sync = context; - - dsync_brain_msg_sync_more(sync); -} - -static struct dsync_brain_msg_iter * -dsync_brain_msg_iter_init(struct dsync_brain_mailbox_sync *sync, - struct dsync_worker *worker, - const mailbox_guid_t mailboxes[], - unsigned int mailbox_count) -{ - struct dsync_brain_msg_iter *iter; - - iter = p_new(sync->pool, struct dsync_brain_msg_iter, 1); - iter->sync = sync; - iter->worker = worker; - iter->iter = dsync_worker_msg_iter_init(worker, mailboxes, - mailbox_count); - dsync_worker_set_input_callback(worker, - dsync_worker_msg_callback, sync); - dsync_worker_set_output_callback(worker, - dsync_worker_msg_callback, sync); - return iter; } static bool dsync_mailbox_has_changed_msgs(const struct dsync_mailbox *box1, @@ -422,9 +168,11 @@ static void dsync_brain_get_changed_mailboxes(struct dsync_brain *brain, - ARRAY_TYPE(mailbox_guid) *guids) + ARRAY_TYPE(dsync_brain_mailbox) *brain_boxes, + bool full_sync) { struct dsync_mailbox *const *src_boxes, *const *dest_boxes; + struct dsync_brain_mailbox *brain_box; unsigned int src, dest, src_count, dest_count; int ret; @@ -434,297 +182,70 @@ for (src = dest = 0; src < src_count && dest < dest_count; ) { ret = dsync_mailbox_guid_cmp(src_boxes[src], dest_boxes[dest]); if (ret == 0) { - if (dsync_mailbox_has_changed_msgs(src_boxes[src], - dest_boxes[dest])) - array_append(guids, &src_boxes[src]->guid, 1); + if (full_sync || + dsync_mailbox_has_changed_msgs(src_boxes[src], + dest_boxes[dest])) { + brain_box = array_append_space(brain_boxes); + brain_box->box = *src_boxes[src]; + + brain_box->box.highest_modseq = + I_MAX(src_boxes[src]->highest_modseq, + dest_boxes[dest]->highest_modseq); + brain_box->box.uid_next = + I_MAX(src_boxes[src]->uid_next, + dest_boxes[dest]->uid_next); + brain_box->src = src_boxes[src]; + brain_box->dest = dest_boxes[dest]; + } src++; dest++; } else if (ret < 0) { /* exists only in source */ - array_append(guids, &src_boxes[src]->guid, 1); + brain_box = array_append_space(brain_boxes); + brain_box->box = *src_boxes[src]; + brain_box->src = src_boxes[src]; src++; } else { /* exists only in dest */ + brain_box = array_append_space(brain_boxes); + brain_box->box = *dest_boxes[dest]; + brain_box->dest = dest_boxes[dest]; dest++; } } - for (; src < src_count; src++) - array_append(guids, &src_boxes[src]->guid, 1); -} - -static struct dsync_brain_mailbox_sync * -dsync_brain_msg_sync_init(struct dsync_brain *brain) -{ - struct dsync_brain_mailbox_sync *sync; - ARRAY_TYPE(mailbox_guid) guids; - unsigned int count; - pool_t pool; - - t_array_init(&guids, array_count(&brain->src_mailbox_list->mailboxes)); - dsync_brain_get_changed_mailboxes(brain, &guids); - - pool = pool_alloconly_create("dsync brain mailbox sync", 1024*256); - sync = p_new(pool, struct dsync_brain_mailbox_sync, 1); - sync->pool = pool; - sync->brain = brain; - - i_array_init(&sync->uid_conflicts, 128); - i_array_init(&sync->new_msgs, 128); - i_array_init(&sync->copy_retry_indexes, 32); - - /* initialize message iteration on both workers */ - count = array_count(&guids); - sync->src_msg_iter = - dsync_brain_msg_iter_init(sync, brain->src_worker, - array_idx(&guids, 0), count); - sync->dest_msg_iter = - dsync_brain_msg_iter_init(sync, brain->dest_worker, - array_idx(&guids, 0), count); - - sync->guid_hash = hash_table_create(default_pool, pool, 10000, - strcase_hash, - (hash_cmp_callback_t *)strcasecmp); - sync->dest_msg_iter->save_guids = TRUE; - return sync; -} - -static void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **_sync) -{ - struct dsync_brain_mailbox_sync *sync = *_sync; - - *_sync = NULL; - - if (sync->src_msg_iter->iter != NULL) - (void)dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter); - if (sync->dest_msg_iter->iter != NULL) - (void)dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter); - - hash_table_destroy(&sync->guid_hash); - array_free(&sync->uid_conflicts); - array_free(&sync->new_msgs); - array_free(&sync->copy_retry_indexes); - pool_unref(&sync->pool); -} - -static void dsync_brain_sync_existing_mailboxes(struct dsync_brain *brain) -{ - brain->mailbox_sync = dsync_brain_msg_sync_init(brain); - dsync_brain_msg_sync_more(brain->mailbox_sync); -} - -static int -dsync_brain_msg_sync_add_new_msg(struct dsync_brain_mailbox_sync *sync, - const struct dsync_mailbox *src_mailbox, - unsigned int msg_idx, - const struct dsync_message *msg) -{ - const struct dsync_brain_guid_instance *inst; - struct dsync_mailbox *const *inst_box; - struct dsync_msg_static_data data; - int ret; - - inst = hash_table_lookup(sync->guid_hash, msg->guid); - if (inst != NULL) { - /* we can save this by copying an existing message */ - dsync_worker_select_mailbox(sync->brain->dest_worker, - &src_mailbox->guid); - dsync_worker_set_next_result_tag(sync->brain->dest_worker, - msg_idx+1); - inst_box = array_idx(&sync->brain->src_mailbox_list->mailboxes, - inst->mailbox_idx); - dsync_worker_msg_copy(sync->brain->dest_worker, - &(*inst_box)->guid, inst->uid, msg); - sync->copy_results_left++; - } else { - dsync_worker_select_mailbox(sync->brain->src_worker, - &src_mailbox->guid); - ret = dsync_worker_msg_get(sync->brain->src_worker, - msg->uid, &data); - if (ret <= 0) { - if (ret == 0) { - /* mail got expunged during sync. - just skip this. */ - return 1; - } else { - dsync_brain_fail(sync->brain); - return -1; - } - } - dsync_worker_select_mailbox(sync->brain->dest_worker, - &src_mailbox->guid); - dsync_worker_msg_save(sync->brain->dest_worker, msg, &data); + for (; src < src_count; src++) { + brain_box = array_append_space(brain_boxes); + brain_box->box = *src_boxes[src]; + brain_box->src = src_boxes[src]; } - return dsync_worker_is_output_full(sync->brain->dest_worker) ? 0 : 1; -} - -static void -dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_mailbox_sync *sync) -{ - struct dsync_mailbox *const *mailboxes, *mailbox; - const struct dsync_brain_new_msg *msgs; - unsigned int i, mailbox_count, msg_count; - - mailboxes = array_get(&sync->brain->src_mailbox_list->mailboxes, - &mailbox_count); - msgs = array_get(&sync->new_msgs, &msg_count); - for (i = sync->next_new_msg; i < msg_count; i++) { - mailbox = mailboxes[msgs[i].mailbox_idx]; - if (dsync_brain_msg_sync_add_new_msg(sync, mailbox, i, - msgs[i].msg) <= 0) { - /* failed / continue later */ - sync->next_new_msg = i + 1; - return; - } - } - - /* all messages sent */ - if (sync->copy_results_left == 0) { - sync->brain->state++; - dsync_brain_sync(sync->brain); + for (; dest < dest_count; dest++) { + brain_box = array_append_space(brain_boxes); + brain_box->box = *dest_boxes[dest]; + brain_box->dest = dest_boxes[dest]; } } -static void dsync_worker_copy_input(void *context) +static void dsync_brain_sync_msgs(struct dsync_brain *brain) { - struct dsync_brain_mailbox_sync *sync = context; - struct dsync_brain_guid_instance *inst; - const struct dsync_brain_new_msg *msgs; - unsigned int count; - uint32_t tag; - int result; - - msgs = array_get(&sync->new_msgs, &count); - while (dsync_worker_get_next_result(sync->brain->dest_worker, - &tag, &result)) { - if (tag == 0 || tag > count) { - i_error("Worker sent result with invalid tag %u", tag); - dsync_brain_fail(sync->brain); - return; - } - tag--; - if (sync->copy_results_left == 0) { - i_error("Worker sent unexpected result"); - dsync_brain_fail(sync->brain); - return; - } - sync->copy_results_left--; - if (result < 0) { - /* mark the guid instance invalid and try again later */ - inst = hash_table_lookup(sync->guid_hash, - msgs[tag].msg->guid); - inst->failed = TRUE; - array_append(&sync->copy_retry_indexes, &tag, 1); - } - } - if (sync->copy_results_left == 0) { - sync->brain->state++; - dsync_brain_sync(sync->brain); - } -} - -static void dsync_worker_new_msg_output(void *context) -{ - struct dsync_brain_mailbox_sync *sync = context; - - dsync_brain_msg_sync_add_new_msgs(sync); -} - -static void -dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync) -{ - dsync_worker_set_input_callback(sync->brain->dest_worker, - dsync_worker_copy_input, sync); - dsync_worker_set_output_callback(sync->brain->dest_worker, - dsync_worker_new_msg_output, sync); - dsync_brain_msg_sync_add_new_msgs(sync); -} + ARRAY_TYPE(dsync_brain_mailbox) mailboxes; -static void -dsync_brain_msg_sync_retry_copies(struct dsync_brain_mailbox_sync *sync) -{ - const uint32_t *indexes; - struct dsync_mailbox *const *mailboxes, *mailbox; - const struct dsync_brain_new_msg *msgs; - unsigned int i, msg_idx, idx_count, msg_count, mailbox_count; - struct dsync_brain_guid_instance *inst; - const char *guid_str; - void *orig_key, *orig_value; - - /* first remove GUID instances that had failed. */ - msgs = array_get(&sync->new_msgs, &msg_count); - indexes = array_get(&sync->copy_retry_indexes, &idx_count); - for (i = 0; i < idx_count; i++) { - guid_str = msgs[indexes[i]].msg->guid; - if (hash_table_lookup_full(sync->guid_hash, guid_str, - &orig_key, &orig_value)) - inst = orig_value; - else - inst = NULL; - if (inst != NULL && inst->failed) { - inst = inst->next; - if (inst == NULL) - hash_table_remove(sync->guid_hash, guid_str); - else { - hash_table_update(sync->guid_hash, orig_key, - inst); - } - } - } - - /* try saving again. there probably weren't many of them, so don't - worry about filling output buffer. */ - mailboxes = array_get(&sync->brain->src_mailbox_list->mailboxes, - &mailbox_count); - for (i = 0; i < idx_count; i++) { - msg_idx = indexes[i]; - mailbox = mailboxes[msgs[msg_idx].mailbox_idx]; - (void)dsync_brain_msg_sync_add_new_msg(sync, mailbox, msg_idx, - msgs[msg_idx].msg); - } - - /* if we copied anything, we'll again have to wait for the results */ - array_clear(&sync->copy_retry_indexes); - dsync_worker_set_output_callback(sync->brain->dest_worker, NULL, NULL); - - if (sync->copy_results_left == 0) { - dsync_worker_set_input_callback(sync->brain->dest_worker, - NULL, NULL); - sync->brain->state++; - dsync_brain_sync(sync->brain); - } else { - /* temporarily move back the state. once copies have returned - success/failures, we'll get back to this function and see - if we need to retry again */ - sync->brain->state--; - } + t_array_init(&mailboxes, 128); + dsync_brain_get_changed_mailboxes(brain, &mailboxes, + (brain->flags & DSYNC_BRAIN_FLAG_FULL_SYNC) != 0); + brain->mailbox_sync = dsync_brain_msg_sync_init(brain, &mailboxes); } static void dsync_brain_msg_sync_update_mailbox(struct dsync_brain *brain) { - struct dsync_mailbox *const *mailboxes; + const struct dsync_brain_mailbox *mailboxes; unsigned int i, count; - mailboxes = array_get(&brain->src_mailbox_list->mailboxes, &count); - for (i = 0; i < count; i++) - dsync_worker_update_mailbox(brain->dest_worker, mailboxes[i]); -} - -static void -dsync_brain_msg_sync_resolve_uid_conflicts(struct dsync_brain_mailbox_sync *sync) -{ - const struct dsync_brain_uid_conflict *conflicts; - struct dsync_mailbox *const *mailboxes, *mailbox; - unsigned int i, count, mailbox_count; - - mailboxes = array_get(&sync->brain->src_mailbox_list->mailboxes, - &mailbox_count); - conflicts = array_get(&sync->uid_conflicts, &count); + mailboxes = array_get(&brain->mailbox_sync->mailboxes, &count); for (i = 0; i < count; i++) { - mailbox = mailboxes[conflicts[i].mailbox_idx]; - dsync_worker_select_mailbox(sync->brain->dest_worker, - &mailbox->guid); - dsync_worker_msg_update_uid(sync->brain->dest_worker, - conflicts[i].uid); + dsync_worker_update_mailbox(brain->src_worker, + &mailboxes[i].box); + dsync_worker_update_mailbox(brain->dest_worker, + &mailboxes[i].box); } } @@ -755,9 +276,10 @@ dsync_worker_mailbox_input(brain->dest_mailbox_list); break; case DSYNC_STATE_CREATE_MAILBOXES: - if (array_count(&brain->src_mailbox_list->mailboxes) == 0) { + if (array_count(&brain->src_mailbox_list->mailboxes) == 0 && + array_count(&brain->dest_mailbox_list->mailboxes) == 0) { /* no mailboxes */ - i_error("No source mailboxes"); + i_error("No mailboxes"); dsync_brain_fail(brain); } @@ -766,14 +288,8 @@ dsync_brain_create_missing_mailboxes(brain); brain->state++; /* fall through */ - case DSYNC_STATE_SYNC_EXISTING_MSGS: - dsync_brain_sync_existing_mailboxes(brain); - break; - case DSYNC_STATE_SYNC_NEW_MSGS: - dsync_brain_msg_sync_new_msgs(brain->mailbox_sync); - break; - case DSYNC_STATE_SYNC_RETRY_COPIES: - dsync_brain_msg_sync_retry_copies(brain->mailbox_sync); + case DSYNC_STATE_SYNC_MSGS: + dsync_brain_sync_msgs(brain); break; case DSYNC_STATE_SYNC_UPDATE_MAILBOX: dsync_brain_msg_sync_update_mailbox(brain); @@ -786,6 +302,7 @@ brain->state++; /* fall through */ case DSYNC_STATE_SYNC_FLUSH: + /* FIXME: retrieve worker failures and set brain failure */ dsync_worker_set_output_callback(brain->dest_worker, dsync_worker_flush_callback, brain);
--- a/src/dsync/dsync-brain.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-brain.h Mon Jul 27 19:04:36 2009 -0400 @@ -1,10 +1,15 @@ #ifndef DSYNC_BRAIN_H #define DSYNC_BRAIN_H +enum dsync_brain_flags { + DSYNC_BRAIN_FLAG_FULL_SYNC = 0x01 +}; + struct dsync_worker; struct dsync_brain *dsync_brain_init(struct dsync_worker *src_worker, - struct dsync_worker *dest_worker); + struct dsync_worker *dest_worker, + enum dsync_brain_flags flags); int dsync_brain_deinit(struct dsync_brain **brain); void dsync_brain_sync(struct dsync_brain *brain);
--- a/src/dsync/dsync-data.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-data.c Mon Jul 27 19:04:36 2009 -0400 @@ -1,6 +1,8 @@ /* Copyright (c) 2009 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "buffer.h" +#include "hex-binary.h" #include "dsync-data.h" struct dsync_mailbox * @@ -77,3 +79,30 @@ return FALSE; } } + +bool dsync_guid_equals(const mailbox_guid_t *guid1, + const mailbox_guid_t *guid2) +{ + return memcmp(guid1->guid, guid2->guid, sizeof(guid1->guid)) == 0; +} + +const char *dsync_guid_to_str(const mailbox_guid_t *guid) +{ + return binary_to_hex(guid->guid, sizeof(guid->guid)); +} + +const char *dsync_get_guid_128_str(const char *guid, unsigned char *dest, + unsigned int dest_len) +{ + uint8_t guid_128[MAIL_GUID_128_SIZE]; + buffer_t guid_128_buf; + + i_assert(dest_len >= MAIL_GUID_128_SIZE * 2 + 1); + buffer_create_data(&guid_128_buf, dest, dest_len); + mail_generate_guid_128_hash(guid, guid_128); + if (mail_guid_128_is_empty(guid_128)) + return ""; + binary_to_hex_append(&guid_128_buf, guid_128, sizeof(guid_128)); + buffer_append_c(&guid_128_buf, '\0'); + return guid_128_buf.data; +}
--- a/src/dsync/dsync-data.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-data.h Mon Jul 27 19:04:36 2009 -0400 @@ -15,6 +15,7 @@ uint32_t uid_validity, uid_next; uint64_t highest_modseq; }; +ARRAY_DEFINE_TYPE(dsync_mailbox, struct dsync_mailbox *); /* dsync_worker_msg_iter_next() returns also all expunged messages from the end of mailbox with this flag set. The GUIDs are 128 bit GUIDs saved @@ -31,6 +32,12 @@ time_t save_date; }; +struct dsync_msg_static_data { + const char *pop3_uidl; + time_t received_date; + struct istream *input; +}; + struct dsync_mailbox * dsync_mailbox_dup(pool_t pool, const struct dsync_mailbox *box); @@ -44,4 +51,10 @@ bool dsync_keyword_list_equals(const char *const *k1, const char *const *k2); +bool dsync_guid_equals(const mailbox_guid_t *guid1, + const mailbox_guid_t *guid2); +const char *dsync_guid_to_str(const mailbox_guid_t *guid); +const char *dsync_get_guid_128_str(const char *guid, unsigned char *dest, + unsigned int dest_len); + #endif
--- a/src/dsync/dsync-proxy-client.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-proxy-client.c Mon Jul 27 19:04:36 2009 -0400 @@ -1,8 +1,11 @@ /* Copyright (c) 2009 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "array.h" +#include "aqueue.h" #include "fd-set-nonblock.h" #include "istream.h" +#include "istream-dot.h" #include "ostream.h" #include "str.h" #include "strescape.h" @@ -15,6 +18,20 @@ #define OUTBUF_THROTTLE_SIZE (1024*64) +enum proxy_client_request_type { + PROXY_CLIENT_REQUEST_TYPE_COPY, + PROXY_CLIENT_REQUEST_TYPE_GET +}; + +struct proxy_client_request { + enum proxy_client_request_type type; + union { + dsync_worker_msg_callback_t *get; + dsync_worker_copy_callback_t *copy; + } callback; + void *context; +}; + struct proxy_client_dsync_worker_mailbox_iter { struct dsync_worker_mailbox_iter iter; pool_t pool; @@ -28,18 +45,140 @@ struct ostream *output; mailbox_guid_t selected_box_guid; + struct istream *save_input; - unsigned int save_input_last_lf:1; + struct io *save_io; + bool save_input_last_lf; + + pool_t msg_get_pool; + struct dsync_msg_static_data msg_get_data; + ARRAY_DEFINE(request_array, struct proxy_client_request); + struct aqueue *request_queue; }; extern struct dsync_worker_vfuncs proxy_client_dsync_worker; +static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker); static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker); +static int +proxy_client_worker_read_line(struct proxy_client_dsync_worker *worker, + const char **line_r) +{ + if (worker->worker.failed) + return -1; + + *line_r = i_stream_read_next_line(worker->input); + if (*line_r == NULL) { + if (worker->input->stream_errno != 0) { + errno = worker->input->stream_errno; + i_error("read() from worker server failed: %m"); + dsync_worker_set_failure(&worker->worker); + return -1; + } + if (worker->input->eof) { + i_error("worker server disconnected unexpectedly"); + dsync_worker_set_failure(&worker->worker); + return -1; + } + } + return *line_r != NULL ? 1 : 0; +} + +static void +proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker) +{ + worker->msg_get_data.input = NULL; + worker->io = io_add(worker->fd_in, IO_READ, + proxy_client_worker_input, worker); +} + +static bool +proxy_client_worker_next_copy(const struct proxy_client_request *request, + const char *line) +{ + request->callback.copy(*line == '1', request->context); + return TRUE; +} + +static bool +proxy_client_worker_next_msg_get(struct proxy_client_dsync_worker *worker, + const struct proxy_client_request *request, + const char *line) +{ + enum dsync_msg_get_result result; + const char *error; + + i_assert(worker->msg_get_data.input == NULL); + p_clear(worker->msg_get_pool); + switch (line[0]) { + case '1': + /* ok */ + if (dsync_proxy_msg_static_import(worker->msg_get_pool, + line, &worker->msg_get_data, + &error) < 0) { + i_error("Invalid msg-get static input: %s", error); + i_stream_close(worker->input); + return FALSE; + } + worker->msg_get_data.input = + i_stream_create_dot(worker->input, FALSE); + i_stream_set_destroy_callback(worker->msg_get_data.input, + proxy_client_worker_msg_get_done, + worker); + result = DSYNC_MSG_GET_RESULT_SUCCESS; + break; + case '0': + /* expunged */ + result = DSYNC_MSG_GET_RESULT_EXPUNGED; + break; + default: + /* failure */ + result = DSYNC_MSG_GET_RESULT_FAILED; + break; + } + + io_remove(&worker->io); + request->callback.get(result, &worker->msg_get_data, request->context); + return worker->io != NULL; +} + +static bool +proxy_client_worker_next_reply(struct proxy_client_dsync_worker *worker, + const char *line) +{ + const struct proxy_client_request *requests; + struct proxy_client_request request; + bool ret = TRUE; + + requests = array_idx(&worker->request_array, 0); + request = requests[aqueue_idx(worker->request_queue, 0)]; + aqueue_delete_tail(worker->request_queue); + + switch (request.type) { + case PROXY_CLIENT_REQUEST_TYPE_COPY: + ret = proxy_client_worker_next_copy(&request, line); + break; + case PROXY_CLIENT_REQUEST_TYPE_GET: + ret = proxy_client_worker_next_msg_get(worker, &request, line); + break; + } + return ret; +} + static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker) { - i_assert(worker->worker.input_callback != NULL); - worker->worker.input_callback(worker->worker.input_context); + const char *line; + + if (worker->worker.input_callback != NULL) { + worker->worker.input_callback(worker->worker.input_context); + return; + } + + while (proxy_client_worker_read_line(worker, &line) > 0) { + if (!proxy_client_worker_next_reply(worker, line)) + break; + } } static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker) @@ -79,6 +218,11 @@ worker); fd_set_nonblock(fd_in, TRUE); fd_set_nonblock(fd_out, TRUE); + + worker->msg_get_pool = pool_alloconly_create("dsync proxy msg", 128); + i_array_init(&worker->request_array, 64); + worker->request_queue = aqueue_init(&worker->request_array.arr); + return &worker->worker; } @@ -96,64 +240,12 @@ if (close(worker->fd_out) < 0) i_error("close(worker output) failed: %m"); } + aqueue_deinit(&worker->request_queue); + array_free(&worker->request_array); + pool_unref(&worker->msg_get_pool); i_free(worker); } -static int -proxy_client_worker_read_line(struct proxy_client_dsync_worker *worker, - const char **line_r) -{ - *line_r = i_stream_read_next_line(worker->input); - if (*line_r == NULL) { - if (worker->input->stream_errno != 0) { - errno = worker->input->stream_errno; - i_error("read() from worker server failed: %m"); - return -1; - } - if (worker->input->eof) { - i_error("worker server disconnected unexpectedly"); - return -1; - } - } - return *line_r != NULL ? 1 : 0; -} - -static uint32_t -proxy_client_worker_next_tag(struct proxy_client_dsync_worker *worker) -{ - uint32_t ret; - - ret = worker->worker.next_tag; - worker->worker.next_tag = 0; - return ret; -} - -static bool proxy_client_worker_get_next_result(struct dsync_worker *_worker, - uint32_t *tag_r, int *result_r) -{ - struct proxy_client_dsync_worker *worker = - (struct proxy_client_dsync_worker *)_worker; - const char *line; - bool ret = TRUE; - - if (proxy_client_worker_read_line(worker, &line) <= 0) - return FALSE; - - T_BEGIN { - const char *const *args; - - args = t_strsplit(line, "\t"); - *tag_r = strtoul(args[0], NULL, 10); - *result_r = strtol(args[1], NULL, 10); - - if (args[0] == NULL || args[1] == NULL || *tag_r == 0) { - i_error("Invalid input from worker server: %s", line); - ret = FALSE; - } - } T_END; - return ret; -} - static bool proxy_client_worker_is_output_full(struct dsync_worker *_worker) { struct proxy_client_dsync_worker *worker = @@ -188,9 +280,7 @@ iter = i_new(struct proxy_client_dsync_worker_mailbox_iter, 1); iter->iter.worker = _worker; iter->pool = pool_alloconly_create("proxy mailbox iter", 1024); - o_stream_send_str(worker->output, - t_strdup_printf("%u\tBOX-LIST\n", - proxy_client_worker_next_tag(worker))); + o_stream_send_str(worker->output, "BOX-LIST\n"); proxy_client_worker_output_flush(_worker); return &iter->iter; } @@ -263,8 +353,7 @@ iter->pool = pool_alloconly_create("proxy message iter", 1024); str = t_str_new(512); - str_printfa(str, "%u\tMSG-LIST", - proxy_client_worker_next_tag(worker)); + str_append(str, "MSG-LIST"); for (i = 0; i < mailbox_count; i++) { str_append_c(str, '\t'); dsync_proxy_mailbox_guid_export(str, &mailboxes[i]); @@ -347,8 +436,7 @@ T_BEGIN { string_t *str = t_str_new(128); - str_printfa(str, "%u\tBOX-CREATE\t", - proxy_client_worker_next_tag(worker)); + str_append(str, "BOX-CREATE\t"); str_tabescape_write(str, dsync_box->name); if (dsync_box->uid_validity != 0) { str_append_c(str, '\t'); @@ -369,8 +457,7 @@ T_BEGIN { string_t *str = t_str_new(128); - str_printfa(str, "%u\tBOX-UPDATE\t", - proxy_client_worker_next_tag(worker)); + str_append(str, "BOX-UPDATE\t"); str_tabescape_write(str, dsync_box->name); str_append_c(str, '\t'); dsync_proxy_mailbox_guid_export(str, &dsync_box->guid); @@ -388,18 +475,14 @@ struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; - i_assert(worker->worker.next_tag == 0); - - if (memcmp(worker->selected_box_guid.guid, mailbox->guid, - sizeof(worker->selected_box_guid.guid)) == 0) + if (dsync_guid_equals(&worker->selected_box_guid, mailbox)) return; worker->selected_box_guid = *mailbox; T_BEGIN { string_t *str = t_str_new(128); - str_printfa(str, "%u\tBOX-SELECT\t", - proxy_client_worker_next_tag(worker)); + str_append(str, "BOX-SELECT\t"); dsync_proxy_mailbox_guid_export(str, mailbox); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); @@ -416,8 +499,7 @@ T_BEGIN { string_t *str = t_str_new(128); - str_printfa(str, "%u\tMSG-UPDATE\t%u\t%llu\t", - proxy_client_worker_next_tag(worker), msg->uid, + str_printfa(str, "MSG-UPDATE\t%u\t%llu\t", msg->uid, (unsigned long long)msg->modseq); imap_write_flags(str, msg->flags & ~MAIL_RECENT, msg->keywords); str_append_c(str, '\n'); @@ -426,15 +508,16 @@ } static void -proxy_client_worker_msg_update_uid(struct dsync_worker *_worker, uint32_t uid) +proxy_client_worker_msg_update_uid(struct dsync_worker *_worker, + uint32_t old_uid, uint32_t new_uid) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; T_BEGIN { o_stream_send_str(worker->output, - t_strdup_printf("%u\tMSG-UID-CHANGE\t%u\n", - proxy_client_worker_next_tag(worker), uid)); + t_strdup_printf("MSG-UID-CHANGE\t%u\t%u\n", + old_uid, new_uid)); } T_END; } @@ -446,8 +529,7 @@ T_BEGIN { o_stream_send_str(worker->output, - t_strdup_printf("%u\tMSG-EXPUNGE\t%u\n", - proxy_client_worker_next_tag(worker), uid)); + t_strdup_printf("MSG-EXPUNGE\t%u\n", uid)); } T_END; } @@ -455,45 +537,44 @@ proxy_client_worker_msg_copy(struct dsync_worker *_worker, const mailbox_guid_t *src_mailbox, uint32_t src_uid, - const struct dsync_message *dest_msg) + const struct dsync_message *dest_msg, + dsync_worker_copy_callback_t *callback, + void *context) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; + struct proxy_client_request request; T_BEGIN { string_t *str = t_str_new(128); - str_printfa(str, "%u\tMSG-COPY\t", - proxy_client_worker_next_tag(worker)); + str_append(str, "MSG-COPY\t"); dsync_proxy_mailbox_guid_export(str, src_mailbox); str_printfa(str, "\t%u\t", src_uid); dsync_proxy_msg_export(str, dest_msg); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; + + memset(&request, 0, sizeof(request)); + request.type = PROXY_CLIENT_REQUEST_TYPE_COPY; + request.callback.copy = callback; + request.context = context; + aqueue_append(worker->request_queue, &request); } static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker) { const unsigned char *data; - size_t i, start, size; - - while (i_stream_read_data(worker->save_input, &data, &size, 0) > 0) { - if (worker->save_input_last_lf && data[0] == '.') - o_stream_send(worker->output, ".", 1); + size_t size; + int ret; - for (i = 1, start = 0; i < size; i++) { - if (data[i-1] == '\n' && data[i] == '.') { - o_stream_send(worker->output, data + start, - i - start); - o_stream_send(worker->output, ".", 1); - start = i; - } - } - o_stream_send(worker->output, data + start, i - start); - i_stream_skip(worker->save_input, i); - - worker->save_input_last_lf = data[i-1] == '\n'; + while ((ret = i_stream_read_data(worker->save_input, + &data, &size, 0)) > 0) { + dsync_proxy_send_dot_output(worker->output, + &worker->save_input_last_lf, + data, size); + i_stream_skip(worker->save_input, size); if (proxy_client_worker_is_output_full(&worker->worker)) { o_stream_uncork(worker->output); @@ -502,15 +583,35 @@ o_stream_cork(worker->output); } } - i_assert(size == 0); - o_stream_send(worker->output, "\n.\n", 3); - worker->save_input = NULL; + if (ret == 0) { + /* waiting for more input */ + o_stream_uncork(worker->output); + if (worker->save_io == NULL) { + int fd = i_stream_get_fd(worker->save_input); + + worker->save_io = + io_add(fd, IO_READ, + proxy_client_send_stream, worker); + } + return; + } + if (worker->save_io != NULL) + io_remove(&worker->save_io); + if (worker->save_input->stream_errno != 0) { + errno = worker->save_input->stream_errno; + i_error("proxy: reading message input failed: %m"); + o_stream_close(worker->output); + } else { + i_assert(!i_stream_have_bytes_left(worker->save_input)); + o_stream_send(worker->output, "\n.\n", 3); + } + i_stream_unref(&worker->save_input); } static void proxy_client_worker_msg_save(struct dsync_worker *_worker, const struct dsync_message *msg, - struct dsync_msg_static_data *data) + const struct dsync_msg_static_data *data) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; @@ -518,35 +619,48 @@ T_BEGIN { string_t *str = t_str_new(128); - str_printfa(str, "%u\tMSG-SAVE\t%ld\t", - proxy_client_worker_next_tag(worker), - (long)data->received_date); - str_tabescape_write(str, data->pop3_uidl); + str_append(str, "MSG-SAVE\t"); + dsync_proxy_msg_static_export(str, data); str_append_c(str, '\t'); dsync_proxy_msg_export(str, msg); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; + i_assert(worker->save_io == NULL); i_assert(worker->save_input == NULL); worker->save_input = data->input; worker->save_input_last_lf = TRUE; + i_stream_ref(worker->save_input); proxy_client_send_stream(worker); } -static int -proxy_client_worker_msg_get(struct dsync_worker *worker ATTR_UNUSED, - uint32_t uid ATTR_UNUSED, - struct dsync_msg_static_data *data_r ATTR_UNUSED) +static void +proxy_client_worker_msg_get(struct dsync_worker *_worker, uint32_t uid, + dsync_worker_msg_callback_t *callback, + void *context) { - i_panic("proxy not supported for getting messages"); - return -1; + struct proxy_client_dsync_worker *worker = + (struct proxy_client_dsync_worker *)_worker; + struct proxy_client_request request; + + T_BEGIN { + string_t *str = t_str_new(128); + + str_printfa(str, "MSG-GET\t%u\n", uid); + o_stream_send(worker->output, str_data(str), str_len(str)); + } T_END; + + memset(&request, 0, sizeof(request)); + request.type = PROXY_CLIENT_REQUEST_TYPE_GET; + request.callback.get = callback; + request.context = context; + aqueue_append(worker->request_queue, &request); } struct dsync_worker_vfuncs proxy_client_dsync_worker = { proxy_client_worker_deinit, - proxy_client_worker_get_next_result, proxy_client_worker_is_output_full, proxy_client_worker_output_flush,
--- a/src/dsync/dsync-proxy-server-cmd.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-proxy-server-cmd.c Mon Jul 27 19:04:36 2009 -0400 @@ -210,10 +210,12 @@ static int cmd_msg_uid_change(struct dsync_proxy_server *server, const char *const *args) { - if (args[0] == NULL) + if (args[0] == NULL || args[1] == NULL) return -1; - dsync_worker_msg_update_uid(server->worker, strtoul(args[0], NULL, 10)); + dsync_worker_msg_update_uid(server->worker, + strtoul(args[0], NULL, 10), + strtoul(args[1], NULL, 10)); return 1; } @@ -227,6 +229,13 @@ return 1; } +static void copy_callback(bool success, void *context) +{ + struct dsync_proxy_server *server = context; + + o_stream_send(server->output, success ? "1\n" : "0\n", 2); +} + static int cmd_msg_copy(struct dsync_proxy_server *server, const char *const *args) { @@ -246,10 +255,11 @@ src_uid = strtoul(args[1], NULL, 10); if (dsync_proxy_msg_import_unescaped(pool_datastack_create(), - &msg, args+2, &error) < 0) + args + 2, &msg, &error) < 0) i_error("Invalid message input: %s", error); - dsync_worker_msg_copy(server->worker, &src_mailbox_guid, src_uid, &msg); + dsync_worker_msg_copy(server->worker, &src_mailbox_guid, src_uid, &msg, + copy_callback, server); return 1; } @@ -260,18 +270,18 @@ struct dsync_msg_static_data data; const char *error; - /* received_date pop3_uidl <message> */ - if (str_array_length(args) < 3) + if (dsync_proxy_msg_static_import_unescaped(pool_datastack_create(), + args, &data, &error) < 0) { + i_error("Invalid message input: %s", error); return -1; - - memset(&data, 0, sizeof(data)); - data.received_date = strtoul(args[0], NULL, 10); - data.pop3_uidl = args[1]; + } data.input = i_stream_create_dot(server->input, FALSE); if (dsync_proxy_msg_import_unescaped(pool_datastack_create(), - &msg, args+2, &error) < 0) + args + 2, &msg, &error) < 0) { i_error("Invalid message input: %s", error); + return -1; + } /* we rely on save reading the entire input */ net_set_nonblock(server->fd_in, FALSE); @@ -282,6 +292,74 @@ return 1; } +static void cmd_msg_get_send_more(struct dsync_proxy_server *server) +{ + const unsigned char *data; + size_t size; + int ret; + + while (!proxy_server_is_output_full(server)) { + ret = i_stream_read_data(server->get_input, &data, &size, 0); + if (ret == -1) { + /* done */ + i_stream_unref(&server->get_input); + break; + } else { + /* for now we assume input is blocking */ + i_assert(ret != 0); + } + + dsync_proxy_send_dot_output(server->output, + &server->get_input_last_lf, + data, size); + i_stream_skip(server->get_input, size); + } +} + +static void +cmd_msg_get_callback(enum dsync_msg_get_result result, + struct dsync_msg_static_data *data, void *context) +{ + struct dsync_proxy_server *server = context; + string_t *str; + + switch (result) { + case DSYNC_MSG_GET_RESULT_SUCCESS: + break; + case DSYNC_MSG_GET_RESULT_EXPUNGED: + o_stream_send(server->output, "*0\n", 3); + return; + case DSYNC_MSG_GET_RESULT_FAILED: + o_stream_send(server->output, "*-\n", 3); + return; + } + + str = t_str_new(128); + str_append(str, "*1\t"); + dsync_proxy_msg_static_export(str, data); + str_append_c(str, '\n'); + o_stream_send(server->output, str_data(str), str_len(str)); + + /* then we'll still have to send the message body. */ + server->get_input = data->input; + cmd_msg_get_send_more(server); +} + +static int +cmd_msg_get(struct dsync_proxy_server *server, const char *const *args) +{ + if (args[0] == NULL) + return -1; + + if (server->get_input != NULL) + cmd_msg_get_send_more(server); + else { + dsync_worker_msg_get(server->worker, strtoul(args[0], NULL, 10), + cmd_msg_get_callback, server); + } + return server->get_input == NULL ? 1 : 0; +} + static struct dsync_proxy_server_command commands[] = { { "BOX-LIST", cmd_box_list }, { "MSG-LIST", cmd_msg_list }, @@ -293,6 +371,7 @@ { "MSG-EXPUNGE", cmd_msg_expunge }, { "MSG-COPY", cmd_msg_copy }, { "MSG-SAVE", cmd_msg_save }, + { "MSG-GET", cmd_msg_get }, { NULL, NULL } };
--- a/src/dsync/dsync-proxy-server.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-proxy-server.c Mon Jul 27 19:04:36 2009 -0400 @@ -34,19 +34,12 @@ static int proxy_server_run_cmd(struct dsync_proxy_server *server) { - uint32_t tag; - int ret, result; + int ret; if ((ret = server->cur_cmd->func(server, server->cur_args)) == 0) return 0; if (ret < 0) i_error("command %s failed", server->cur_cmd->name); - dsync_worker_verify_result_is_clear(server->worker); - - while (dsync_worker_get_next_result(server->worker, &tag, &result)) { - o_stream_send_str(server->output, - t_strdup_printf("%u\t%d", tag, result)); - } server->cur_cmd = NULL; server->cur_args = NULL; @@ -59,23 +52,21 @@ const char *const *args; const char **cmd_args; unsigned int i, count; - uint32_t tag; i_assert(server->cur_cmd == NULL); args = t_strsplit(line, "\t"); - if (args[0] == NULL || args[1] == NULL) { + if (args[0] == NULL) { i_error("proxy client sent invalid input: %s", line); return -1; } - tag = strtoul(args[0], NULL, 10); - server->cur_cmd = dsync_proxy_server_command_find(args[1]); + server->cur_cmd = dsync_proxy_server_command_find(args[0]); if (server->cur_cmd == NULL) { - i_error("proxy client sent unknown command: %s", args[1]); + i_error("proxy client sent unknown command: %s", args[0]); return -1; } else { - args += 2; + args++; count = str_array_length(args); p_clear(server->cmd_pool); @@ -86,8 +77,6 @@ } server->cur_args = cmd_args; - if (tag != 0) - dsync_worker_set_next_result_tag(server->worker, tag); return proxy_server_run_cmd(server); } }
--- a/src/dsync/dsync-proxy-server.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-proxy-server.h Mon Jul 27 19:04:36 2009 -0400 @@ -23,6 +23,9 @@ struct dsync_worker_mailbox_iter *mailbox_iter; struct dsync_worker_msg_iter *msg_iter; + + struct istream *get_input; + bool get_input_last_lf; }; struct dsync_proxy_server *
--- a/src/dsync/dsync-proxy.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-proxy.c Mon Jul 27 19:04:36 2009 -0400 @@ -4,6 +4,7 @@ #include "array.h" #include "str.h" #include "strescape.h" +#include "ostream.h" #include "hex-binary.h" #include "mail-types.h" #include "imap-util.h" @@ -53,8 +54,8 @@ return 0; } -int dsync_proxy_msg_import_unescaped(pool_t pool, struct dsync_message *msg_r, - const char *const *args, +int dsync_proxy_msg_import_unescaped(pool_t pool, const char *const *args, + struct dsync_message *msg_r, const char **error_r) { /* guid uid modseq flags save_date */ @@ -86,8 +87,52 @@ args = p_strsplit(pool_datastack_create(), str, "\t"); for (i = 0; args[i] != NULL; i++) args[i] = str_tabunescape(args[i]); - ret = dsync_proxy_msg_import_unescaped(pool, msg_r, - (const char *const *)args, error_r); + ret = dsync_proxy_msg_import_unescaped(pool, + (const char *const *)args, + msg_r, error_r); + } T_END; + return ret; +} + +void dsync_proxy_msg_static_export(string_t *str, + const struct dsync_msg_static_data *msg) +{ + str_printfa(str, "%ld\t", (long)msg->received_date); + str_tabescape_write(str, msg->pop3_uidl); +} + +int dsync_proxy_msg_static_import_unescaped(pool_t pool, + const char *const *args, + struct dsync_msg_static_data *msg_r, + const char **error_r) +{ + /* received_date pop3_uidl */ + if (str_array_length(args) < 2) { + *error_r = "Missing parameters"; + return -1; + } + + memset(msg_r, 0, sizeof(*msg_r)); + msg_r->received_date = strtoul(args[0], NULL, 10); + msg_r->pop3_uidl = p_strdup(pool, args[1]); + return 0; +} + +int dsync_proxy_msg_static_import(pool_t pool, const char *str, + struct dsync_msg_static_data *msg_r, + const char **error_r) +{ + char **args; + unsigned int i; + int ret; + + T_BEGIN { + args = p_strsplit(pool_datastack_create(), str, "\t"); + for (i = 0; args[i] != NULL; i++) + args[i] = str_tabunescape(args[i]); + ret = dsync_proxy_msg_static_import_unescaped(pool, + (const char *const *)args, + msg_r, error_r); } T_END; return ret; } @@ -193,3 +238,25 @@ memcpy(guid_r->guid, buf->data, sizeof(guid_r->guid)); return 0; } + +void dsync_proxy_send_dot_output(struct ostream *output, bool *last_lf, + const unsigned char *data, size_t size) +{ + size_t i, start; + + i_assert(size > 0); + + if (*last_lf && data[0] == '.') + o_stream_send(output, ".", 1); + + for (i = 1, start = 0; i < size; i++) { + if (data[i-1] == '\n' && data[i] == '.') { + o_stream_send(output, data + start, i - start); + o_stream_send(output, ".", 1); + start = i; + } + } + o_stream_send(output, data + start, i - start); + *last_lf = data[i-1] == '\n'; + i_assert(i == size); +}
--- a/src/dsync/dsync-proxy.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-proxy.h Mon Jul 27 19:04:36 2009 -0400 @@ -9,12 +9,22 @@ void dsync_proxy_msg_export(string_t *str, const struct dsync_message *msg); int dsync_proxy_msg_parse_flags(pool_t pool, const char *str, struct dsync_message *msg_r); -int dsync_proxy_msg_import_unescaped(pool_t pool, struct dsync_message *msg_r, - const char *const *args, +int dsync_proxy_msg_import_unescaped(pool_t pool, const char *const *args, + struct dsync_message *msg_r, const char **error_r); int dsync_proxy_msg_import(pool_t pool, const char *str, struct dsync_message *msg_r, const char **error_r); +void dsync_proxy_msg_static_export(string_t *str, + const struct dsync_msg_static_data *msg); +int dsync_proxy_msg_static_import(pool_t pool, const char *str, + struct dsync_msg_static_data *msg_r, + const char **error_r); +int dsync_proxy_msg_static_import_unescaped(pool_t pool, + const char *const *args, + struct dsync_msg_static_data *msg_r, + const char **error_r); + void dsync_proxy_mailbox_export(string_t *str, const struct dsync_mailbox *box); int dsync_proxy_mailbox_import(pool_t pool, const char *str, struct dsync_mailbox *box_r, @@ -24,4 +34,7 @@ const mailbox_guid_t *mailbox); int dsync_proxy_mailbox_guid_import(const char *str, mailbox_guid_t *guid_r); +void dsync_proxy_send_dot_output(struct ostream *output, bool *last_lf, + const unsigned char *data, size_t size); + #endif
--- a/src/dsync/dsync-worker-local.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-worker-local.c Mon Jul 27 19:04:36 2009 -0400 @@ -2,7 +2,6 @@ #include "lib.h" #include "array.h" -#include "aqueue.h" #include "hash.h" #include "str.h" #include "hex-binary.h" @@ -38,11 +37,6 @@ const char *storage_name; }; -struct local_dsync_worker_result { - uint32_t tag; - int result; -}; - struct local_dsync_worker { struct dsync_worker worker; struct mail_user *user; @@ -54,9 +48,6 @@ mailbox_guid_t selected_box_guid; struct mailbox *selected_box; struct mail *mail; - - ARRAY_DEFINE(result_array, struct local_dsync_worker_result); - struct aqueue *result_queue; }; extern struct dsync_worker_vfuncs local_dsync_worker; @@ -99,8 +90,6 @@ worker->mailbox_hash = hash_table_create(default_pool, pool, 0, mailbox_guid_hash, mailbox_guid_cmp); - i_array_init(&worker->result_array, 128); - worker->result_queue = aqueue_init(&worker->result_array.arr); return &worker->worker; } @@ -110,47 +99,10 @@ (struct local_dsync_worker *)_worker; worker_mailbox_close(worker); - aqueue_deinit(&worker->result_queue); - array_free(&worker->result_array); hash_table_destroy(&worker->mailbox_hash); pool_unref(&worker->pool); } -static bool local_worker_get_next_result(struct dsync_worker *_worker, - uint32_t *tag_r, int *result_r) -{ - struct local_dsync_worker *worker = - (struct local_dsync_worker *)_worker; - const struct local_dsync_worker_result *results, *result; - - if (aqueue_count(worker->result_queue) == 0) - return FALSE; - - results = array_idx(&worker->result_array, 0); - result = &results[aqueue_idx(worker->result_queue, 0)]; - - *tag_r = result->tag; - *result_r = result->result; - aqueue_delete_tail(worker->result_queue); - return TRUE; -} - -static void -local_worker_set_result(struct local_dsync_worker *worker, int result) -{ - struct local_dsync_worker_result r; - - if (worker->worker.next_tag == 0) - return; - - memset(&r, 0, sizeof(r)); - r.tag = worker->worker.next_tag; - r.result = result; - aqueue_append(worker->result_queue, &r); - - worker->worker.next_tag = 0; -} - static bool local_worker_is_output_full(struct dsync_worker *worker ATTR_UNUSED) { return FALSE; @@ -169,7 +121,8 @@ struct local_dsync_worker_mailbox_iter *iter; enum mailbox_list_iter_flags list_flags = MAILBOX_LIST_ITER_VIRTUAL_NAMES | - MAILBOX_LIST_ITER_SKIP_ALIASES; + MAILBOX_LIST_ITER_SKIP_ALIASES | + MAILBOX_LIST_ITER_NO_AUTO_INBOX; static const char *patterns[] = { "*", NULL }; iter = i_new(struct local_dsync_worker_mailbox_iter, 1); @@ -287,8 +240,11 @@ return -1; } mailbox_get_status(box, STATUS_GUID, &status); - if (memcmp(status.mailbox_guid, guid, sizeof(guid)) != 0) { - i_error("Mailbox %s changed its GUID", lbox->storage_name); + if (memcmp(status.mailbox_guid, guid->guid, sizeof(guid->guid)) != 0) { + i_error("Mailbox %s changed its GUID (%s -> %s)", + lbox->storage_name, dsync_guid_to_str(guid), + binary_to_hex(status.mailbox_guid, + sizeof(status.mailbox_guid))); mailbox_close(&box); return -1; } @@ -527,7 +483,7 @@ box = local_worker_mailbox_alloc(worker, dsync_box); if (box == NULL) { - local_worker_set_result(worker, -1); + dsync_worker_set_failure(_worker); return; } local_worker_copy_mailbox_update(dsync_box, &update); @@ -539,6 +495,7 @@ dsync_box->uid_validity == 0); } if (ret < 0) { + dsync_worker_set_failure(_worker); i_error("Can't create mailbox %s: %s", dsync_box->name, mail_storage_get_last_error(mailbox_get_storage(box), NULL)); @@ -549,7 +506,6 @@ &dsync_box->guid); } mailbox_close(&box); - local_worker_set_result(worker, ret); } static void worker_mailbox_close(struct local_dsync_worker *worker) @@ -559,7 +515,8 @@ if (worker->selected_box != NULL) { trans = worker->mail->transaction; mail_free(&worker->mail); - (void)mailbox_transaction_commit(&trans); + if (mailbox_transaction_commit(&trans) < 0) + dsync_worker_set_failure(&worker->worker); mailbox_close(&worker->selected_box); } } @@ -572,28 +529,25 @@ (struct local_dsync_worker *)_worker; struct mailbox *box; struct mailbox_update update; - int ret; if (worker->selected_box != NULL && - memcmp(dsync_box->guid.guid, worker->selected_box_guid.guid, - sizeof(dsync_box->guid.guid)) == 0) + dsync_guid_equals(&dsync_box->guid, &worker->selected_box_guid)) worker_mailbox_close(worker); box = local_worker_mailbox_alloc(worker, dsync_box); if (box == NULL) { - local_worker_set_result(worker, -1); + dsync_worker_set_failure(_worker); return; } local_worker_copy_mailbox_update(dsync_box, &update); - ret = mailbox_update(box, &update); - if (ret < 0) { + if (mailbox_update(box, &update) < 0) { + dsync_worker_set_failure(_worker); i_error("Can't update mailbox %s: %s", dsync_box->name, mail_storage_get_last_error(mailbox_get_storage(box), NULL)); } mailbox_close(&box); - local_worker_set_result(worker, ret); } static void @@ -603,25 +557,22 @@ struct local_dsync_worker *worker = (struct local_dsync_worker *)_worker; struct mailbox_transaction_context *trans; - int ret; if (worker->selected_box != NULL) { - if (memcmp(worker->selected_box_guid.guid, mailbox->guid, - sizeof(worker->selected_box_guid.guid)) == 0) { - local_worker_set_result(worker, 0); + if (dsync_guid_equals(&worker->selected_box_guid, mailbox)) return; - } worker_mailbox_close(worker); } worker->selected_box_guid = *mailbox; - ret = local_mailbox_open(worker, mailbox, &worker->selected_box); - if (ret == 0) { + if (local_mailbox_open(worker, mailbox, &worker->selected_box) < 0) + dsync_worker_set_failure(_worker); + else { trans = mailbox_transaction_begin(worker->selected_box, - MAILBOX_TRANSACTION_FLAG_EXTERNAL); + MAILBOX_TRANSACTION_FLAG_EXTERNAL | + MAILBOX_TRANSACTION_FLAG_ASSIGN_UIDS); worker->mail = mail_alloc(trans, 0, NULL); } - local_worker_set_result(worker, ret); } static void @@ -632,27 +583,30 @@ (struct local_dsync_worker *)_worker; struct mail_keywords *keywords; - if (mail_set_uid(worker->mail, msg->uid)) { + if (!mail_set_uid(worker->mail, msg->uid)) + dsync_worker_set_failure(_worker); + else { mail_update_flags(worker->mail, MODIFY_REPLACE, msg->flags); - keywords = str_array_length(msg->keywords) == 0 ? NULL : - mailbox_keywords_create_valid(worker->mail->box, - msg->keywords); + keywords = mailbox_keywords_create_valid(worker->mail->box, + msg->keywords); mail_update_keywords(worker->mail, MODIFY_REPLACE, keywords); - if (keywords != NULL) - mailbox_keywords_unref(worker->mail->box, &keywords); + mailbox_keywords_unref(worker->mail->box, &keywords); // FIXME: update modseq if flags didn't change } - local_worker_set_result(worker, 0); } static void -local_worker_msg_update_uid(struct dsync_worker *_worker, uint32_t uid) +local_worker_msg_update_uid(struct dsync_worker *_worker, + uint32_t old_uid, uint32_t new_uid) { struct local_dsync_worker *worker = (struct local_dsync_worker *)_worker; - local_worker_set_result(worker, -1); + if (!mail_set_uid(worker->mail, old_uid)) + dsync_worker_set_failure(_worker); + else + mail_update_uid(worker->mail, new_uid); } static void local_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid) @@ -662,7 +616,6 @@ if (mail_set_uid(worker->mail, uid)) mail_expunge(worker->mail); - local_worker_set_result(worker, 0); } static void @@ -685,7 +638,8 @@ static void local_worker_msg_copy(struct dsync_worker *_worker, const mailbox_guid_t *src_mailbox, uint32_t src_uid, - const struct dsync_message *dest_msg) + const struct dsync_message *dest_msg, + dsync_worker_copy_callback_t *callback, void *context) { struct local_dsync_worker *worker = (struct local_dsync_worker *)_worker; @@ -696,7 +650,7 @@ int ret; if (local_mailbox_open(worker, src_mailbox, &src_box) < 0) { - local_worker_set_result(worker, -1); + callback(FALSE, context); return; } @@ -714,7 +668,8 @@ mail_free(&src_mail); (void)mailbox_transaction_commit(&src_trans); mailbox_close(&src_box); - local_worker_set_result(worker, ret); + + callback(ret == 0, context); } static void @@ -739,13 +694,14 @@ i_assert(input->eof); ret = mailbox_save_finish(&save_ctx); } - local_worker_set_result(worker, ret); + if (ret < 0) + dsync_worker_set_failure(&worker->worker); } static void local_worker_msg_save(struct dsync_worker *_worker, const struct dsync_message *msg, - struct dsync_msg_static_data *data) + const struct dsync_msg_static_data *data) { struct local_dsync_worker *worker = (struct local_dsync_worker *)_worker; @@ -759,44 +715,48 @@ mailbox_save_set_received_date(save_ctx, data->received_date, 0); if (mailbox_save_begin(&save_ctx, data->input) < 0) { - local_worker_set_result(worker, -1); + dsync_worker_set_failure(_worker); return; } local_worker_save_msg_continue(worker, save_ctx, data->input); } -static int +static void local_worker_msg_get(struct dsync_worker *_worker, uint32_t uid, - struct dsync_msg_static_data *data_r) + dsync_worker_msg_callback_t *callback, void *context) { struct local_dsync_worker *worker = (struct local_dsync_worker *)_worker; - int ret = 1; + struct dsync_msg_static_data data; - memset(data_r, 0, sizeof(*data_r)); if (worker->mail == NULL) { /* no mailbox is selected */ - return -1; + callback(DSYNC_MSG_GET_RESULT_FAILED, NULL, context); + return; + } + + if (!mail_set_uid(worker->mail, uid)) { + callback(DSYNC_MSG_GET_RESULT_EXPUNGED, NULL, context); + return; } - if (!mail_set_uid(worker->mail, uid)) - return 0; + memset(&data, 0, sizeof(data)); if (mail_get_special(worker->mail, MAIL_FETCH_UIDL_BACKEND, - &data_r->pop3_uidl) < 0) - ret = -1; - if (mail_get_received_date(worker->mail, &data_r->received_date) < 0) - ret = -1; - if (mail_get_stream(worker->mail, NULL, NULL, &data_r->input) < 0) - ret = -1; - if (ret < 0 && worker->mail->expunged) - ret = 0; - return ret; + &data.pop3_uidl) < 0 || + mail_get_received_date(worker->mail, &data.received_date) < 0 || + mail_get_stream(worker->mail, NULL, NULL, &data.input) < 0) { + if (worker->mail->expunged) + callback(DSYNC_MSG_GET_RESULT_EXPUNGED, NULL, context); + else + callback(DSYNC_MSG_GET_RESULT_FAILED, NULL, context); + } else { + callback(DSYNC_MSG_GET_RESULT_SUCCESS, &data, context); + } } struct dsync_worker_vfuncs local_dsync_worker = { local_worker_deinit, - local_worker_get_next_result, local_worker_is_output_full, local_worker_output_flush,
--- a/src/dsync/dsync-worker-private.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-worker-private.h Mon Jul 27 19:04:36 2009 -0400 @@ -8,8 +8,6 @@ struct dsync_worker_vfuncs { void (*deinit)(struct dsync_worker *); - bool (*get_next_result)(struct dsync_worker *worker, - uint32_t *tag_r, int *result_r); bool (*is_output_full)(struct dsync_worker *worker); int (*output_flush)(struct dsync_worker *worker); @@ -37,16 +35,18 @@ const mailbox_guid_t *mailbox); void (*msg_update_metadata)(struct dsync_worker *worker, const struct dsync_message *msg); - void (*msg_update_uid)(struct dsync_worker *worker, uint32_t uid); + void (*msg_update_uid)(struct dsync_worker *worker, + uint32_t old_uid, uint32_t new_uid); void (*msg_expunge)(struct dsync_worker *worker, uint32_t uid); void (*msg_copy)(struct dsync_worker *worker, const mailbox_guid_t *src_mailbox, uint32_t src_uid, - const struct dsync_message *dest_msg); + const struct dsync_message *dest_msg, + dsync_worker_copy_callback_t *callback, void *context); void (*msg_save)(struct dsync_worker *worker, const struct dsync_message *msg, - struct dsync_msg_static_data *data); - int (*msg_get)(struct dsync_worker *worker, uint32_t uid, - struct dsync_msg_static_data *data_r); + const struct dsync_msg_static_data *data); + void (*msg_get)(struct dsync_worker *worker, uint32_t uid, + dsync_worker_msg_callback_t *callback, void *context); }; struct dsync_worker { @@ -55,7 +55,7 @@ io_callback_t *input_callback, *output_callback; void *input_context, *output_context; - uint32_t next_tag; + unsigned int failed:1; }; struct dsync_worker_mailbox_iter { @@ -68,4 +68,6 @@ bool failed; }; +void dsync_worker_set_failure(struct dsync_worker *worker); + #endif
--- a/src/dsync/dsync-worker.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-worker.c Mon Jul 27 19:04:36 2009 -0400 @@ -18,25 +18,6 @@ worker->input_context = context; } -void dsync_worker_set_next_result_tag(struct dsync_worker *worker, - uint32_t tag) -{ - i_assert(tag != 0); - i_assert(worker->next_tag == 0); - worker->next_tag = tag; -} - -void dsync_worker_verify_result_is_clear(struct dsync_worker *worker) -{ - i_assert(worker->next_tag == 0); -} - -bool dsync_worker_get_next_result(struct dsync_worker *worker, - uint32_t *tag_r, int *result_r) -{ - return worker->v.get_next_result(worker, tag_r, result_r); -} - bool dsync_worker_is_output_full(struct dsync_worker *worker) { return worker->v.is_output_full(worker); @@ -118,35 +99,51 @@ void dsync_worker_msg_update_metadata(struct dsync_worker *worker, const struct dsync_message *msg) { - worker->v.msg_update_metadata(worker, msg); + if (!worker->failed) + worker->v.msg_update_metadata(worker, msg); } -void dsync_worker_msg_update_uid(struct dsync_worker *worker, uint32_t uid) +void dsync_worker_msg_update_uid(struct dsync_worker *worker, + uint32_t old_uid, uint32_t new_uid) { - worker->v.msg_update_uid(worker, uid); + if (!worker->failed) + worker->v.msg_update_uid(worker, old_uid, new_uid); } void dsync_worker_msg_expunge(struct dsync_worker *worker, uint32_t uid) { - worker->v.msg_expunge(worker, uid); + if (!worker->failed) + worker->v.msg_expunge(worker, uid); } void dsync_worker_msg_copy(struct dsync_worker *worker, const mailbox_guid_t *src_mailbox, uint32_t src_uid, - const struct dsync_message *dest_msg) + const struct dsync_message *dest_msg, + dsync_worker_copy_callback_t *callback, + void *context) { - worker->v.msg_copy(worker, src_mailbox, src_uid, dest_msg); + if (!worker->failed) { + worker->v.msg_copy(worker, src_mailbox, src_uid, dest_msg, + callback, context); + } } void dsync_worker_msg_save(struct dsync_worker *worker, const struct dsync_message *msg, - struct dsync_msg_static_data *data) + const struct dsync_msg_static_data *data) { - worker->v.msg_save(worker, msg, data); + if (!worker->failed) + worker->v.msg_save(worker, msg, data); } -int dsync_worker_msg_get(struct dsync_worker *worker, uint32_t uid, - struct dsync_msg_static_data *data_r) +void dsync_worker_msg_get(struct dsync_worker *worker, uint32_t uid, + dsync_worker_msg_callback_t *callback, void *context) { - return worker->v.msg_get(worker, uid, data_r); + if (!worker->failed) + worker->v.msg_get(worker, uid, callback, context); } + +void dsync_worker_set_failure(struct dsync_worker *worker) +{ + worker->failed = TRUE; +}
--- a/src/dsync/dsync-worker.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync-worker.h Mon Jul 27 19:04:36 2009 -0400 @@ -4,12 +4,17 @@ #include "ioloop.h" #include "dsync-data.h" -struct dsync_msg_static_data { - const char *pop3_uidl; - time_t received_date; - struct istream *input; +enum dsync_msg_get_result { + DSYNC_MSG_GET_RESULT_SUCCESS, + DSYNC_MSG_GET_RESULT_EXPUNGED, + DSYNC_MSG_GET_RESULT_FAILED }; +typedef void dsync_worker_copy_callback_t(bool success, void *context); +typedef void dsync_worker_msg_callback_t(enum dsync_msg_get_result result, + struct dsync_msg_static_data *data, + void *context); + struct dsync_worker *dsync_worker_init_local(struct mail_user *user); struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out); void dsync_worker_deinit(struct dsync_worker **worker); @@ -19,13 +24,6 @@ void dsync_worker_set_input_callback(struct dsync_worker *worker, io_callback_t *callback, void *context); -/* Request next command to return its result when it's finished. */ -void dsync_worker_set_next_result_tag(struct dsync_worker *worker, - uint32_t tag); -void dsync_worker_verify_result_is_clear(struct dsync_worker *worker); -/* Returns TRUE if result was returned, FALSE if waiting for more data */ -bool dsync_worker_get_next_result(struct dsync_worker *worker, - uint32_t *tag_r, int *result_r); /* Returns TRUE if command queue is full and caller should stop sending more commands. */ bool dsync_worker_is_output_full(struct dsync_worker *worker); @@ -54,7 +52,7 @@ unsigned int mailbox_count); /* Get the next available message. Also returns all expunged messages from the end of mailbox (if next_uid-1 message exists, nothing is returned). - mailbox_idx_r contains the mailbox's index in mailbox_guids[] array given + mailbox_idx_r contains the mailbox's index in mailboxes[] array given to _iter_init(). Returns 1 if ok, 0 if waiting for more data, -1 if there are no more messages. */ int dsync_worker_msg_iter_next(struct dsync_worker_msg_iter *iter, @@ -78,22 +76,26 @@ void dsync_worker_msg_update_metadata(struct dsync_worker *worker, const struct dsync_message *msg); /* Change message's UID. */ -void dsync_worker_msg_update_uid(struct dsync_worker *worker, uint32_t uid); +void dsync_worker_msg_update_uid(struct dsync_worker *worker, + uint32_t old_uid, uint32_t new_uid); /* Expunge given message. */ void dsync_worker_msg_expunge(struct dsync_worker *worker, uint32_t uid); /* Copy given message. */ void dsync_worker_msg_copy(struct dsync_worker *worker, const mailbox_guid_t *src_mailbox, uint32_t src_uid, - const struct dsync_message *dest_msg); + const struct dsync_message *dest_msg, + dsync_worker_copy_callback_t *callback, + void *context); /* Save given message from the given input stream. The stream is destroyed once saving is finished. */ void dsync_worker_msg_save(struct dsync_worker *worker, const struct dsync_message *msg, - struct dsync_msg_static_data *data); -/* Get message data for saving. Returns 1 if success, 0 if message is already - expunged or -1 if error. Caller must unreference the returned input - stream. */ -int dsync_worker_msg_get(struct dsync_worker *worker, uint32_t uid, - struct dsync_msg_static_data *data_r); + const struct dsync_msg_static_data *data); +/* Get message data for saving. The callback is called once when the static + data has been received. The whole message may not have been downloaded yet, + so the caller must read the input stream until it returns EOF and then + unreference it. */ +void dsync_worker_msg_get(struct dsync_worker *worker, uint32_t uid, + dsync_worker_msg_callback_t *callback, void *context); #endif
--- a/src/dsync/dsync.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/dsync.c Mon Jul 27 19:04:36 2009 -0400 @@ -133,7 +133,7 @@ i_set_failure_prefix(t_strdup_printf("dsync-src(%s): ", username)); worker2 = dsync_worker_init_proxy_client(fd_in, fd_out); - brain = dsync_brain_init(worker1, worker2); + brain = dsync_brain_init(worker1, worker2, TRUE); dsync_brain_sync(brain); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/dsync/test-dsync-brain-msgs.c Mon Jul 27 19:04:36 2009 -0400 @@ -0,0 +1,625 @@ +/* Copyright (c) 2009 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "sha1.h" +#include "crc32.h" +#include "dsync-brain-private.h" +#include "test-dsync-worker.h" +#include "test-dsync-common.h" + +enum test_box_add_type { + ADD_SRC, + ADD_DEST, + ADD_BOTH +}; + +struct test_dsync_mailbox { + struct dsync_brain_mailbox box; + ARRAY_DEFINE(src_msgs, struct dsync_message); + ARRAY_DEFINE(dest_msgs, struct dsync_message); +}; +ARRAY_DEFINE_TYPE(test_dsync_mailbox, struct test_dsync_mailbox); + +static ARRAY_TYPE(test_dsync_mailbox) mailboxes; +static struct test_dsync_worker *test_src_worker, *test_dest_worker; + +void dsync_brain_fail(struct dsync_brain *brain ATTR_UNUSED) {} +void dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync ATTR_UNUSED) {} + +static struct test_dsync_mailbox *test_box_find(const char *name) +{ + struct test_dsync_mailbox *boxes; + unsigned int i, count; + + boxes = array_get_modifiable(&mailboxes, &count); + for (i = 0; i < count; i++) { + if (strcmp(boxes[i].box.box.name, name) == 0) + return &boxes[i]; + } + return NULL; +} + +static bool +test_box_has_guid(const char *name, const mailbox_guid_t *guid) +{ + const struct test_dsync_mailbox *box; + + box = test_box_find(name); + return box != NULL && + memcmp(box->box.box.guid.guid, guid->guid, + sizeof(box->box.box.guid.guid)) == 0; +} + +static struct test_dsync_mailbox * +test_box_add(enum test_box_add_type type, const char *name) +{ + unsigned char sha[SHA1_RESULTLEN]; + struct test_dsync_mailbox *tbox; + struct dsync_mailbox *box; + + tbox = test_box_find(name); + if (tbox == NULL) { + tbox = array_append_space(&mailboxes); + i_array_init(&tbox->src_msgs, 16); + i_array_init(&tbox->dest_msgs, 16); + } + + sha1_get_digest(name, strlen(name), sha); + + box = i_new(struct dsync_mailbox, 1); + box->name = i_strdup(name); + memcpy(box->guid.guid, sha, sizeof(box->guid.guid)); + box->uid_validity = crc32_str(name); + box->highest_modseq = 1; + + switch (type) { + case ADD_SRC: + tbox->box.src = box; + break; + case ADD_DEST: + tbox->box.dest = box; + break; + case ADD_BOTH: + tbox->box.src = box; + tbox->box.dest = box; + break; + } + tbox->box.box.name = box->name; + tbox->box.box.guid = box->guid; + tbox->box.box.uid_validity = box->uid_validity; + return tbox; +} + +static void test_msg_add(struct test_dsync_mailbox *box, + enum test_box_add_type type, + const char *guid, uint32_t uid) +{ + static int msg_date = 0; + struct dsync_message msg; + + memset(&msg, 0, sizeof(msg)); + msg.guid = i_strdup(guid); + msg.uid = uid; + msg.modseq = ++box->box.box.highest_modseq; + msg.save_date = ++msg_date; + + switch (type) { + case ADD_SRC: + box->box.src->highest_modseq++; + box->box.src->uid_next = uid + 1; + array_append(&box->src_msgs, &msg, 1); + break; + case ADD_DEST: + box->box.dest->highest_modseq++; + box->box.dest->uid_next = uid + 1; + array_append(&box->dest_msgs, &msg, 1); + break; + case ADD_BOTH: + box->box.src->highest_modseq++; + box->box.dest->highest_modseq++; + box->box.src->uid_next = uid + 1; + box->box.dest->uid_next = uid + 1; + array_append(&box->src_msgs, &msg, 1); + array_append(&box->dest_msgs, &msg, 1); + break; + } + if (box->box.box.uid_next <= uid) + box->box.box.uid_next = uid + 1; +} + +static void test_msg_set_modseq(struct test_dsync_mailbox *box, + enum test_box_add_type type, + uint32_t uid, uint64_t modseq) +{ + struct dsync_message *msgs; + unsigned int i, count; + + i_assert(modseq <= box->box.box.highest_modseq); + if (type != ADD_DEST) { + msgs = array_get_modifiable(&box->src_msgs, &count); + for (i = 0; i < count; i++) { + if (msgs[i].uid == uid) { + msgs[i].modseq = modseq; + break; + } + } + i_assert(i < count); + } + if (type != ADD_SRC) { + msgs = array_get_modifiable(&box->dest_msgs, &count); + for (i = 0; i < count; i++) { + if (msgs[i].uid == uid) { + msgs[i].modseq = modseq; + break; + } + } + i_assert(i < count); + } +} + +static void test_msg_set_flags(struct test_dsync_mailbox *box, + enum test_box_add_type type, + uint32_t uid, enum mail_flags flags) +{ + unsigned char guid_128_data[MAIL_GUID_128_SIZE * 2 + 1]; + struct dsync_message *msgs; + unsigned int i, count; + + box->box.box.highest_modseq++; + if (type != ADD_DEST) { + box->box.src->highest_modseq = box->box.box.highest_modseq; + msgs = array_get_modifiable(&box->src_msgs, &count); + for (i = 0; i < count; i++) { + if (msgs[i].uid == uid) { + if ((flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0) { + msgs[i].guid = i_strdup(dsync_get_guid_128_str(msgs[i].guid, + guid_128_data, sizeof(guid_128_data))); + } + msgs[i].flags = flags; + msgs[i].modseq = box->box.src->highest_modseq; + break; + } + } + i_assert(i < count); + } + if (type != ADD_SRC) { + box->box.dest->highest_modseq = box->box.box.highest_modseq; + msgs = array_get_modifiable(&box->dest_msgs, &count); + for (i = 0; i < count; i++) { + if (msgs[i].uid == uid) { + if ((flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0) { + msgs[i].guid = i_strdup(dsync_get_guid_128_str(msgs[i].guid, + guid_128_data, sizeof(guid_128_data))); + } + msgs[i].flags = flags; + msgs[i].modseq = box->box.dest->highest_modseq; + break; + } + } + i_assert(i < count); + } +} + +static void ATTR_SENTINEL +test_msg_set_keywords(struct test_dsync_mailbox *box, + enum test_box_add_type type, + uint32_t uid, const char *kw, ...) +{ + struct dsync_message *msgs; + unsigned int i, count; + va_list va; + ARRAY_TYPE(const_string) keywords; + + t_array_init(&keywords, 8); + array_append(&keywords, &kw, 1); + va_start(va, kw); + while ((kw = va_arg(va, const char *)) != NULL) + array_append(&keywords, &kw, 1); + va_end(va); + (void)array_append_space(&keywords); + + box->box.box.highest_modseq++; + if (type != ADD_DEST) { + box->box.src->highest_modseq = box->box.box.highest_modseq; + msgs = array_get_modifiable(&box->src_msgs, &count); + for (i = 0; i < count; i++) { + if (msgs[i].uid == uid) { + msgs[i].keywords = array_idx(&keywords, 0); + msgs[i].modseq = box->box.src->highest_modseq; + break; + } + } + i_assert(i < count); + } + if (type != ADD_SRC) { + box->box.dest->highest_modseq = box->box.box.highest_modseq; + msgs = array_get_modifiable(&box->dest_msgs, &count); + for (i = 0; i < count; i++) { + if (msgs[i].uid == uid) { + msgs[i].keywords = array_idx(&keywords, 0); + msgs[i].modseq = box->box.src->highest_modseq; + break; + } + } + i_assert(i < count); + } +} + +static void +test_dsync_sync_msgs(struct test_dsync_worker *worker, bool dest) +{ + const struct test_dsync_mailbox *boxes; + const struct dsync_message *msgs; + struct test_dsync_worker_msg test_msg; + unsigned int i, j, box_count, msg_count; + + boxes = array_get(&mailboxes, &box_count); + for (i = 0; i < box_count; i++) { + msgs = dest ? array_get(&boxes[i].dest_msgs, &msg_count) : + array_get(&boxes[i].src_msgs, &msg_count); + for (j = 0; j < msg_count; j++) { + test_msg.msg = msgs[j]; + test_msg.mailbox_idx = i; + array_append(&worker->msg_iter.msgs, &test_msg, 1); + worker->worker.input_callback(worker->worker.input_context); + } + } + + worker->msg_iter.last = TRUE; + worker->worker.input_callback(worker->worker.input_context); +} + +static struct dsync_brain *test_dsync_brain_init(void) +{ + struct dsync_brain *brain; + + brain = i_new(struct dsync_brain, 1); + brain->src_worker = dsync_worker_init_test(); + brain->dest_worker = dsync_worker_init_test(); + + test_src_worker = (struct test_dsync_worker *)brain->src_worker; + test_dest_worker = (struct test_dsync_worker *)brain->dest_worker; + return brain; +} + +static struct dsync_brain_mailbox_sync * +test_dsync_brain_sync_init(void) +{ + ARRAY_TYPE(dsync_brain_mailbox) brain_boxes; + struct dsync_brain_mailbox_sync *sync; + const struct test_dsync_mailbox *tboxes; + unsigned int i, count; + + tboxes = array_get(&mailboxes, &count); + t_array_init(&brain_boxes, count); + for (i = 0; i < count; i++) + array_append(&brain_boxes, &tboxes[i].box, 1); + + sync = dsync_brain_msg_sync_init(test_dsync_brain_init(), &brain_boxes); + test_dsync_sync_msgs(test_dest_worker, TRUE); + test_dsync_sync_msgs(test_src_worker, FALSE); + return sync; +} + +static void test_dsync_brain_msg_sync_box_multi(void) +{ + struct test_dsync_mailbox *box; + struct dsync_brain_mailbox_sync *sync; + struct test_dsync_msg_event msg_event; + const struct dsync_brain_new_msg *new_msgs; + unsigned int count; + + /* test that msg syncing finds and syncs all mailboxes */ + test_begin("dsync brain msg sync box multi"); + + i_array_init(&mailboxes, 32); + box = test_box_add(ADD_BOTH, "both"); + test_msg_add(box, ADD_BOTH, "guid1", 1); + test_msg_set_flags(box, ADD_SRC, 1, MAIL_SEEN); + test_msg_set_flags(box, ADD_DEST, 1, MAIL_DRAFT); + test_msg_set_flags(box, ADD_SRC, 1, MAIL_ANSWERED); + box = test_box_add(ADD_SRC, "src"); + test_msg_add(box, ADD_SRC, "guid2", 5); + box = test_box_add(ADD_DEST, "dest"); + test_msg_add(box, ADD_DEST, "guid3", 3); + + sync = test_dsync_brain_sync_init(); + + test_assert(test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_UPDATE); + test_assert(test_box_has_guid("both", &msg_event.mailbox)); + test_assert(msg_event.msg.uid == 1); + test_assert(msg_event.msg.flags == MAIL_ANSWERED); + test_assert(!test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + + new_msgs = array_get(&sync->dest_msg_iter->new_msgs, &count); + test_assert(count == 1); + test_assert(new_msgs[0].mailbox_idx == 1); + test_assert(new_msgs[0].msg->uid == 5); + test_assert(strcmp(new_msgs[0].msg->guid, "guid2") == 0); + + new_msgs = array_get(&sync->src_msg_iter->new_msgs, &count); + test_assert(count == 1); + test_assert(new_msgs[0].mailbox_idx == 2); + test_assert(new_msgs[0].msg->uid == 3); + test_assert(strcmp(new_msgs[0].msg->guid, "guid3") == 0); + + test_end(); +} + +static void test_dsync_brain_msg_sync_box(enum test_box_add_type type) +{ + struct test_dsync_mailbox *box; + struct dsync_brain_mailbox_sync *sync; + struct test_dsync_msg_event msg_event; + const struct dsync_brain_new_msg *new_msgs; + unsigned int count; + + i_array_init(&mailboxes, 32); + box = test_box_add(type, "box1"); + test_msg_add(box, type, "guid1", 1); + box = test_box_add(type, "box2"); + test_msg_add(box, type, "guid2", 2); + + sync = test_dsync_brain_sync_init(); + + test_assert(!test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + + new_msgs = array_get(type == ADD_DEST ? &sync->src_msg_iter->new_msgs : + &sync->dest_msg_iter->new_msgs, &count); + test_assert(count == 2); + test_assert(new_msgs[0].mailbox_idx == 0); + test_assert(new_msgs[0].msg->uid == 1); + test_assert(strcmp(new_msgs[0].msg->guid, "guid1") == 0); + test_assert(new_msgs[1].mailbox_idx == 1); + test_assert(new_msgs[1].msg->uid == 2); + test_assert(strcmp(new_msgs[1].msg->guid, "guid2") == 0); +} + +static void test_dsync_brain_msg_sync_box_single(void) +{ + test_begin("dsync brain msg sync box src"); + test_dsync_brain_msg_sync_box(ADD_SRC); + test_end(); + + test_begin("dsync brain msg sync box dest"); + test_dsync_brain_msg_sync_box(ADD_DEST); + test_end(); +} + +static void test_dsync_brain_msg_sync_existing(void) +{ + struct test_dsync_mailbox *box; + struct dsync_brain_mailbox_sync *sync; + struct test_dsync_msg_event msg_event; + + test_begin("dsync brain msg sync existing"); + + i_array_init(&mailboxes, 1); + box = test_box_add(ADD_BOTH, "box"); + test_msg_add(box, ADD_BOTH, "guid1", 1); + test_msg_add(box, ADD_BOTH, "guid2", 2); + test_msg_add(box, ADD_BOTH, "guid5", 5); + test_msg_add(box, ADD_BOTH, "guid6", 6); + test_msg_add(box, ADD_BOTH, "guid9", 9); + test_msg_add(box, ADD_BOTH, "guid10", 10); + + /* unchanged */ + test_msg_set_flags(box, ADD_BOTH, 1, MAIL_SEEN); + + /* changed, same modseq - src will be used */ + test_msg_set_flags(box, ADD_SRC, 2, MAIL_ANSWERED); + test_msg_set_flags(box, ADD_DEST, 2, MAIL_ANSWERED | MAIL_SEEN); + test_msg_set_modseq(box, ADD_BOTH, 2, 2); + + /* changed, dest has higher modseq */ + test_msg_set_flags(box, ADD_BOTH, 5, MAIL_DRAFT); + test_msg_set_flags(box, ADD_DEST, 5, MAIL_FLAGGED); + + /* changed, src has higher modseq */ + test_msg_set_flags(box, ADD_DEST, 6, MAIL_FLAGGED); + test_msg_set_flags(box, ADD_SRC, 6, 0); + + /* keywords changed, src has higher modseq */ + test_msg_set_keywords(box, ADD_SRC, 9, "hello", "world", NULL); + + /* flag/keyword conflict, same modseq - src will be used */ + test_msg_set_keywords(box, ADD_SRC, 10, "foo", NULL); + test_msg_set_flags(box, ADD_SRC, 10, MAIL_SEEN); + test_msg_set_flags(box, ADD_DEST, 10, MAIL_DRAFT); + test_msg_set_modseq(box, ADD_BOTH, 10, 5); + + sync = test_dsync_brain_sync_init(); + test_assert(array_count(&sync->src_msg_iter->new_msgs) == 0); + test_assert(array_count(&sync->dest_msg_iter->new_msgs) == 0); + + test_assert(test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_UPDATE); + test_assert(msg_event.msg.uid == 2); + test_assert(msg_event.msg.flags == MAIL_ANSWERED); + + test_assert(test_dsync_worker_next_msg_event(test_src_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_UPDATE); + test_assert(msg_event.msg.uid == 5); + test_assert(msg_event.msg.flags == MAIL_FLAGGED); + + test_assert(test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_UPDATE); + test_assert(msg_event.msg.uid == 6); + test_assert(msg_event.msg.flags == 0); + + test_assert(test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_UPDATE); + test_assert(msg_event.msg.uid == 9); + test_assert(msg_event.msg.flags == 0); + test_assert(strcmp(msg_event.msg.keywords[0], "hello") == 0); + test_assert(strcmp(msg_event.msg.keywords[1], "world") == 0); + test_assert(msg_event.msg.keywords[2] == NULL); + + test_assert(test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_UPDATE); + test_assert(msg_event.msg.uid == 10); + test_assert(msg_event.msg.flags == MAIL_SEEN); + test_assert(strcmp(msg_event.msg.keywords[0], "foo") == 0); + test_assert(msg_event.msg.keywords[1] == NULL); + + test_assert(!test_dsync_worker_next_msg_event(test_src_worker, &msg_event)); + test_assert(!test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_end(); +} + +static void test_dsync_brain_msg_sync_expunges(void) +{ + struct test_dsync_mailbox *box; + struct dsync_brain_mailbox_sync *sync; + struct test_dsync_msg_event msg_event; + + test_begin("dsync brain msg sync expunges"); + + i_array_init(&mailboxes, 1); + box = test_box_add(ADD_BOTH, "box"); + + /* expunged from dest */ + test_msg_add(box, ADD_SRC, "guid1", 1); + /* expunged from src */ + test_msg_add(box, ADD_DEST, "guid2", 2); + /* expunged from dest with expunge record */ + test_msg_add(box, ADD_BOTH, "guid3", 3); + test_msg_set_flags(box, ADD_DEST, 3, DSYNC_MAIL_FLAG_EXPUNGED); + /* expunged from src with expunge record */ + test_msg_add(box, ADD_BOTH, "guid4", 4); + test_msg_set_flags(box, ADD_SRC, 4, DSYNC_MAIL_FLAG_EXPUNGED); + /* expunged from both, with expunge record in src */ + test_msg_add(box, ADD_SRC, "guid5", 5); + test_msg_set_flags(box, ADD_SRC, 5, DSYNC_MAIL_FLAG_EXPUNGED); + /* expunged from both, with expunge record in dest */ + test_msg_add(box, ADD_DEST, "guid6", 6); + test_msg_set_flags(box, ADD_DEST, 6, DSYNC_MAIL_FLAG_EXPUNGED); + /* expunged from both, with expunge record in both */ + test_msg_add(box, ADD_BOTH, "guid7", 7); + test_msg_set_flags(box, ADD_BOTH, 7, DSYNC_MAIL_FLAG_EXPUNGED); + + sync = test_dsync_brain_sync_init(); + test_assert(array_count(&sync->src_msg_iter->new_msgs) == 0); + test_assert(array_count(&sync->dest_msg_iter->new_msgs) == 0); + + test_assert(test_dsync_worker_next_msg_event(test_src_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_EXPUNGE); + test_assert(msg_event.msg.uid == 1); + + test_assert(test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_EXPUNGE); + test_assert(msg_event.msg.uid == 2); + + test_assert(test_dsync_worker_next_msg_event(test_src_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_EXPUNGE); + test_assert(msg_event.msg.uid == 3); + + test_assert(test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_assert(msg_event.type == LAST_MSG_TYPE_EXPUNGE); + test_assert(msg_event.msg.uid == 4); + + test_assert(!test_dsync_worker_next_msg_event(test_src_worker, &msg_event)); + test_assert(!test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + test_end(); +} + +static void test_dsync_brain_msg_sync_uid_conflicts(void) +{ + struct test_dsync_mailbox *box; + struct dsync_brain_mailbox_sync *sync; + struct test_dsync_msg_event msg_event; + const struct dsync_brain_uid_conflict *conflicts; + const struct dsync_brain_new_msg *src_msgs, *dest_msgs; + unsigned int src_count, dest_count; + + test_begin("dsync brain msg sync uid conflicts"); + + i_array_init(&mailboxes, 16); + + /* existing guid mismatch */ + box = test_box_add(ADD_BOTH, "box1"); + test_msg_add(box, ADD_SRC, "guid1", 1); + test_msg_add(box, ADD_DEST, "guid2", 1); + + /* preserve uid */ + test_msg_add(box, ADD_BOTH, "guid3", 3); + /* extra message in src */ + test_msg_add(box, ADD_SRC, "guid4", 4); + /* extra message in dest */ + test_msg_add(box, ADD_DEST, "guid5", 5); + + /* conflict in expunged message expunged in dest */ + test_msg_add(box, ADD_SRC, "guid6", 6); + test_msg_add(box, ADD_DEST, "guid7", 6); + test_msg_set_flags(box, ADD_DEST, 6, DSYNC_MAIL_FLAG_EXPUNGED); + + /* conflict in expunged message expunged in src */ + test_msg_add(box, ADD_SRC, "guid8", 8); + test_msg_set_flags(box, ADD_SRC, 8, DSYNC_MAIL_FLAG_EXPUNGED); + test_msg_add(box, ADD_DEST, "guid9", 8); + + /* conflict in expunged message expunged in both */ + test_msg_add(box, ADD_SRC, "guid10", 10); + test_msg_set_flags(box, ADD_SRC, 10, DSYNC_MAIL_FLAG_EXPUNGED); + test_msg_add(box, ADD_DEST, "guid11", 10); + test_msg_set_flags(box, ADD_DEST, 10, DSYNC_MAIL_FLAG_EXPUNGED); + + sync = test_dsync_brain_sync_init(); + + conflicts = array_get(&sync->src_msg_iter->uid_conflicts, &src_count); + test_assert(src_count == 3); + test_assert(conflicts[0].old_uid == 1); + test_assert(conflicts[0].new_uid == 12); + test_assert(conflicts[1].old_uid == 4); + test_assert(conflicts[1].new_uid == 13); + test_assert(conflicts[2].old_uid == 6); + test_assert(conflicts[2].new_uid == 15); + + conflicts = array_get(&sync->dest_msg_iter->uid_conflicts, &dest_count); + test_assert(dest_count == 3); + test_assert(conflicts[0].old_uid == 1); + test_assert(conflicts[0].new_uid == 11); + test_assert(conflicts[1].old_uid == 5); + test_assert(conflicts[1].new_uid == 14); + test_assert(conflicts[2].old_uid == 8); + test_assert(conflicts[2].new_uid == 16); + + test_assert(!test_dsync_worker_next_msg_event(test_src_worker, &msg_event)); + test_assert(!test_dsync_worker_next_msg_event(test_dest_worker, &msg_event)); + + src_msgs = array_get(&sync->src_msg_iter->new_msgs, &src_count); + dest_msgs = array_get(&sync->dest_msg_iter->new_msgs, &dest_count); + test_assert(src_count == 3); + test_assert(dest_count == 3); + + test_assert(dest_msgs[0].msg->uid == 12); + test_assert(strcmp(dest_msgs[0].msg->guid, "guid1") == 0); + test_assert(src_msgs[0].msg->uid == 11); + test_assert(strcmp(src_msgs[0].msg->guid, "guid2") == 0); + test_assert(dest_msgs[1].msg->uid == 13); + test_assert(strcmp(dest_msgs[1].msg->guid, "guid4") == 0); + test_assert(src_msgs[1].msg->uid == 14); + test_assert(strcmp(src_msgs[1].msg->guid, "guid5") == 0); + test_assert(dest_msgs[2].msg->uid == 15); + test_assert(strcmp(dest_msgs[2].msg->guid, "guid6") == 0); + test_assert(src_msgs[2].msg->uid == 16); + test_assert(strcmp(src_msgs[2].msg->guid, "guid9") == 0); + + test_end(); +} + +int main(void) +{ + static void (*test_functions[])(void) = { + test_dsync_brain_msg_sync_box_multi, + test_dsync_brain_msg_sync_box_single, + test_dsync_brain_msg_sync_existing, + test_dsync_brain_msg_sync_expunges, + test_dsync_brain_msg_sync_uid_conflicts, + NULL + }; + + return test_run(test_functions); +}
--- a/src/dsync/test-dsync-brain.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/test-dsync-brain.c Mon Jul 27 19:04:36 2009 -0400 @@ -5,496 +5,236 @@ #include "sha1.h" #include "master-service.h" #include "dsync-brain-private.h" -#include "test-common.h" #include "test-dsync-worker.h" #include "test-dsync-common.h" -#include <stdlib.h> - -enum { - FLAG_EXISTS = 0x01, - FLAG_CREATED = 0x02, - FLAG_UNCHANGED = 0x04 -}; - -struct test_dsync_mailbox { - struct dsync_mailbox box; - struct dsync_message *src_msgs, *dest_msgs; - unsigned int dest_flags; -}; - -static const char *kw12[] = { "kw1", "kw2", NULL }; - -static struct dsync_message box1_src_msgs[] = { - { "guid1", 3, MAIL_SEEN, kw12, 123, 987 }, - { "guid2", 5, MAIL_DRAFT, NULL, 125, 989 }, - { "guid3", 8, 0, NULL, 128, 990 }, - { NULL, 0, 0, NULL, 0, 0 } -}; -static struct dsync_message box1_dest_msgs[] = { - { "guid1", 3, MAIL_FLAGGED, NULL, 123, 987 }, - { "guid2", 5, MAIL_DRAFT, kw12, 125, 989 }, - { NULL, 0, 0, NULL, 0, 0 } -}; - -static struct dsync_message box2_src_msgs[] = { - { "guid2", 6, MAIL_ANSWERED | MAIL_FLAGGED, NULL, 3434, 6552354 }, - { "guid4", 10, 0, NULL, 3426, 43643 }, - { NULL, 0, 0, NULL, 0, 0 } -}; - -static struct dsync_message box3_src_msgs[] = { - { "guid1", 1, MAIL_FLAGGED, NULL, 5454, 273850 }, - { "guid5", 5, 0, NULL, 331, 38701233 }, - { "b75c81f2b3a4c9f84f24851c37acedee", 6, DSYNC_MAIL_FLAG_EXPUNGED, NULL, 331, 38701233 }, - { NULL, 0, 0, NULL, 0, 0 } -}; -static struct dsync_message box3_dest_msgs[] = { - { "guid1", 1, MAIL_FLAGGED, NULL, 5454, 273850 }, - { "guid8", 3, 0, NULL, 330, 2424 }, - { "guid5", 5, 0, NULL, 1, 38701233 }, - { "guid6", 6, 0, NULL, 333, 6482 }, - { "guid7", 7, 0, NULL, 333, 6482 }, - { NULL, 0, 0, NULL, 0, 0 } -}; - -static struct test_dsync_mailbox basic_mailboxes[] = { - { { "box1", { { 0x12, 0x34, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, - 0x21, 0x43, 0x54, 0x76, 0x98, 0xba, 0xdc, 0xfe } }, - 1234567890, 4321, 605040302010ULL }, - box1_src_msgs, box1_dest_msgs, FLAG_EXISTS }, - { { "box2", { { 0xa3, 0xbd, 0x78, 0x24, 0xde, 0xfe, 0x08, 0xf7, - 0xac, 0xc7, 0xca, 0x8c, 0xe7, 0x39, 0xdb, 0xca } }, - 554321023, 6767, 79 }, - box2_src_msgs, NULL, 0 }, - { { "box3", { { 0x46, 0x25, 0xb3, 0x24, 0xde, 0xfe, 0x08, 0xf7, - 0xac, 0xc7, 0xca, 0x1a, 0xe7, 0x39, 0xdb, 0x54 } }, - 4545454, 656, 2366 }, box3_src_msgs, - box3_dest_msgs, FLAG_EXISTS }, - { { "dir1", { { 0, } }, 0, 0, 0 }, NULL, NULL, FLAG_EXISTS }, - { { "dir2", { { 0, } }, 0, 0, 0 }, NULL, NULL, 0 }, - { { "box4", { { 0x46, 0x2d, 0xa3, 0x24, 0x2e, 0x5e, 0x28, 0x67, - 0xa6, 0xc7, 0xca, 0x8a, 0xe7, 0x36, 0xd4, 0xa4 } }, - 2142, 445, 53535 }, box3_src_msgs, - box3_dest_msgs, FLAG_EXISTS | FLAG_UNCHANGED }, - { { NULL, { { 0, } }, 0, 0, 0 }, NULL, NULL, 0 } -}; - -static struct test_dsync_mailbox *mailboxes; struct master_service *master_service; +static struct test_dsync_worker *src_test_worker, *dest_test_worker; void master_service_stop(struct master_service *master_service ATTR_UNUSED) { } -void mail_generate_guid_128_hash(const char *guid, - uint8_t guid_128[MAIL_GUID_128_SIZE]) +struct dsync_brain_mailbox_sync * +dsync_brain_msg_sync_init(struct dsync_brain *brain, + const ARRAY_TYPE(dsync_brain_mailbox) *mailboxes) { - unsigned char sha1_sum[SHA1_RESULTLEN]; + struct dsync_brain_mailbox_sync *sync; - sha1_get_digest(guid, strlen(guid), sha1_sum); - memcpy(guid_128, sha1_sum, MAIL_GUID_128_SIZE); + sync = i_new(struct dsync_brain_mailbox_sync, 1); + sync->brain = brain; + i_array_init(&sync->mailboxes, array_count(mailboxes)); + array_append_array(&sync->mailboxes, mailboxes); + return sync; } -bool mail_guid_128_is_empty(const uint8_t guid_128[MAIL_GUID_128_SIZE] ATTR_UNUSED) +void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **_sync) { - return FALSE; + array_free(&(*_sync)->mailboxes); + i_free(*_sync); } -static bool mailbox_find(const char *name, unsigned int *idx_r) +void dsync_brain_msg_sync_resolve_uid_conflicts(struct dsync_brain_mailbox_sync *sync ATTR_UNUSED) {} + +static void mailboxes_set_guids(struct dsync_mailbox *boxes) +{ + unsigned char sha[SHA1_RESULTLEN]; + + for (; boxes->name != NULL; boxes++) { + sha1_get_digest(boxes->name, strlen(boxes->name), sha); + memcpy(boxes->guid.guid, sha, sizeof(boxes->guid.guid)); + } +} + +static void mailboxes_send_to_worker(struct test_dsync_worker *test_worker, + struct dsync_mailbox *boxes) { unsigned int i; - for (i = 0; mailboxes[i].box.name != NULL; i++) { - if (strcmp(mailboxes[i].box.name, name) == 0) { - *idx_r = i; - return TRUE; - } + for (i = 0; boxes[i].name != NULL; i++) { + test_worker->box_iter.next_box = &boxes[i]; + test_worker->worker.input_callback(test_worker->worker.input_context); } - return FALSE; -} - -static int test_dsync_mailbox_cmp(const void *p1, const void *p2) -{ - const struct test_dsync_mailbox *t1 = p1, *t2 = p2; - - return dsync_mailbox_guid_cmp(&t1->box, &t2->box); -} - -static void test_dsync_sync_msgs(struct test_dsync_worker *worker, bool dest) -{ - struct test_dsync_worker_msg test_msg; - struct dsync_message *msgs; - unsigned int i, j; - - for (i = 0; mailboxes[i].box.name != NULL; i++) { - msgs = dest ? mailboxes[i].dest_msgs : mailboxes[i].src_msgs; - if (msgs == NULL) - continue; - if ((mailboxes[i].dest_flags & FLAG_UNCHANGED) != 0) - continue; - - for (j = 0; msgs[j].guid != NULL; j++) { - test_msg.msg = msgs[j]; - test_msg.mailbox_idx = i; - array_append(&worker->msg_iter.msgs, &test_msg, 1); - worker->worker.input_callback(worker->worker.input_context); - } - } - - worker->msg_iter.last = TRUE; - worker->worker.input_callback(worker->worker.input_context); + test_worker->box_iter.last = TRUE; + test_worker->worker.input_callback(test_worker->worker.input_context); } -static int test_dsync_msg_event_cmp(const struct test_dsync_msg_event *e1, - const struct test_dsync_msg_event *e2) -{ - int ret; - - ret = memcmp(e1->mailbox.guid, e2->mailbox.guid, - sizeof(e1->mailbox.guid)); - if (ret != 0) - return ret; - - return (int)e1->msg.uid - (int)e2->msg.uid; -} - -static bool test_dsync_msg_find_guid(const struct test_dsync_mailbox *box, - const struct dsync_message *msg, - const struct test_dsync_mailbox **box2_r, - const struct dsync_message **msg2_r) +static bool +test_dsync_mailbox_create_equals(const struct dsync_mailbox *cbox, + const struct dsync_mailbox *obox) { - unsigned int i, j; - - for (i = 0; mailboxes[i].box.name != NULL; i++) { - if (mailboxes[i].src_msgs == NULL) - continue; - - for (j = 0; mailboxes[i].src_msgs[j].guid != NULL; j++) { - if (strcmp(mailboxes[i].src_msgs[j].guid, msg->guid) != 0) - continue; - - if (memcmp(mailboxes[i].box.guid.guid, box->box.guid.guid, - MAILBOX_GUID_SIZE) != 0 || - mailboxes[i].src_msgs[j].uid != msg->uid) { - *box2_r = &mailboxes[i]; - *msg2_r = &mailboxes[i].src_msgs[j]; - return TRUE; - } - } - } - return FALSE; -} - -static void -test_dsync_brain_verify_existing_one(const struct test_dsync_mailbox *box, - const struct test_dsync_msg_event *event, - const struct dsync_message *src) -{ - test_assert(event->msg.guid != NULL); - test_assert(event->type == LAST_MSG_TYPE_UPDATE); - test_assert(memcmp(event->mailbox.guid, box->box.guid.guid, - MAILBOX_GUID_SIZE) == 0); - test_assert(event->msg.flags == src->flags); - test_assert(dsync_keyword_list_equals(event->msg.keywords, - src->keywords)); - test_assert(event->msg.modseq == src->modseq); + return strcmp(cbox->name, obox->name) == 0 && + memcmp(cbox->guid.guid, obox->guid.guid, + sizeof(cbox->guid.guid)) == 0 && + cbox->uid_validity == obox->uid_validity && + cbox->uid_next == 0 && cbox->highest_modseq == 0; } static void -test_dsync_brain_verify_existing(const struct test_dsync_mailbox *box, - const struct test_dsync_msg_event **eventsp, - unsigned int *idx_r) +test_dsync_mailbox_update(const struct dsync_mailbox *bbox, + const struct dsync_mailbox *box) { - const struct test_dsync_msg_event *event = *eventsp; - unsigned int i, j; + struct test_dsync_box_event src_event, dest_event; + + test_assert(test_dsync_worker_next_box_event(src_test_worker, &src_event)); + test_assert(test_dsync_worker_next_box_event(dest_test_worker, &dest_event)); + test_assert(src_event.type == dest_event.type && + dsync_mailboxes_equal(&src_event.box, &dest_event.box)); - /* we don't try to handle uid conflicts here */ - i = j = 0; - while (box->src_msgs[i].guid != NULL && box->dest_msgs[j].guid != NULL) { - if (box->src_msgs[i].uid < box->dest_msgs[j].uid) { - /* need to add message to dest */ - i++; - } else if (box->src_msgs[i].uid > box->dest_msgs[j].uid) { - /* message expunged from src */ - test_assert(event->type == LAST_MSG_TYPE_EXPUNGE); - test_assert(memcmp(event->mailbox.guid, box->box.guid.guid, - MAILBOX_GUID_SIZE) == 0); - test_assert(event->msg.uid == box->dest_msgs[j].uid); - j++; event++; - } else if (box->src_msgs[i].flags == DSYNC_MAIL_FLAG_EXPUNGED) { - /* message expunged from end of mailbox */ - test_assert(event->type == LAST_MSG_TYPE_EXPUNGE); - test_assert(event->msg.uid == box->dest_msgs[j].uid); - i++; j++; event++; - } else if (box->src_msgs[i].modseq > box->dest_msgs[j].modseq || - box->src_msgs[i].flags != box->dest_msgs[j].flags || - !dsync_keyword_list_equals(box->src_msgs[i].keywords, - box->dest_msgs[j].keywords)) { - /* message changed */ - i_assert(strcmp(box->src_msgs[i].guid, - box->dest_msgs[j].guid) == 0); + test_assert(src_event.type == LAST_BOX_TYPE_UPDATE); + test_assert(dsync_mailboxes_equal(&src_event.box, box)); + test_assert(dsync_mailboxes_equal(bbox, box)); +} - test_dsync_brain_verify_existing_one(box, event, - &box->src_msgs[i]); - i++; j++; event++; - } else { - /* message unchanged */ - i_assert(strcmp(box->src_msgs[i].guid, - box->dest_msgs[j].guid) == 0); - i++; j++; - } - } - while (box->dest_msgs[j].guid != NULL) { - /* message expunged from src */ - test_assert(event->type == LAST_MSG_TYPE_EXPUNGE); - test_assert(memcmp(event->mailbox.guid, box->box.guid.guid, - MAILBOX_GUID_SIZE) == 0); - test_assert(event->msg.uid == box->dest_msgs[j].uid); - j++; event++; - } - *idx_r = i; - *eventsp = event; +static int +dsync_brain_mailbox_name_cmp(const struct dsync_brain_mailbox *box1, + const struct dsync_brain_mailbox *box2) +{ + return strcmp(box1->box.name, box2->box.name); } -static void -test_dsync_brain_verify_mailbox(const struct test_dsync_mailbox *box, - const struct test_dsync_msg_event **eventsp) +static void test_dsync_brain(void) { - const struct test_dsync_msg_event *event = *eventsp; - const struct test_dsync_mailbox *box2; - const struct dsync_message *msgs, *msg2; - unsigned int i = 0; - - if (box->src_msgs == NULL) - return; - - msgs = box->src_msgs; - if (box->dest_msgs != NULL) { - /* sync existing messages */ - test_dsync_brain_verify_existing(box, &event, &i); - } - - /* sync new messages */ - for (; msgs[i].guid != NULL; i++) { - test_assert(event->msg.guid != NULL); - test_assert(memcmp(event->mailbox.guid, box->box.guid.guid, - MAILBOX_GUID_SIZE) == 0); - if (test_dsync_msg_find_guid(box, &msgs[i], &box2, &msg2)) { - test_assert(event->type == LAST_MSG_TYPE_COPY); - test_assert(memcmp(event->copy_src_mailbox.guid, - box2->box.guid.guid, - MAILBOX_GUID_SIZE) == 0); - test_assert(event->copy_src_uid == msg2->uid); - } else { - test_assert(event->type == LAST_MSG_TYPE_SAVE); - test_assert(strcmp(event->save_body, - "hdr\n\nbody") == 0); - } - test_assert(strcmp(event->msg.guid, msgs[i].guid) == 0); - test_assert(event->msg.uid == msgs[i].uid); - test_assert(event->msg.flags == msgs[i].flags); - test_assert(dsync_keyword_list_equals(event->msg.keywords, - msgs[i].keywords)); - test_assert(event->msg.modseq == msgs[i].modseq); - test_assert(event->msg.save_date == msgs[i].save_date); - - event++; - } - - *eventsp = event; -} - -static void -test_dsync_brain_verify_msg_events(struct test_dsync_worker *dest_test_worker) -{ - ARRAY_DEFINE(msg_events, struct test_dsync_msg_event); - const struct test_dsync_msg_event *events, *events_end; - struct test_dsync_msg_event msg_event; - unsigned int i, event_count; - - /* get events and sort them so we can easily check if they're correct */ - t_array_init(&msg_events, 64); - while (test_dsync_worker_next_msg_event(dest_test_worker, &msg_event)) - array_append(&msg_events, &msg_event, 1); - array_sort(&msg_events, test_dsync_msg_event_cmp); - - events = array_get(&msg_events, &event_count); - events_end = events + event_count; - for (i = 0; mailboxes[i].box.name != NULL; i++) { - if ((mailboxes[i].dest_flags & FLAG_UNCHANGED) == 0) - test_dsync_brain_verify_mailbox(&mailboxes[i], &events); - } - test_assert(events == events_end); -} - -static void -test_dsync_brain_run(const struct test_dsync_mailbox *test_mailboxes, - void (*verify_func)(struct test_dsync_worker *)) -{ + static struct dsync_mailbox src_boxes[] = { + { "box1", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { "box2", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { "box3", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { "box4", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { "box5", { { 0, } }, 1234567890, 5433, 123123123123ULL }, + { "box6", { { 0, } }, 1234567890, 5432, 123123123124ULL }, + { "boxx", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { NULL, { { 0, } }, 0, 0, 0 } + }; + static struct dsync_mailbox dest_boxes[] = { + { "box1", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { "box2", { { 0, } }, 1234567891, 5432, 123123123123ULL }, + { "box3", { { 0, } }, 1234567890, 5433, 123123123123ULL }, + { "box4", { { 0, } }, 1234567890, 5432, 123123123124ULL }, + { "box5", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { "box6", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { "boxy", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { NULL, { { 0, } }, 0, 0, 0 } + }; struct dsync_brain *brain; struct dsync_worker *src_worker, *dest_worker; - struct test_dsync_worker *src_test_worker, *dest_test_worker; - struct dsync_mailbox new_box; struct test_dsync_box_event box_event; - unsigned int i, j, box_count; + const struct dsync_brain_mailbox *brain_boxes; + unsigned int i, count; - box_count = 0; - while (test_mailboxes[box_count].box.name != NULL) - box_count++; + test_begin("dsync brain"); - mailboxes = t_new(struct test_dsync_mailbox, box_count + 1); - memcpy(mailboxes, test_mailboxes, sizeof(*mailboxes) * box_count); + mailboxes_set_guids(src_boxes); + mailboxes_set_guids(dest_boxes); src_worker = dsync_worker_init_test(); dest_worker = dsync_worker_init_test(); src_test_worker = (struct test_dsync_worker *)src_worker; dest_test_worker = (struct test_dsync_worker *)dest_worker; - brain = dsync_brain_init(src_worker, dest_worker); + brain = dsync_brain_init(src_worker, dest_worker, 0); dsync_brain_sync(brain); /* have brain read the mailboxes */ - for (i = 0; mailboxes[i].box.name != NULL; i++) { - src_test_worker->box_iter.next_box = &mailboxes[i].box; - src_worker->input_callback(src_worker->input_context); - - if ((mailboxes[i].dest_flags & FLAG_EXISTS) != 0) { - if ((mailboxes[i].dest_flags & FLAG_UNCHANGED) == 0) - mailboxes[i].box.highest_modseq++; - dest_test_worker->box_iter.next_box = &mailboxes[i].box; - dest_worker->input_callback(dest_worker->input_context); - } - } - src_test_worker->box_iter.last = TRUE; - src_worker->input_callback(src_worker->input_context); - dest_test_worker->box_iter.last = TRUE; - dest_worker->input_callback(dest_worker->input_context); + mailboxes_send_to_worker(src_test_worker, src_boxes); + mailboxes_send_to_worker(dest_test_worker, dest_boxes); /* check that it created missing mailboxes */ - while (test_dsync_worker_next_box_event(dest_test_worker, &box_event)) { - test_assert(box_event.type == LAST_BOX_TYPE_CREATE); - test_assert(mailbox_find(box_event.box.name, &i)); - test_assert(mailboxes[i].dest_flags == 0); - mailboxes[i].dest_flags |= FLAG_CREATED; + test_assert(test_dsync_worker_next_box_event(dest_test_worker, &box_event)); + test_assert(box_event.type == LAST_BOX_TYPE_CREATE); + test_assert(test_dsync_mailbox_create_equals(&box_event.box, &src_boxes[6])); + + test_assert(test_dsync_worker_next_box_event(src_test_worker, &box_event)); + test_assert(box_event.type == LAST_BOX_TYPE_CREATE); + test_assert(test_dsync_mailbox_create_equals(&box_event.box, &dest_boxes[6])); - new_box = mailboxes[i].box; - new_box.uid_next = 0; - new_box.highest_modseq = 0; - test_assert(dsync_mailboxes_equal(&box_event.box, &new_box)); - } + test_assert(!test_dsync_worker_next_box_event(src_test_worker, &box_event)); + test_assert(!test_dsync_worker_next_box_event(dest_test_worker, &box_event)); + + array_sort(&brain->mailbox_sync->mailboxes, + dsync_brain_mailbox_name_cmp); + + /* check mailbox updates */ + brain->state++; + dsync_brain_sync(brain); - /* brain wants mailboxes in guid order. make things easier for us - by sorting them now. */ - qsort(mailboxes, box_count, sizeof(*mailboxes), - test_dsync_mailbox_cmp); + brain_boxes = array_get(&brain->mailbox_sync->mailboxes, &count); + test_assert(count == 7); + for (i = 0; i < 5; i++) { + test_assert(dsync_mailboxes_equal(brain_boxes[i].src, &src_boxes[i+1])); + test_assert(dsync_mailboxes_equal(brain_boxes[i].dest, &dest_boxes[i+1])); + } + test_assert(dsync_mailboxes_equal(brain_boxes[5].src, &src_boxes[6])); + test_assert(brain_boxes[5].dest == NULL); + test_assert(brain_boxes[6].src == NULL); + test_assert(dsync_mailboxes_equal(brain_boxes[6].dest, &dest_boxes[6])); - /* start syncing messages */ - for (i = j = 0; mailboxes[i].box.name != NULL; i++) { - if ((mailboxes[i].dest_flags & FLAG_UNCHANGED) != 0) - continue; - test_assert(memcmp(&dest_test_worker->msg_iter_mailboxes[j], - mailboxes[i].box.guid.guid, MAILBOX_GUID_SIZE) == 0); - j++; - } - test_assert(dest_test_worker->msg_iter_mailbox_count == j); - test_dsync_sync_msgs(src_test_worker, FALSE); - test_dsync_sync_msgs(dest_test_worker, TRUE); + test_dsync_mailbox_update(&brain_boxes[0].box, &src_boxes[1]); + test_dsync_mailbox_update(&brain_boxes[1].box, &dest_boxes[2]); + test_dsync_mailbox_update(&brain_boxes[2].box, &dest_boxes[3]); + test_dsync_mailbox_update(&brain_boxes[3].box, &src_boxes[4]); + test_dsync_mailbox_update(&brain_boxes[4].box, &src_boxes[5]); + test_dsync_mailbox_update(&brain_boxes[5].box, &src_boxes[6]); + test_dsync_mailbox_update(&brain_boxes[6].box, &dest_boxes[6]); - verify_func(dest_test_worker); + test_assert(!test_dsync_worker_next_box_event(src_test_worker, &box_event)); + test_assert(!test_dsync_worker_next_box_event(dest_test_worker, &box_event)); dsync_worker_deinit(&src_worker); dsync_worker_deinit(&dest_worker); dsync_brain_deinit(&brain); -} -static void test_dsync_brain(void) -{ - test_begin("dsync brain basics"); - test_dsync_brain_run(basic_mailboxes, - test_dsync_brain_verify_msg_events); test_end(); } -static struct dsync_message conflict_src_msgs[] = { - { "guid1", 1, 0, NULL, 1, 1 }, - { "guid3", 3, 0, NULL, 1, 1 }, - { "guid5", 5, 0, NULL, 1, 1 }, - { "guidy", 6, 0, NULL, 1, 1 }, - { "67ddfe2125de633c56e033b57c897018", 7, DSYNC_MAIL_FLAG_EXPUNGED, NULL, 1, 1 }, - { "guidz", 8, DSYNC_MAIL_FLAG_EXPUNGED, NULL, 1, 1 }, - { NULL, 0, 0, NULL, 0, 0 } -}; -static struct dsync_message conflict_dest_msgs[] = { - { "guid1", 1, 0, NULL, 1, 1 }, - { "guid2", 2, 0, NULL, 1, 1 }, - { "guidx", 3, 0, NULL, 1, 1 }, - { "guid4", 4, 0, NULL, 1, 1 }, - { "guid5", 5, 0, NULL, 1, 1 }, - { "guid7", 7, 0, NULL, 1, 1 }, - { "guid8", 8, 0, NULL, 1, 1 }, - { NULL, 0, 0, NULL, 0, 0 } -}; +static void test_dsync_brain_full(void) +{ + static struct dsync_mailbox boxes[] = { + { "box1", { { 0, } }, 1234567890, 5432, 123123123123ULL }, + { NULL, { { 0, } }, 0, 0, 0 } + }; + struct dsync_brain *brain; + struct dsync_worker *src_worker, *dest_worker; + struct test_dsync_box_event box_event; + const struct dsync_brain_mailbox *brain_boxes; + unsigned int count; -static struct test_dsync_mailbox conflict_mailboxes[] = { - { { "box1", { { 0x12, 0x34, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, - 0x21, 0x43, 0x54, 0x76, 0x98, 0xba, 0xdc, 0xfe } }, - 1234567890, 4321, 888 }, - conflict_src_msgs, conflict_dest_msgs, FLAG_EXISTS }, - { { NULL, { { 0, } }, 0, 0, 0 }, NULL, NULL, 0 } -}; + test_begin("dsync brain full"); + + mailboxes_set_guids(boxes); -static void -test_dsync_brain_verify_uid_conflict(struct test_dsync_worker *dest_test_worker) -{ - struct test_dsync_msg_event event; - struct test_dsync_box_event box_event; + src_worker = dsync_worker_init_test(); + dest_worker = dsync_worker_init_test(); + src_test_worker = (struct test_dsync_worker *)src_worker; + dest_test_worker = (struct test_dsync_worker *)dest_worker; - test_assert(test_dsync_worker_next_msg_event(dest_test_worker, &event)); - test_assert(event.type == LAST_MSG_TYPE_EXPUNGE); - test_assert(event.msg.uid == 2); + brain = dsync_brain_init(src_worker, dest_worker, + DSYNC_BRAIN_FLAG_FULL_SYNC); + dsync_brain_sync(brain); - test_assert(test_dsync_worker_next_msg_event(dest_test_worker, &event)); - test_assert(event.type == LAST_MSG_TYPE_EXPUNGE); - test_assert(event.msg.uid == 7); - - test_assert(test_dsync_worker_next_msg_event(dest_test_worker, &event)); - test_assert(event.type == LAST_MSG_TYPE_SAVE); - test_assert(event.msg.uid == 4321); - test_assert(strcmp(event.msg.guid, "guid3") == 0); + /* have brain read the mailboxes */ + mailboxes_send_to_worker(src_test_worker, boxes); + mailboxes_send_to_worker(dest_test_worker, boxes); - test_assert(test_dsync_worker_next_msg_event(dest_test_worker, &event)); - test_assert(event.type == LAST_MSG_TYPE_SAVE); - test_assert(event.msg.uid == 6); - test_assert(strcmp(event.msg.guid, "guidy") == 0); + test_assert(!test_dsync_worker_next_box_event(src_test_worker, &box_event)); + test_assert(!test_dsync_worker_next_box_event(dest_test_worker, &box_event)); - test_assert(test_dsync_worker_next_msg_event(dest_test_worker, &event)); - test_assert(event.type == LAST_MSG_TYPE_UPDATE_UID); - test_assert(event.msg.uid == 3); + /* check mailbox updates */ + brain->state++; + dsync_brain_sync(brain); - test_assert(test_dsync_worker_next_msg_event(dest_test_worker, &event)); - test_assert(event.type == LAST_MSG_TYPE_UPDATE_UID); - test_assert(event.msg.uid == 8); - - test_assert(!test_dsync_worker_next_msg_event(dest_test_worker, &event)); - - while (test_dsync_worker_next_box_event(dest_test_worker, &box_event) && - box_event.type == LAST_BOX_TYPE_SELECT) ; + brain_boxes = array_get(&brain->mailbox_sync->mailboxes, &count); + test_assert(count == 1); + test_assert(dsync_mailboxes_equal(brain_boxes[0].src, &boxes[0])); + test_assert(dsync_mailboxes_equal(brain_boxes[0].dest, &boxes[0])); + test_dsync_mailbox_update(&brain_boxes[0].box, &boxes[0]); - test_assert(box_event.type == LAST_BOX_TYPE_UPDATE); - test_assert(box_event.box.uid_next == 4322); - test_assert(box_event.box.uid_validity == 1234567890); - test_assert(box_event.box.highest_modseq == 888); + test_assert(!test_dsync_worker_next_box_event(src_test_worker, &box_event)); + test_assert(!test_dsync_worker_next_box_event(dest_test_worker, &box_event)); - while (test_dsync_worker_next_box_event(dest_test_worker, &box_event)) - test_assert(box_event.type == LAST_BOX_TYPE_SELECT); -} + dsync_worker_deinit(&src_worker); + dsync_worker_deinit(&dest_worker); + dsync_brain_deinit(&brain); -static void test_dsync_brain_uid_conflict(void) -{ - test_begin("dsync brain uid conflict"); - test_dsync_brain_run(conflict_mailboxes, - test_dsync_brain_verify_uid_conflict); test_end(); } @@ -502,7 +242,7 @@ { static void (*test_functions[])(void) = { test_dsync_brain, - test_dsync_brain_uid_conflict, + test_dsync_brain_full, NULL }; return test_run(test_functions);
--- a/src/dsync/test-dsync-common.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/test-dsync-common.c Mon Jul 27 19:04:36 2009 -0400 @@ -1,6 +1,7 @@ /* Copyright (c) 2009 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "sha1.h" #include "dsync-data.h" #include "test-dsync-common.h" @@ -49,3 +50,17 @@ return FALSE; return TRUE; } + +void mail_generate_guid_128_hash(const char *guid, + uint8_t guid_128[MAIL_GUID_128_SIZE]) +{ + unsigned char sha1_sum[SHA1_RESULTLEN]; + + sha1_get_digest(guid, strlen(guid), sha1_sum); + memcpy(guid_128, sha1_sum, MAIL_GUID_128_SIZE); +} + +bool mail_guid_128_is_empty(const uint8_t guid_128[MAIL_GUID_128_SIZE] ATTR_UNUSED) +{ + return FALSE; +}
--- a/src/dsync/test-dsync-common.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/test-dsync-common.h Mon Jul 27 19:04:36 2009 -0400 @@ -1,6 +1,7 @@ #ifndef TEST_DSYNC_COMMON_H #define TEST_DSYNC_COMMON_H +#include "test-common.h" #include "dsync-data.h" #define TEST_MAILBOX_GUID1 "1234456789abcdef2143547698badcfe"
--- a/src/dsync/test-dsync-proxy-server-cmd.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/test-dsync-proxy-server-cmd.c Mon Jul 27 19:04:36 2009 -0400 @@ -199,19 +199,13 @@ static void test_dsync_proxy_box_select(void) { - struct test_dsync_box_event event; - test_begin("proxy server box select"); test_assert(run_cmd("BOX-SELECT", TEST_MAILBOX_GUID1, NULL) == 1); - test_assert(test_dsync_worker_next_box_event(test_worker, &event)); - test_assert(event.type == LAST_BOX_TYPE_SELECT); - test_assert(memcmp(event.box.guid.guid, test_mailbox_guid1, MAILBOX_GUID_SIZE) == 0); + test_assert(memcmp(test_worker->selected_mailbox.guid, test_mailbox_guid1, MAILBOX_GUID_SIZE) == 0); test_assert(run_cmd("BOX-SELECT", TEST_MAILBOX_GUID2, NULL) == 1); - test_assert(test_dsync_worker_next_box_event(test_worker, &event)); - test_assert(event.type == LAST_BOX_TYPE_SELECT); - test_assert(memcmp(event.box.guid.guid, test_mailbox_guid2, MAILBOX_GUID_SIZE) == 0); + test_assert(memcmp(test_worker->selected_mailbox.guid, test_mailbox_guid2, MAILBOX_GUID_SIZE) == 0); test_end(); } @@ -242,10 +236,11 @@ test_begin("proxy server msg uid change"); - test_assert(run_cmd("MSG-UID-CHANGE", "454", NULL) == 1); + test_assert(run_cmd("MSG-UID-CHANGE", "454", "995", NULL) == 1); test_assert(test_dsync_worker_next_msg_event(test_worker, &event)); test_assert(event.type == LAST_MSG_TYPE_UPDATE_UID); test_assert(event.msg.uid == 454); + test_assert(event.msg.modseq == 995); test_end(); }
--- a/src/dsync/test-dsync-worker.c Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/test-dsync-worker.c Mon Jul 27 19:04:36 2009 -0400 @@ -34,40 +34,6 @@ i_free(worker); } -static void -test_worker_set_result(struct test_dsync_worker *worker, int result) -{ - struct test_dsync_worker_result r; - - if (worker->worker.next_tag == 0) - return; - - memset(&r, 0, sizeof(r)); - r.tag = worker->worker.next_tag; - r.result = result; - array_append(&worker->results, &r, 1); - - worker->worker.next_tag = 0; -} - -static bool -test_worker_get_next_result(struct dsync_worker *_worker, - uint32_t *tag_r, int *result_r) -{ - struct test_dsync_worker *worker = - (struct test_dsync_worker *)_worker; - const struct test_dsync_worker_result *result; - - if (array_count(&worker->results) == 0) - return FALSE; - - result = array_idx(&worker->results, 0); - *tag_r = result->tag; - *result_r = result->result; - array_delete(&worker->results, 0, 1); - return TRUE; -} - static bool test_worker_is_output_full(struct dsync_worker *worker ATTR_UNUSED) { return FALSE; @@ -197,20 +163,14 @@ test_worker_create_mailbox(struct dsync_worker *_worker, const struct dsync_mailbox *dsync_box) { - struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; - test_worker_set_last_box(_worker, dsync_box, LAST_BOX_TYPE_CREATE); - test_worker_set_result(worker, 0); } static void test_worker_update_mailbox(struct dsync_worker *_worker, const struct dsync_mailbox *dsync_box) { - struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; - test_worker_set_last_box(_worker, dsync_box, LAST_BOX_TYPE_UPDATE); - test_worker_set_result(worker, 0); } static void @@ -224,8 +184,6 @@ memset(&box, 0, sizeof(box)); memcpy(box.guid.guid, mailbox, sizeof(box.guid.guid)); - test_worker_set_last_box(_worker, &box, LAST_BOX_TYPE_SELECT); - test_worker_set_result(worker, 0); } static struct test_dsync_msg_event * @@ -276,19 +234,19 @@ struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; test_worker_set_last_msg(worker, msg, LAST_MSG_TYPE_UPDATE); - test_worker_set_result(worker, 0); } static void -test_worker_msg_update_uid(struct dsync_worker *_worker, uint32_t uid) +test_worker_msg_update_uid(struct dsync_worker *_worker, + uint32_t old_uid, uint32_t new_uid) { struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; struct dsync_message msg; memset(&msg, 0, sizeof(msg)); - msg.uid = uid; + msg.uid = old_uid; + msg.modseq = new_uid; test_worker_set_last_msg(worker, &msg, LAST_MSG_TYPE_UPDATE_UID); - test_worker_set_result(worker, 0); } static void test_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid) @@ -299,13 +257,13 @@ memset(&msg, 0, sizeof(msg)); msg.uid = uid; test_worker_set_last_msg(worker, &msg, LAST_MSG_TYPE_EXPUNGE); - test_worker_set_result(worker, 0); } static void test_worker_msg_copy(struct dsync_worker *_worker, const mailbox_guid_t *src_mailbox, - uint32_t src_uid, const struct dsync_message *dest_msg) + uint32_t src_uid, const struct dsync_message *dest_msg, + dsync_worker_copy_callback_t *callback, void *context) { struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; struct test_dsync_msg_event *event; @@ -313,13 +271,13 @@ event = test_worker_set_last_msg(worker, dest_msg, LAST_MSG_TYPE_COPY); event->copy_src_mailbox = *src_mailbox; event->copy_src_uid = src_uid; - test_worker_set_result(worker, 0); + callback(TRUE, context); } static void test_worker_msg_save(struct dsync_worker *_worker, const struct dsync_message *msg, - struct dsync_msg_static_data *data) + const struct dsync_msg_static_data *data) { struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; struct test_dsync_msg_event *event; @@ -339,28 +297,26 @@ } i_assert(ret == -1); event->save_body = p_strdup(worker->tmp_pool, str_c(body)); - - test_worker_set_result(worker, 0); } -static int -test_worker_msg_get(struct dsync_worker *_worker, - uint32_t uid ATTR_UNUSED, - struct dsync_msg_static_data *data_r) +static void +test_worker_msg_get(struct dsync_worker *_worker, uint32_t uid ATTR_UNUSED, + dsync_worker_msg_callback_t *callback, void *context) { struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; + struct dsync_msg_static_data data; - data_r->pop3_uidl = "uidl"; - data_r->received_date = 123456; - data_r->input = worker->body_stream; - i_stream_seek(data_r->input, 0); - return 1; + memset(&data, 0, sizeof(data)); + data.pop3_uidl = "uidl"; + data.received_date = 123456; + data.input = worker->body_stream; + i_stream_seek(data.input, 0); + callback(DSYNC_MSG_GET_RESULT_SUCCESS, &data, context); } struct dsync_worker_vfuncs test_dsync_worker = { test_worker_deinit, - test_worker_get_next_result, test_worker_is_output_full, test_worker_output_flush,
--- a/src/dsync/test-dsync-worker.h Mon Jul 27 19:02:12 2009 -0400 +++ b/src/dsync/test-dsync-worker.h Mon Jul 27 19:04:36 2009 -0400 @@ -5,8 +5,7 @@ enum test_dsync_last_box_type { LAST_BOX_TYPE_CREATE, - LAST_BOX_TYPE_UPDATE, - LAST_BOX_TYPE_SELECT + LAST_BOX_TYPE_UPDATE }; enum test_dsync_last_msg_type {