Mercurial > dovecot > core-2.2
view src/dsync/dsync-brain-msgs-new.c @ 10607:e9046fc7c6b4 HEAD
dsync: Message list wasn't sorted properly, which caused sync to be incomplete.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Fri, 29 Jan 2010 18:05:32 +0200 |
parents | 615eef3139c2 |
children | 6799298bfa27 |
line wrap: on
line source
/* Copyright (c) 2009-2010 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "array.h" #include "istream.h" #include "hash.h" #include "dsync-worker.h" #include "dsync-brain-private.h" struct dsync_brain_msg_copy_context { struct dsync_brain_msg_iter *iter; unsigned int msg_idx; }; struct dsync_brain_msg_save_context { struct dsync_brain_msg_iter *iter; const struct dsync_message *msg; unsigned int mailbox_idx; }; static void dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter); static void msg_get_callback(enum dsync_msg_get_result result, const struct dsync_msg_static_data *data, void *context) { struct dsync_brain_msg_save_context *ctx = context; const struct dsync_brain_mailbox *mailbox; struct istream *input; mailbox = array_idx(&ctx->iter->sync->mailboxes, ctx->mailbox_idx); switch (result) { case DSYNC_MSG_GET_RESULT_SUCCESS: /* the mailbox may have changed, make sure we've the right one */ dsync_worker_select_mailbox(ctx->iter->worker, &mailbox->box); input = data->input; dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data); i_stream_unref(&input); break; case DSYNC_MSG_GET_RESULT_EXPUNGED: /* mail got expunged during sync. just skip this. */ break; case DSYNC_MSG_GET_RESULT_FAILED: i_error("msg-get failed: box=%s uid=%u guid=%s", mailbox->box.name, ctx->msg->uid, ctx->msg->guid); dsync_brain_fail(ctx->iter->sync->brain); break; } if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs) dsync_brain_msg_sync_add_new_msgs(ctx->iter); } static void dsync_brain_copy_callback(bool success, void *context) { struct dsync_brain_msg_copy_context *ctx = context; const struct dsync_brain_new_msg *msg; struct dsync_brain_guid_instance *inst; if (!success) { /* mark the guid instance invalid and try again later */ msg = array_idx(&ctx->iter->new_msgs, ctx->msg_idx); inst = hash_table_lookup(ctx->iter->guid_hash, msg->msg->guid); inst->failed = TRUE; array_append(&ctx->iter->copy_retry_indexes, &ctx->msg_idx, 1); } if (--ctx->iter->copy_results_left == 0 && !ctx->iter->adding_msgs) dsync_brain_msg_sync_add_new_msgs(ctx->iter); } static int dsync_brain_msg_sync_add_new_msg(struct dsync_brain_msg_iter *dest_iter, const mailbox_guid_t *src_mailbox, unsigned int msg_idx, const struct dsync_brain_new_msg *msg) { struct dsync_brain_msg_save_context *save_ctx; struct dsync_brain_msg_copy_context *copy_ctx; struct dsync_brain_msg_iter *src_iter; const struct dsync_brain_guid_instance *inst; const struct dsync_brain_mailbox *inst_box; inst = hash_table_lookup(dest_iter->guid_hash, msg->msg->guid); if (inst != NULL) { /* we can save this by copying an existing message */ inst_box = array_idx(&dest_iter->sync->mailboxes, inst->mailbox_idx); copy_ctx = p_new(dest_iter->sync->pool, struct dsync_brain_msg_copy_context, 1); copy_ctx->iter = dest_iter; copy_ctx->msg_idx = msg_idx; dsync_worker_msg_copy(dest_iter->worker, &inst_box->box.mailbox_guid, inst->uid, msg->msg, dsync_brain_copy_callback, copy_ctx); dest_iter->copy_results_left++; } else { src_iter = dest_iter == dest_iter->sync->dest_msg_iter ? dest_iter->sync->src_msg_iter : dest_iter->sync->dest_msg_iter; save_ctx = p_new(src_iter->sync->pool, struct dsync_brain_msg_save_context, 1); save_ctx->iter = dest_iter; save_ctx->msg = dsync_message_dup(src_iter->sync->pool, msg->msg); save_ctx->mailbox_idx = dest_iter->mailbox_idx; dest_iter->adding_msgs = TRUE; dest_iter->save_results_left++; dsync_worker_msg_get(src_iter->worker, src_mailbox, msg->orig_uid, msg_get_callback, save_ctx); dest_iter->adding_msgs = FALSE; if (dsync_worker_output_flush(src_iter->worker) < 0) return -1; } return dsync_worker_is_output_full(dest_iter->worker) ? 0 : 1; } static bool dsync_brain_mailbox_add_new_msgs(struct dsync_brain_msg_iter *iter, const mailbox_guid_t *mailbox_guid) { const struct dsync_brain_new_msg *msgs; unsigned int i, msg_count; bool ret = TRUE; msgs = array_get(&iter->new_msgs, &msg_count); for (i = iter->next_new_msg; i < msg_count; i++) { if (msgs[i].mailbox_idx != iter->mailbox_idx) { i_assert(msgs[i].mailbox_idx > iter->mailbox_idx); ret = FALSE; break; } if (dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid, i, &msgs[i]) <= 0) { /* failed / continue later */ i++; break; } } iter->next_new_msg = i; if (i == msg_count) ret = FALSE; /* flush copy commands */ if (dsync_worker_output_flush(iter->worker) > 0 && ret) { /* we have more space again, continue */ return dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid); } else { return ret; } } static void dsync_brain_mailbox_save_conflicts(struct dsync_brain_msg_iter *iter) { const struct dsync_brain_uid_conflict *conflicts; unsigned int i, count; conflicts = array_get(&iter->uid_conflicts, &count); for (i = iter->next_conflict; i < count; i++) { if (conflicts[i].mailbox_idx != iter->mailbox_idx) break; dsync_worker_msg_update_uid(iter->worker, conflicts[i].old_uid, conflicts[i].new_uid); } iter->next_conflict = i; } static void dsync_brain_mailbox_retry_copies(struct dsync_brain_msg_iter *iter, const mailbox_guid_t *mailbox_guid) { const uint32_t *indexes; const struct dsync_brain_new_msg *msgs; unsigned int i, msg_idx, idx_count, msg_count; struct dsync_brain_guid_instance *inst; const char *guid_str; void *orig_key, *orig_value; /* first remove GUID instances that had failed. */ msgs = array_get(&iter->new_msgs, &msg_count); indexes = array_get(&iter->copy_retry_indexes, &idx_count); for (i = 0; i < idx_count; i++) { guid_str = msgs[indexes[i]].msg->guid; if (hash_table_lookup_full(iter->guid_hash, guid_str, &orig_key, &orig_value)) inst = orig_value; else inst = NULL; if (inst != NULL && inst->failed) { inst = inst->next; if (inst == NULL) hash_table_remove(iter->guid_hash, guid_str); else { hash_table_update(iter->guid_hash, orig_key, inst); } } } /* try saving again. there probably weren't many of them, so don't worry about filling output buffer. */ for (i = 0; i < idx_count; i++) { msg_idx = indexes[i]; // FIXME: if buffer fills, we assert-crash (void)dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid, msg_idx, &msgs[msg_idx]); } /* if we copied anything, we'll again have to wait for the results */ array_clear(&iter->copy_retry_indexes); } static void dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter) { const struct dsync_brain_mailbox *mailbox; const mailbox_guid_t *mailbox_guid; while (iter->mailbox_idx < array_count(&iter->sync->mailboxes)) { mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx); mailbox_guid = &mailbox->box.mailbox_guid; dsync_worker_select_mailbox(iter->worker, &mailbox->box); if (dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid)) { /* continue later */ return; } /* all messages saved for this mailbox. continue with saving its conflicts and waiting for copies to finish. */ dsync_brain_mailbox_save_conflicts(iter); while (iter->copy_results_left == 0 && array_count(&iter->copy_retry_indexes) > 0) dsync_brain_mailbox_retry_copies(iter, mailbox_guid); if (iter->copy_results_left > 0) { /* wait for copies to finish */ return; } /* done with this mailbox, try the next one */ iter->mailbox_idx++; } iter->msgs_sent = TRUE; /* done with all mailboxes from this iter */ dsync_worker_set_input_callback(iter->worker, NULL, NULL); if (iter->sync->src_msg_iter->msgs_sent && iter->sync->dest_msg_iter->msgs_sent && iter->sync->src_msg_iter->save_results_left == 0 && iter->sync->dest_msg_iter->save_results_left == 0 && dsync_worker_output_flush(iter->sync->dest_worker) > 0 && dsync_worker_output_flush(iter->sync->src_worker) > 0) { iter->sync->brain->state++; dsync_brain_sync(iter->sync->brain); } } static void dsync_worker_new_msg_output(void *context) { struct dsync_brain_msg_iter *iter = context; dsync_brain_msg_sync_add_new_msgs(iter); } static int dsync_brain_new_msg_cmp(const struct dsync_brain_new_msg *m1, const struct dsync_brain_new_msg *m2) { if (m1->mailbox_idx < m2->mailbox_idx) return -1; if (m1->mailbox_idx > m2->mailbox_idx) return 1; if (m1->msg->uid < m2->msg->uid) return -1; if (m1->msg->uid > m2->msg->uid) return 1; return 0; } static int dsync_brain_uid_conflict_cmp(const struct dsync_brain_uid_conflict *c1, const struct dsync_brain_uid_conflict *c2) { if (c1->mailbox_idx < c2->mailbox_idx) return -1; if (c1->mailbox_idx < c2->mailbox_idx) return 1; if (c1->new_uid < c2->new_uid) return -1; if (c1->new_uid > c2->new_uid) return 1; return 0; } static void dsync_brain_msg_iter_sync_new_msgs(struct dsync_brain_msg_iter *iter) { iter->mailbox_idx = 0; array_sort(&iter->new_msgs, dsync_brain_new_msg_cmp); array_sort(&iter->uid_conflicts, dsync_brain_uid_conflict_cmp); dsync_worker_set_input_callback(iter->worker, NULL, iter); dsync_worker_set_output_callback(iter->worker, dsync_worker_new_msg_output, iter); dsync_brain_msg_sync_add_new_msgs(iter); } void dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync) { dsync_brain_msg_iter_sync_new_msgs(sync->src_msg_iter); dsync_brain_msg_iter_sync_new_msgs(sync->dest_msg_iter); } static void sync_iter_resolve_uid_conflicts(struct dsync_brain_msg_iter *iter) { const struct dsync_brain_uid_conflict *conflicts; const struct dsync_brain_mailbox *mailboxes, *mailbox; unsigned int i, count, mailbox_count; mailboxes = array_get(&iter->sync->mailboxes, &mailbox_count); conflicts = array_get(&iter->uid_conflicts, &count); for (i = 0; i < count; i++) { mailbox = &mailboxes[conflicts[i].mailbox_idx]; dsync_worker_select_mailbox(iter->worker, &mailbox->box); dsync_worker_msg_update_uid(iter->worker, conflicts[i].old_uid, conflicts[i].new_uid); } } void dsync_brain_msg_sync_resolve_uid_conflicts(struct dsync_brain_mailbox_sync *sync) { sync_iter_resolve_uid_conflicts(sync->src_msg_iter); sync_iter_resolve_uid_conflicts(sync->dest_msg_iter); }