Mercurial > dovecot > core-2.2
view src/doveadm/dsync/dsync-mailbox-import.c @ 14682:d0d7b810646b
Make sure we check all the functions' return values. Minor API changes to simplify this.
Checked using a patched clang that adds attribute(warn_unused_result) to all
functions. This commit fixes several error handling mistakes.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Mon, 25 Jun 2012 01:14:03 +0300 |
parents | 69ba6977bed8 |
children | 9ff19c1d5f69 |
line wrap: on
line source
/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "array.h"
#include "hash.h"
#include "istream.h"
#include "seq-range-array.h"
#include "mail-storage-private.h"
#include "mail-search-build.h"
#include "dsync-transaction-log-scan.h"
#include "dsync-mail.h"
#include "dsync-mailbox-import.h"

/* (GUID, UID) pair used as the sort key when comparing a local mail against
   a remote save (see importer_mail_cmp()). */
struct importer_mail {
	const char *guid;
	uint32_t uid;
};

struct importer_new_mail {
	/* linked list of mails for this GUID */
	struct importer_new_mail *next;
	/* if non-NULL, this mail exists in both local and remote. this link
	   points to the other side. */
	struct importer_new_mail *link;

	const char *guid;
	/* set only for mails that aren't local (uid_in_local=FALSE) */
	struct dsync_mail_change *change;

	uint32_t uid;
	unsigned int uid_in_local:1;
	unsigned int uid_is_usable:1;
	unsigned int skip:1;
	unsigned int copy_failed:1;
};

struct dsync_mailbox_importer {
	pool_t pool;
	struct mailbox *box;
	uint32_t last_common_uid;
	uint64_t last_common_modseq;
	uint32_t remote_uid_next;
	uint32_t remote_first_recent_uid;
	uint64_t remote_highest_modseq;

	/* trans is used for the search and for updating existing mails,
	   ext_trans for saving/copying mails */
	struct mailbox_transaction_context *trans, *ext_trans;
	struct mail_search_context *search_ctx;
	struct mail *mail, *ext_mail;

	/* current mail returned by search_ctx, NULL at the end of search */
	struct mail *cur_mail;
	const char *cur_guid;

	/* UID => struct dsync_mail_change */
	const struct hash_table *local_changes;

	ARRAY_TYPE(seq_range) maybe_expunge_uids;
	ARRAY_DEFINE(maybe_saves, struct dsync_mail_change *);

	/* GUID => struct importer_new_mail */
	struct hash_table *import_guids;
	/* UID => struct importer_new_mail */
	struct hash_table *import_uids;

	ARRAY_DEFINE(newmails, struct importer_new_mail *);
	ARRAY_TYPE(uint32_t) wanted_uids;

	/* created only when want_mail_requests=TRUE */
	ARRAY_DEFINE(mail_requests, struct dsync_mail_request);
	unsigned int mail_request_idx;

	uint32_t prev_uid, next_local_seq, local_uid_next;
	uint64_t local_initial_highestmodseq;

	unsigned int failed:1;
	unsigned int last_common_uid_found:1;
	unsigned int cur_uid_has_change:1;
	unsigned int cur_mail_saved:1;
	unsigned int local_expunged_guids_set:1;
	unsigned int new_uids_assigned:1;
	unsigned int want_mail_requests:1;
	unsigned int mails_have_guids:1;
	unsigned int master_brain:1;
};

/* Start a search over all local mails with UID > last_common_uid and
   position cur_mail on the first result, if there is one. */
static void
dsync_mailbox_import_search_init(struct dsync_mailbox_importer *importer)
{
	struct mail_search_args *search_args;
	struct mail_search_arg *sarg;

	search_args = mail_search_build_init();
	sarg = mail_search_build_add(search_args, SEARCH_UIDSET);
	p_array_init(&sarg->value.seqset, search_args->pool, 128);
	seq_range_array_add_range(&sarg->value.seqset,
				  importer->last_common_uid+1, (uint32_t)-1);

	importer->search_ctx =
		mailbox_search_init(importer->trans, search_args, NULL,
				    0, NULL);
	mail_search_args_unref(&search_args);

	if (mailbox_search_next(importer->search_ctx, &importer->cur_mail))
		importer->next_local_seq = importer->cur_mail->seq;
	/* this flag causes cur_guid to be looked up later */
	importer->cur_mail_saved = TRUE;
}

/* Allocate an importer for the given mailbox from its own memory pool,
   begin the two transactions it uses, read the local uidnext and
   highest-modseq and start the local mail search. last_common_uid is
   treated as already known if either last_common_uid or last_common_modseq
   is non-zero. */
struct dsync_mailbox_importer *
dsync_mailbox_import_init(struct mailbox *box,
			  struct dsync_transaction_log_scan *log_scan,
			  uint32_t last_common_uid,
			  uint64_t last_common_modseq,
			  uint32_t remote_uid_next,
			  uint32_t remote_first_recent_uid,
			  uint64_t remote_highest_modseq,
			  enum dsync_mailbox_import_flags flags)
{
	const enum mailbox_transaction_flags ext_trans_flags =
		MAILBOX_TRANSACTION_FLAG_SYNC |
		MAILBOX_TRANSACTION_FLAG_EXTERNAL |
		MAILBOX_TRANSACTION_FLAG_ASSIGN_UIDS;
	struct dsync_mailbox_importer *importer;
	struct mailbox_status status;
	pool_t pool;

	pool = pool_alloconly_create(MEMPOOL_GROWING"dsync mailbox importer",
				     10240);
	importer = p_new(pool, struct dsync_mailbox_importer, 1);
	importer->pool = pool;
	importer->box = box;
	importer->last_common_uid = last_common_uid;
	importer->last_common_modseq = last_common_modseq;
	importer->last_common_uid_found =
		last_common_uid != 0 || last_common_modseq != 0;
	importer->remote_uid_next = remote_uid_next;
	importer->remote_first_recent_uid = remote_first_recent_uid;
	importer->remote_highest_modseq = remote_highest_modseq;

	importer->import_guids =
		hash_table_create(default_pool, pool, 0,
				  str_hash, (hash_cmp_callback_t *)strcmp);
	importer->import_uids =
		hash_table_create(default_pool, pool, 0, NULL, NULL);
	i_array_init(&importer->maybe_expunge_uids, 16);
	i_array_init(&importer->maybe_saves, 128);
	i_array_init(&importer->newmails, 128);
	i_array_init(&importer->wanted_uids, 128);

	importer->trans = mailbox_transaction_begin(importer->box,
		MAILBOX_TRANSACTION_FLAG_SYNC);
	importer->ext_trans = mailbox_transaction_begin(box, ext_trans_flags);
	importer->mail = mail_alloc(importer->trans, 0, NULL);
	importer->ext_mail = mail_alloc(importer->ext_trans, 0, NULL);

	if ((flags & DSYNC_MAILBOX_IMPORT_FLAG_WANT_MAIL_REQUESTS) != 0) {
		i_array_init(&importer->mail_requests, 128);
		importer->want_mail_requests = TRUE;
	}
	importer->mails_have_guids =
		(flags & DSYNC_MAILBOX_IMPORT_FLAG_MAILS_HAVE_GUIDS) != 0;
	importer->master_brain =
		(flags & DSYNC_MAILBOX_IMPORT_FLAG_MASTER_BRAIN) != 0;

	mailbox_get_open_status(importer->box, STATUS_UIDNEXT |
				STATUS_HIGHESTMODSEQ, &status);
	importer->local_uid_next = status.uidnext;
	importer->local_initial_highestmodseq = status.highest_modseq;
	dsync_mailbox_import_search_init(importer);

	importer->local_changes = dsync_transaction_log_scan_get_hash(log_scan);
	return importer;
}

/* Log a failed lookup of `field' for the mail and mark the import as failed.
   Lookups that failed because the mail was expunged are silently ignored. */
static void dsync_mail_error(struct dsync_mailbox_importer *importer,
			     struct mail *mail, const char *field)
{
	const char *errstr;
	enum mail_error error;

	errstr = mailbox_get_last_error(importer->box, &error);
	if (error == MAIL_ERROR_EXPUNGED)
		return;

	i_error("Can't lookup %s for UID=%u: %s",
		field, mail->uid, errstr);
	importer->failed = TRUE;
}

/* Advance cur_mail to the first local mail whose seq >= next_local_seq and
   uid >= wanted_uid and look up its GUID into cur_guid. Returns FALSE when
   the search has no more mails (cur_mail and cur_guid are then NULL). */
static bool
importer_next_mail(struct dsync_mailbox_importer *importer, uint32_t wanted_uid)
{
	if (importer->cur_mail == NULL) {
		/* end of search */
		return FALSE;
	}
	while (importer->cur_mail->seq < importer->next_local_seq ||
	       importer->cur_mail->uid < wanted_uid) {
		if (!importer->cur_uid_has_change &&
		    !importer->last_common_uid_found) {
			/* this message exists locally, but remote didn't send
			   expunge-change for it. if the message's
			   uid <= last-common-uid, it should be deleted */
			seq_range_array_add(&importer->maybe_expunge_uids,
					    importer->cur_mail->uid);
		}

		importer->cur_mail_saved = FALSE;
		if (!mailbox_search_next(importer->search_ctx,
					 &importer->cur_mail)) {
			importer->cur_mail = NULL;
			importer->cur_guid = NULL;
			return FALSE;
		}
		importer->cur_uid_has_change = FALSE;
	}
	importer->cur_uid_has_change = importer->cur_mail != NULL &&
		importer->cur_mail->uid == wanted_uid;
	if (mail_get_special(importer->cur_mail, MAIL_FETCH_GUID,
			     &importer->cur_guid) < 0) {
		dsync_mail_error(importer, importer->cur_mail, "GUID");
		/* step past this mail before retrying. otherwise the loop
		   above wouldn't advance, the same lookup would fail again
		   and we'd recurse forever. */
		importer->next_local_seq = importer->cur_mail->seq + 1;
		return importer_next_mail(importer, wanted_uid);
	}
	/* make sure next_local_seq gets updated in case we came here
	   because of min_uid */
	importer->next_local_seq = importer->cur_mail->seq;
	return TRUE;
}

/* Order mails primarily by GUID (strcmp) and secondarily by UID. A NULL GUID
   sorts after everything else; if both are NULL, m1 is treated as larger. */
static int
importer_mail_cmp(const struct importer_mail *m1,
		  const struct importer_mail *m2)
{
	int ret;

	if (m1->guid == NULL)
		return 1;
	if (m2->guid == NULL)
		return -1;

	ret = strcmp(m1->guid, m2->guid);
	if (ret != 0)
		return ret;

	if (m1->uid < m2->uid)
		return -1;
	if (m1->uid > m2->uid)
		return 1;
	return 0;
}

/* Queue a mail request for newmail if requests are wanted and the mail
   isn't a local one. */
static void importer_mail_request(struct dsync_mailbox_importer *importer,
				  struct importer_new_mail *newmail)
{
	struct dsync_mail_request *request;

	if (importer->want_mail_requests && !newmail->uid_in_local) {
		request = array_append_space(&importer->mail_requests);
		request->guid = newmail->guid;
		request->uid = newmail->uid;
	}
}

/* Register newmail in import_guids (or in import_uids when its GUID is
   empty). The first mail for a key is inserted to the hash table and a mail
   request is queued for it; later mails are appended to the first mail's
   linked list and linked to an unlinked mail from the other side, if one
   exists. */
static void newmail_link(struct dsync_mailbox_importer *importer,
			 struct importer_new_mail *newmail)
{
	struct importer_new_mail *first_mail, **last, *mail, *link = NULL;

	if (*newmail->guid != '\0') {
		first_mail = hash_table_lookup(importer->import_guids,
					       newmail->guid);
		if (first_mail == NULL) {
			/* first mail for this GUID */
			hash_table_insert(importer->import_guids,
					  (void *)newmail->guid, newmail);
			importer_mail_request(importer, newmail);
			return;
		}
	} else {
		if (!newmail->uid_in_local) {
			/* FIXME: ? */
			return;
		}
		first_mail = hash_table_lookup(importer->import_uids,
					POINTER_CAST(newmail->uid));
		if (first_mail == NULL) {
			/* first mail for this UID */
			hash_table_insert(importer->import_uids,
					  POINTER_CAST(newmail->uid), newmail);
			importer_mail_request(importer, newmail);
			return;
		}
	}
	/* 1) add the newmail to the end of the linked list
	   2) find our link */
	last = &first_mail->next;
	for (mail = first_mail; mail != NULL; mail = mail->next) {
		if (mail->uid == newmail->uid)
			mail->uid_is_usable = TRUE;
		if (link == NULL && mail->link == NULL &&
		    mail->uid_in_local != newmail->uid_in_local)
			link = mail;
		last = &mail->next;
	}
	*last = newmail;
	if (link != NULL && newmail->link == NULL) {
		link->link = newmail;
		newmail->link = link;
	}
}

/* Compare the current local mail against save_change and add a newmails
   record for whichever sorts first (or a single record if they're
   identical). Either side may be missing, but not both. Returns TRUE if the
   record consumed save_change, FALSE if only the local mail was recorded. */
static bool dsync_mailbox_try_save_cur(struct dsync_mailbox_importer *importer,
				       struct dsync_mail_change *save_change)
{
	struct importer_mail m1, m2;
	struct importer_new_mail *newmail;
	int diff;
	bool remote_saved;

	memset(&m1, 0, sizeof(m1));
	if (importer->cur_mail != NULL) {
		m1.guid = importer->cur_guid;
		m1.uid = importer->cur_mail->uid;
	}
	memset(&m2, 0, sizeof(m2));
	if (save_change != NULL) {
		m2.guid = save_change->guid;
		m2.uid = save_change->uid;
	}

	newmail = p_new(importer->pool, struct importer_new_mail, 1);

	diff = importer_mail_cmp(&m1, &m2);
	if (diff < 0) {
		/* add a record for local mail */
		i_assert(importer->cur_mail != NULL);
		newmail->guid = p_strdup(importer->pool, importer->cur_guid);
		newmail->uid = importer->cur_mail->uid;
		newmail->uid_in_local = TRUE;
		newmail->uid_is_usable =
			newmail->uid >= importer->remote_uid_next;
		remote_saved = FALSE;
	} else if (diff > 0) {
		i_assert(save_change != NULL);
		newmail->guid = save_change->guid;
		newmail->uid = save_change->uid;
		newmail->uid_in_local = FALSE;
		newmail->uid_is_usable =
			newmail->uid >= importer->local_uid_next;
		remote_saved = TRUE;
	} else {
		/* identical */
		i_assert(importer->cur_mail != NULL);
		i_assert(save_change != NULL);
		newmail->guid = save_change->guid;
		newmail->uid = importer->cur_mail->uid;
		newmail->uid_in_local = TRUE;
		newmail->uid_is_usable = TRUE;
		newmail->link = newmail;
		remote_saved = TRUE;
	}

	if (newmail->uid_in_local) {
		importer->cur_mail_saved = TRUE;
		importer->next_local_seq++;
	} else {
		/* NOTE: assumes save_change is allocated from importer pool */
		newmail->change = save_change;
	}

	array_append(&importer->newmails, &newmail, 1);
	newmail_link(importer, newmail);
	return remote_saved;
}

/* If the current local mail was already recorded, move to the next one
   first. Returns FALSE if there was neither a local mail nor a save_change
   left to handle, otherwise the result of dsync_mailbox_try_save_cur(). */
static bool ATTR_NULL(2)
dsync_mailbox_try_save(struct dsync_mailbox_importer *importer,
		       struct dsync_mail_change *save_change)
{
	if (importer->cur_mail_saved) {
		if (!importer_next_mail(importer, 0) && save_change == NULL)
			return FALSE;
	}
	return dsync_mailbox_try_save_cur(importer, save_change);
}

/* Keep recording local mails until save_change itself gets recorded. */
static void dsync_mailbox_save(struct dsync_mailbox_importer *importer,
			       struct dsync_mail_change *save_change)
{
	while (!dsync_mailbox_try_save(importer, save_change)) ;
}

/* Point importer->mail to change->uid and, when the change has a non-empty
   GUID, verify that the local mail's GUID matches it. Returns FALSE if the
   UID doesn't exist, the GUID can't be looked up or the GUIDs differ. */
static bool
dsync_import_set_mail(struct dsync_mailbox_importer *importer,
		      const struct dsync_mail_change *change)
{
	const char *guid;

	if (!mail_set_uid(importer->mail, change->uid))
		return FALSE;
	if (change->guid == NULL) {
		/* GUID is unknown */
		return TRUE;
	}
	if (*change->guid == '\0') {
		/* backend doesn't support GUIDs. if hdr_hash is set, we could
		   verify it, but since this message really is supposed to
		   match, it's probably too much trouble. */
		return TRUE;
	}

	/* verify that GUID matches, just in case */
	if (mail_get_special(importer->mail, MAIL_FETCH_GUID, &guid) < 0) {
		dsync_mail_error(importer, importer->mail, "GUID");
		return FALSE;
	}
	if (strcmp(guid, change->guid) != 0) {
		i_error("Mailbox %s: Unexpected GUID mismatch for "
			"UID=%u: %s != %s", mailbox_get_vname(importer->box),
			change->uid, guid, change->guid);
		importer->last_common_uid = 1;
		importer->failed = TRUE;
		return FALSE;
	}
	return TRUE;
}

/* Merge local and remote flag changes for one 32bit flag word.
   Add-vs-remove conflicts are resolved in favor of the remote side when
   prefer_remote=TRUE, otherwise in favor of the local side. The flags to add
   to / remove from the local final flags are returned in change_add_r and
   change_remove_r. */
static void merge_flags(uint32_t local_final, uint32_t local_add,
			uint32_t local_remove,
			uint32_t remote_final, uint32_t remote_add,
			uint32_t remote_remove,
			bool prefer_remote,
			uint32_t *change_add_r, uint32_t *change_remove_r)
{
	uint32_t combined_add, combined_remove, conflict_flags;
	uint32_t local_wanted, remote_wanted;

	/* resolve conflicts */
	conflict_flags = local_add & remote_remove;
	if (conflict_flags != 0) {
		if (prefer_remote)
			local_add &= ~conflict_flags;
		else
			remote_remove &= ~conflict_flags;
	}
	conflict_flags = local_remove & remote_add;
	if (conflict_flags != 0) {
		if (prefer_remote)
			local_remove &= ~conflict_flags;
		else
			remote_add &= ~conflict_flags;
	}

	combined_add = local_add|remote_add;
	combined_remove = local_remove|remote_remove;
	i_assert((combined_add & combined_remove) == 0);

	/* see if there are conflicting final flags */
	local_wanted = (local_final|combined_add) & ~combined_remove;
	remote_wanted = (remote_final|combined_add) & ~combined_remove;

	conflict_flags = local_wanted ^ remote_wanted;
	if (conflict_flags != 0) {
		if (prefer_remote)
			local_wanted = remote_wanted;
		/*else
			remote_wanted = local_wanted;*/
	}

	*change_add_r = local_wanted & ~local_final;
	*change_remove_r = local_final & ~local_wanted;
}

/* Linear search for a keyword name. On a match its index is stored to idx_r
   and TRUE is returned; idx_r is left untouched otherwise. */
static bool
keyword_find(ARRAY_TYPE(const_string) *keywords, const char *name,
	     unsigned int *idx_r)
{
	const char *const *names;
	unsigned int i, count;

	names = array_get(keywords, &count);
	for (i = 0; i < count; i++) {
		if (strcmp(names[i], name) == 0) {
			*idx_r = i;
			return TRUE;
		}
	}
	return FALSE;
}
static void keywords_append(ARRAY_TYPE(const_string) *dest, const ARRAY_TYPE(const_string) *keywords, uint32_t bits, unsigned int start_idx) { const char *const *namep; unsigned int i; for (i = 0; i < 32; i++) { if ((bits & (1U << i)) == 0) continue; namep = array_idx(keywords, start_idx+i); array_append(dest, namep, 1); } } static void merge_keywords(struct mail *mail, const ARRAY_TYPE(const_string) *local_changes, const ARRAY_TYPE(const_string) *remote_changes, bool prefer_remote) { /* local_changes and remote_changes are assumed to have no duplicates names */ uint32_t *local_add, *local_remove, *local_final; uint32_t *remote_add, *remote_remove, *remote_final; uint32_t *change_add, *change_remove; ARRAY_TYPE(const_string) all_keywords, add_keywords, remove_keywords; const char *const *changes, *name, *const *local_keywords; struct mail_keywords *kw; unsigned int i, count, name_idx, array_size; local_keywords = mail_get_keywords(mail); /* we'll assign a common index for each keyword name and place the changes to separate bit arrays. */ if (array_is_created(remote_changes)) changes = array_get(remote_changes, &count); else { changes = NULL; count = 0; } array_size = str_array_length(local_keywords) + count; if (array_is_created(local_changes)) array_size += array_count(local_changes); if (array_size == 0) { /* this message has no keywords */ return; } t_array_init(&all_keywords, array_size); t_array_init(&add_keywords, array_size); t_array_init(&remove_keywords, array_size); /* @UNSAFE: create large enough arrays to fit all keyword indexes. 
*/ array_size = (array_size+31)/32; local_add = t_new(uint32_t, array_size); local_remove = t_new(uint32_t, array_size); local_final = t_new(uint32_t, array_size); remote_add = t_new(uint32_t, array_size); remote_remove = t_new(uint32_t, array_size); remote_final = t_new(uint32_t, array_size); change_add = t_new(uint32_t, array_size); change_remove = t_new(uint32_t, array_size); /* get remote changes */ for (i = 0; i < count; i++) { name = changes[i]+1; name_idx = array_count(&all_keywords); array_append(&all_keywords, &name, 1); switch (changes[i][0]) { case KEYWORD_CHANGE_ADD: remote_add[name_idx/32] |= 1U << (name_idx%32); /* fall through */ case KEYWORD_CHANGE_FINAL: remote_final[name_idx/32] |= 1U << (name_idx%32); break; case KEYWORD_CHANGE_REMOVE: remote_remove[name_idx/32] |= 1U << (name_idx%32); break; } } /* get local changes. use existing indexes for names when they exist. */ if (array_is_created(local_changes)) changes = array_get(local_changes, &count); else { changes = NULL; count = 0; } for (i = 0; i < count; i++) { name = changes[i]+1; if (!keyword_find(&all_keywords, name, &name_idx)) { name_idx = array_count(&all_keywords); array_append(&all_keywords, &name, 1); } switch (changes[i][0]) { case KEYWORD_CHANGE_ADD: local_add[name_idx/32] |= 1U << (name_idx%32); break; case KEYWORD_CHANGE_REMOVE: local_remove[name_idx/32] |= 1U << (name_idx%32); break; case KEYWORD_CHANGE_FINAL: i_unreached(); } } for (i = 0; local_keywords[i] != NULL; i++) { name = local_keywords[i]; if (!keyword_find(&all_keywords, name, &name_idx)) { name_idx = array_count(&all_keywords); array_append(&all_keywords, &name, 1); } local_final[name_idx/32] |= 1U << (name_idx%32); } i_assert(array_count(&all_keywords) <= array_size*32); array_size = (array_count(&all_keywords)+31) / 32; /* merge keywords */ for (i = 0; i < array_size; i++) { merge_flags(local_final[i], local_add[i], local_remove[i], remote_final[i], remote_add[i], remote_remove[i], prefer_remote, &change_add[i], 
&change_remove[i]); if (change_add[i] != 0) { keywords_append(&add_keywords, &all_keywords, change_add[i], i*32); } if (change_remove[i] != 0) { keywords_append(&remove_keywords, &all_keywords, change_add[i], i*32); } } /* apply changes */ if (array_count(&add_keywords) > 0) { (void)array_append_space(&add_keywords); kw = mailbox_keywords_create_valid(mail->box, array_idx(&add_keywords, 0)); mail_update_keywords(mail, MODIFY_ADD, kw); mailbox_keywords_unref(&kw); } if (array_count(&remove_keywords) > 0) { (void)array_append_space(&remove_keywords); kw = mailbox_keywords_create_valid(mail->box, array_idx(&remove_keywords, 0)); mail_update_keywords(mail, MODIFY_REMOVE, kw); mailbox_keywords_unref(&kw); } } static void dsync_mailbox_import_flag_change(struct dsync_mailbox_importer *importer, const struct dsync_mail_change *change) { const struct dsync_mail_change *local_change; enum mail_flags local_add, local_remove; uint32_t change_add, change_remove; ARRAY_TYPE(const_string) local_keyword_changes = ARRAY_INIT; struct mail *mail; bool prefer_remote; i_assert((change->add_flags & change->remove_flags) == 0); if (importer->cur_mail != NULL && importer->cur_mail->uid == change->uid) mail = importer->cur_mail; else { if (!dsync_import_set_mail(importer, change)) return; mail = importer->mail; } local_change = hash_table_lookup(importer->local_changes, POINTER_CAST(change->uid)); if (local_change == NULL) { local_add = local_remove = 0; } else { local_add = local_change->add_flags; local_remove = local_change->remove_flags; local_keyword_changes = local_change->keyword_changes; } if (mail_get_modseq(mail) < change->modseq) prefer_remote = TRUE; else if (mail_get_modseq(mail) > change->modseq) prefer_remote = FALSE; else { /* identical modseq, we'll just have to pick one. Note that both brains need to pick the same one, otherwise they become unsynced. 
*/ prefer_remote = !importer->master_brain; } /* merge flags */ merge_flags(mail_get_flags(mail), local_add, local_remove, change->final_flags, change->add_flags, change->remove_flags, prefer_remote, &change_add, &change_remove); if (change_add != 0) mail_update_flags(mail, MODIFY_ADD, change_add); if (change_remove != 0) mail_update_flags(mail, MODIFY_REMOVE, change_remove); /* merge keywords */ merge_keywords(mail, &local_keyword_changes, &change->keyword_changes, prefer_remote); mail_update_modseq(mail, change->modseq); } static void dsync_mailbox_import_save(struct dsync_mailbox_importer *importer, const struct dsync_mail_change *change) { struct dsync_mail_change *save; i_assert(change->guid != NULL); if (change->uid == importer->last_common_uid) { /* we've already verified that the GUID matches. apply flag changes if there are any. */ i_assert(!importer->last_common_uid_found); dsync_mailbox_import_flag_change(importer, change); return; } save = p_new(importer->pool, struct dsync_mail_change, 1); dsync_mail_change_dup(importer->pool, change, save); if (importer->last_common_uid_found) { /* this is a new mail. its UID may or may not conflict with an existing local mail, we'll figure it out later. */ i_assert(change->uid > importer->last_common_uid); dsync_mailbox_save(importer, save); } else { /* the local mail is expunged. we'll decide later if we want to save this mail locally or expunge it form remote. 
*/ i_assert(change->uid > importer->last_common_uid); i_assert(change->uid < importer->cur_mail->uid); array_append(&importer->maybe_saves, &save, 1); } } static void dsync_mailbox_import_expunge(struct dsync_mailbox_importer *importer, const struct dsync_mail_change *change) { if (importer->last_common_uid_found) { /* expunge the message, unless its GUID unexpectedly doesn't match */ i_assert(change->uid <= importer->last_common_uid); if (dsync_import_set_mail(importer, change)) mail_expunge(importer->mail); } else if (change->uid < importer->cur_mail->uid) { /* already expunged locally, we can ignore this. uid=last_common_uid if we managed to verify from transaction log that the GUIDs match */ i_assert(change->uid >= importer->last_common_uid); } else if (change->uid == importer->last_common_uid) { /* already verified that the GUID matches */ i_assert(importer->cur_mail->uid == change->uid); mail_expunge(importer->cur_mail); } else { /* we don't know yet if we should expunge this message or not. queue it until we do. 
*/ i_assert(change->uid > importer->last_common_uid); seq_range_array_add(&importer->maybe_expunge_uids, change->uid); } } static void dsync_mailbox_rewind_search(struct dsync_mailbox_importer *importer) { /* If there are local mails after last_common_uid which we skipped while trying to match the next message, we need to now go back */ if (importer->cur_mail != NULL && importer->cur_mail->uid <= importer->last_common_uid+1) return; importer->cur_mail = NULL; importer->cur_guid = NULL; importer->next_local_seq = 0; (void)mailbox_search_deinit(&importer->search_ctx); dsync_mailbox_import_search_init(importer); } static void dsync_mailbox_common_uid_found(struct dsync_mailbox_importer *importer) { struct dsync_mail_change *const *saves; struct seq_range_iter iter; unsigned int n, i, count; uint32_t uid; importer->last_common_uid_found = TRUE; dsync_mailbox_rewind_search(importer); /* expunge the messages whose expunge-decision we delayed previously */ seq_range_array_iter_init(&iter, &importer->maybe_expunge_uids); n = 0; while (seq_range_array_iter_nth(&iter, n++, &uid)) { if (uid > importer->last_common_uid) { /* we expunge messages only up to last_common_uid, ignore the rest */ break; } if (mail_set_uid(importer->mail, uid)) mail_expunge(importer->mail); } /* handle pending saves */ saves = array_get(&importer->maybe_saves, &count); for (i = 0; i < count; i++) { if (saves[i]->uid > importer->last_common_uid) dsync_mailbox_save(importer, saves[i]); } } static int dsync_mailbox_import_match_msg(struct dsync_mailbox_importer *importer, const struct dsync_mail_change *change) { const char *hdr_hash; if (*change->guid != '\0' && *importer->cur_guid != '\0') { /* we have GUIDs, verify them */ return strcmp(change->guid, importer->cur_guid) == 0 ? 
1 : 0; } /* verify hdr_hash if it exists */ if (change->hdr_hash == NULL) { i_assert(*importer->cur_guid == '\0'); i_error("Mailbox %s: GUIDs not supported, " "sync with header hashes instead", mailbox_get_vname(importer->box)); importer->failed = TRUE; return -1; } if (dsync_mail_get_hdr_hash(importer->cur_mail, &hdr_hash) < 0) { dsync_mail_error(importer, importer->cur_mail, "hdr-stream"); return -1; } return strcmp(change->hdr_hash, hdr_hash) == 0 ? 1 : 0; } static void dsync_mailbox_find_common_uid(struct dsync_mailbox_importer *importer, const struct dsync_mail_change *change) { const struct dsync_mail_change *local_change; guid_128_t guid_128, change_guid_128; int ret; /* try to find the matching local mail */ if (!importer_next_mail(importer, change->uid)) { /* no more local mails. use the last message with a matching GUID as the last common UID. */ dsync_mailbox_common_uid_found(importer); return; } if (change->guid == NULL) { /* we can't know if this UID matches */ return; } if (importer->cur_mail->uid == change->uid) { /* we have a matching local UID. check GUID to see if it's really the same mail or not */ if ((ret = dsync_mailbox_import_match_msg(importer, change)) < 0) { /* unknown */ return; } if (ret == 0) { /* mismatch - found the first non-common UID */ dsync_mailbox_common_uid_found(importer); } else { importer->last_common_uid = change->uid; } return; } if (*change->guid == '\0') { /* remote doesn't support GUIDs, can't verify expunge */ return; } /* local message is expunged. see if we can find its GUID from transaction log and check if the GUIDs match. The GUID in log is a 128bit GUID, so we may need to convert the remote's GUID string to 128bit GUID first. 
*/ local_change = hash_table_lookup(importer->local_changes, POINTER_CAST(change->uid)); if (local_change == NULL || local_change->guid == NULL) return; if (guid_128_from_string(local_change->guid, guid_128) < 0) i_unreached(); mail_generate_guid_128_hash(change->guid, change_guid_128); if (memcmp(change_guid_128, guid_128, GUID_128_SIZE) != 0) { /* mismatch - found the first non-common UID */ dsync_mailbox_common_uid_found(importer); } else { importer->last_common_uid = change->uid; } return; } void dsync_mailbox_import_change(struct dsync_mailbox_importer *importer, const struct dsync_mail_change *change) { i_assert(!importer->new_uids_assigned); i_assert(importer->prev_uid < change->uid); importer->prev_uid = change->uid; if (!importer->last_common_uid_found) dsync_mailbox_find_common_uid(importer, change); if (importer->last_common_uid_found) { /* a) uid <= last_common_uid for flag changes and expunges. this happens only when last_common_uid was originally given as parameter to importer. when we're finding the last_common_uid ourself, uid>last_common_uid always in here, because last_common_uid_found=TRUE only after we find the first mismatch. b) uid > last_common_uid for i) new messages, ii) expunges that were sent "just in case" */ if (change->uid <= importer->last_common_uid) { i_assert(change->type != DSYNC_MAIL_CHANGE_TYPE_SAVE); } else if (change->type == DSYNC_MAIL_CHANGE_TYPE_EXPUNGE) { /* ignore */ return; } else { i_assert(change->type == DSYNC_MAIL_CHANGE_TYPE_SAVE); } } else { /* a) uid < last_common_uid can never happen */ i_assert(change->uid >= importer->last_common_uid); /* b) uid = last_common_uid if we've verified that the messages' GUIDs match so far. 
c) uid > last_common_uid: i) TYPE_EXPUNGE change has GUID=NULL, so we couldn't verify yet if it matches our local message, ii) local message is expunged and we couldn't find its GUID */ if (change->uid > importer->last_common_uid) { i_assert(change->type == DSYNC_MAIL_CHANGE_TYPE_EXPUNGE || change->uid < importer->cur_mail->uid); } } switch (change->type) { case DSYNC_MAIL_CHANGE_TYPE_SAVE: dsync_mailbox_import_save(importer, change); break; case DSYNC_MAIL_CHANGE_TYPE_EXPUNGE: dsync_mailbox_import_expunge(importer, change); break; case DSYNC_MAIL_CHANGE_TYPE_FLAG_CHANGE: i_assert(importer->last_common_uid_found); dsync_mailbox_import_flag_change(importer, change); break; } } static void dsync_msg_update_uid(struct dsync_mailbox_importer *importer, uint32_t old_uid, uint32_t new_uid) { struct mail_save_context *save_ctx; if (!mail_set_uid(importer->mail, old_uid)) return; save_ctx = mailbox_save_alloc(importer->ext_trans); mailbox_save_copy_flags(save_ctx, importer->mail); mailbox_save_set_uid(save_ctx, new_uid); if (mailbox_copy(&save_ctx, importer->mail) == 0) { array_append(&importer->wanted_uids, &new_uid, 1); mail_expunge(importer->mail); } } static void dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer) { struct importer_new_mail *newmail, *const *newmailp; uint32_t common_uid_next, new_uid; common_uid_next = I_MAX(importer->local_uid_next, importer->remote_uid_next); array_foreach_modifiable(&importer->newmails, newmailp) { newmail = *newmailp; if (newmail->skip) { /* already assigned */ if (newmail->uid_in_local) { if (mail_set_uid(importer->mail, newmail->uid)) mail_expunge(importer->mail); } continue; } /* figure out what UID to use for the mail */ if (newmail->uid_is_usable) { /* keep the UID */ new_uid = newmail->uid; } else if (newmail->link != NULL && newmail->link->uid_is_usable) new_uid = newmail->link->uid; else new_uid = common_uid_next++; if (newmail->uid_in_local && newmail->uid != new_uid) { /* local UID changed, 
reassign it by copying */ dsync_msg_update_uid(importer, newmail->uid, new_uid); } newmail->uid = new_uid; if (newmail->link != NULL) { /* skip the linked mail */ newmail->link->skip = TRUE; } } importer->last_common_uid = common_uid_next; importer->new_uids_assigned = TRUE; } void dsync_mailbox_import_changes_finish(struct dsync_mailbox_importer *importer) { i_assert(!importer->new_uids_assigned); if (!importer->last_common_uid_found) { /* handle pending expunges and flag updates */ dsync_mailbox_common_uid_found(importer); } /* skip common local mails */ (void)importer_next_mail(importer, importer->last_common_uid+1); /* if there are any local mails left, add them to newmails list */ while (importer->cur_mail != NULL) (void)dsync_mailbox_try_save(importer, NULL); dsync_mailbox_import_assign_new_uids(importer); } const struct dsync_mail_request * dsync_mailbox_import_next_request(struct dsync_mailbox_importer *importer) { const struct dsync_mail_request *requests; unsigned int count; requests = array_get(&importer->mail_requests, &count); if (importer->mail_request_idx == count) return NULL; return &requests[importer->mail_request_idx++]; } static const char *const * dsync_mailbox_get_final_keywords(const struct dsync_mail_change *change) { ARRAY_TYPE(const_string) keywords; const char *const *changes; unsigned int i, count; if (!array_is_created(&change->keyword_changes)) return NULL; changes = array_get(&change->keyword_changes, &count); t_array_init(&keywords, count); for (i = 0; i < count; i++) { if (changes[i][0] == KEYWORD_CHANGE_ADD || changes[i][0] == KEYWORD_CHANGE_FINAL) { const char *name = changes[i]+1; array_append(&keywords, &name, 1); } } if (array_count(&keywords) == 0) return NULL; (void)array_append_space(&keywords); return array_idx(&keywords, 0); } static void dsync_mailbox_save_set_metadata(struct dsync_mailbox_importer *importer, struct mail_save_context *save_ctx, const struct dsync_mail_change *change) { const char *const *keyword_names; 
struct mail_keywords *keywords; keyword_names = dsync_mailbox_get_final_keywords(change); keywords = keyword_names == NULL ? NULL : mailbox_keywords_create_valid(importer->box, keyword_names); mailbox_save_set_flags(save_ctx, change->final_flags, keywords); if (keywords != NULL) mailbox_keywords_unref(&keywords); mailbox_save_set_save_date(save_ctx, change->save_timestamp); if (change->modseq > 1) { (void)mailbox_enable(importer->box, MAILBOX_FEATURE_CONDSTORE); mailbox_save_set_min_modseq(save_ctx, change->modseq); } } static int dsync_msg_try_copy(struct dsync_mailbox_importer *importer, struct mail_save_context **save_ctx_p, struct importer_new_mail *all_newmails) { struct importer_new_mail *inst; for (inst = all_newmails; inst != NULL; inst = inst->next) { if (inst->uid_in_local && !inst->copy_failed && mail_set_uid(importer->mail, inst->uid)) { if (mailbox_copy(save_ctx_p, importer->mail) < 0) { inst->copy_failed = TRUE; return -1; } return 1; } } return 0; } static struct mail_save_context * dsync_mailbox_save_init(struct dsync_mailbox_importer *importer, const struct dsync_mail *mail, struct importer_new_mail *newmail) { struct mail_save_context *save_ctx; save_ctx = mailbox_save_alloc(importer->ext_trans); mailbox_save_set_uid(save_ctx, newmail->uid); if (*mail->guid != '\0') mailbox_save_set_guid(save_ctx, mail->guid); dsync_mailbox_save_set_metadata(importer, save_ctx, newmail->change); if (*mail->pop3_uidl != '\0') mailbox_save_set_pop3_uidl(save_ctx, mail->pop3_uidl); if (mail->pop3_order > 0) mailbox_save_set_pop3_order(save_ctx, mail->pop3_order); mailbox_save_set_received_date(save_ctx, mail->received_date, 0); return save_ctx; } static void dsync_mailbox_save_body(struct dsync_mailbox_importer *importer, const struct dsync_mail *mail, struct importer_new_mail *newmail, struct importer_new_mail *all_newmails) { struct mail_save_context *save_ctx; ssize_t ret; bool save_failed = FALSE; /* try to save the mail by copying an existing mail */ save_ctx = 
dsync_mailbox_save_init(importer, mail, newmail); if ((ret = dsync_msg_try_copy(importer, &save_ctx, all_newmails)) < 0) { if (save_ctx == NULL) save_ctx = dsync_mailbox_save_init(importer, mail, newmail); } if (ret > 0) { array_append(&importer->wanted_uids, &newmail->uid, 1); return; } /* fallback to saving from remote stream */ if (mail->input == NULL) { /* it was just expunged in remote, skip it */ mailbox_save_cancel(&save_ctx); return; } i_stream_seek(mail->input, 0); if (mailbox_save_begin(&save_ctx, mail->input) < 0) { i_error("Can't save message to mailbox %s: %s", mailbox_get_vname(importer->box), mailbox_get_last_error(importer->box, NULL)); importer->failed = TRUE; return; } while ((ret = i_stream_read(mail->input)) > 0 || ret == -2) { if (mailbox_save_continue(save_ctx) < 0) { save_failed = TRUE; ret = -1; break; } } i_assert(ret == -1); if (mail->input->stream_errno != 0) { errno = mail->input->stream_errno; i_error("read(msg input) failed: %m"); mailbox_save_cancel(&save_ctx); importer->failed = TRUE; } else if (save_failed) { mailbox_save_cancel(&save_ctx); importer->failed = TRUE; } else { i_assert(mail->input->eof); if (mailbox_save_finish(&save_ctx) < 0) { i_error("Can't save message to mailbox %s: %s", mailbox_get_vname(importer->box), mailbox_get_last_error(importer->box, NULL)); importer->failed = TRUE; } else { array_append(&importer->wanted_uids, &newmail->uid, 1); } } } void dsync_mailbox_import_mail(struct dsync_mailbox_importer *importer, const struct dsync_mail *mail) { struct importer_new_mail *newmail, *allmails; i_assert(mail->input->seekable); i_assert(importer->new_uids_assigned); newmail = *mail->guid != '\0' ? 
hash_table_lookup(importer->import_guids, mail->guid) : hash_table_lookup(importer->import_uids, POINTER_CAST(mail->uid)); if (newmail == NULL) { if (importer->want_mail_requests) { i_error("%s: Remote sent unwanted message body for " "GUID=%s UID=%u", mailbox_get_vname(importer->box), mail->guid, mail->uid); } return; } if (*mail->guid != '\0') hash_table_remove(importer->import_guids, mail->guid); else { hash_table_remove(importer->import_uids, POINTER_CAST(mail->uid)); } /* save all instances of the message */ allmails = newmail; for (; newmail != NULL; newmail = newmail->next) { if (newmail->skip) { /* no need to do anything for this mail */ continue; } if (newmail->uid_in_local) { /* we already handled this by copying the mail */ continue; } T_BEGIN { dsync_mailbox_save_body(importer, mail, newmail, allmails); } T_END; } } static int reassign_uids_in_seq_range(struct mailbox *box, uint32_t seq1, uint32_t seq2) { const enum mailbox_transaction_flags trans_flags = MAILBOX_TRANSACTION_FLAG_EXTERNAL | MAILBOX_TRANSACTION_FLAG_ASSIGN_UIDS; struct mailbox_transaction_context *trans; struct mail_save_context *save_ctx; struct mail *mail; uint32_t seq; int ret = 0; trans = mailbox_transaction_begin(box, trans_flags); mail = mail_alloc(trans, 0, NULL); for (seq = seq1; seq <= seq2; seq++) { mail_set_seq(mail, seq); save_ctx = mailbox_save_alloc(trans); mailbox_save_copy_flags(save_ctx, mail); if (mailbox_copy(&save_ctx, mail) < 0) ret = -1; else mail_expunge(mail); } mail_free(&mail); if (mailbox_transaction_commit(&trans) < 0) { i_error("UID reassign commit failed to mailbox %s: %s", mailbox_get_vname(box), mailbox_get_last_error(box, NULL)); ret = -1; } return ret; } static bool reassign_unwanted_uids(struct dsync_mailbox_importer *importer, const struct mail_transaction_commit_changes *changes, bool *changes_during_sync_r) { struct seq_range_iter iter; const uint32_t *wanted_uids; uint32_t saved_uid, highest_unwanted_uid = 0; uint32_t seq1, seq2, lowest_saved_uid = 
(uint32_t)-1; unsigned int i, n, wanted_count; int ret = 0; /* find the highest wanted UID that doesn't match what we got */ wanted_uids = array_get(&importer->wanted_uids, &wanted_count); seq_range_array_iter_init(&iter, &changes->saved_uids); i = n = 0; while (seq_range_array_iter_nth(&iter, n++, &saved_uid)) { i_assert(i < wanted_count); if (lowest_saved_uid > saved_uid) lowest_saved_uid = saved_uid; if (saved_uid != wanted_uids[i]) { if (highest_unwanted_uid < wanted_uids[i]) highest_unwanted_uid = wanted_uids[i]; } i++; } if (highest_unwanted_uid == 0 && i > 0 && importer->local_uid_next <= lowest_saved_uid-1) { /* we didn't see any unwanted UIDs, but we'll still need to verify that messages didn't just get saved locally to a gap that we left in local_uid_next..(lowest_saved_uid-1) */ highest_unwanted_uid = lowest_saved_uid-1; } if (highest_unwanted_uid == 0) seq1 = seq2 = 0; else { mailbox_get_seq_range(importer->box, importer->local_uid_next, highest_unwanted_uid, &seq1, &seq2); } if (seq1 > 0) { ret = reassign_uids_in_seq_range(importer->box, seq1, seq2); *changes_during_sync_r = TRUE; } return ret; } static int dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer, bool *changes_during_sync_r) { struct mail_transaction_commit_changes changes; struct mailbox_update update; int ret = 0; /* commit saves */ if (mailbox_transaction_commit_get_changes(&importer->ext_trans, &changes) < 0) { i_error("Save commit failed to mailbox %s: %s", mailbox_get_vname(importer->box), mailbox_get_last_error(importer->box, NULL)); mailbox_transaction_rollback(&importer->trans); return -1; } /* commit flag changes and expunges */ if (mailbox_transaction_commit(&importer->trans) < 0) { i_error("Commit failed to mailbox %s: %s", mailbox_get_vname(importer->box), mailbox_get_last_error(importer->box, NULL)); pool_unref(&changes.pool); return -1; } /* update mailbox metadata. 
*/ memset(&update, 0, sizeof(update)); update.min_next_uid = importer->remote_uid_next; update.min_first_recent_uid = I_MIN(importer->last_common_uid+1, importer->remote_first_recent_uid); update.min_highest_modseq = importer->remote_highest_modseq; if (mailbox_update(importer->box, &update) < 0) { i_error("Mailbox update failed to mailbox %s: %s", mailbox_get_vname(importer->box), mailbox_get_last_error(importer->box, NULL)); ret = -1; } /* sync mailbox to finish flag changes and expunges. */ if (mailbox_sync(importer->box, 0) < 0) { i_error("Mailbox sync failed to mailbox %s: %s", mailbox_get_vname(importer->box), mailbox_get_last_error(importer->box, NULL)); ret = -1; } if (reassign_unwanted_uids(importer, &changes, changes_during_sync_r) < 0) ret = -1; pool_unref(&changes.pool); return ret; } static unsigned int dsync_mailbox_import_count_missing_imports(struct hash_table *imports) { struct hash_iterate_context *iter; void *key, *value; unsigned int msgs_left = 0; iter = hash_table_iterate_init(imports); while (hash_table_iterate(iter, &key, &value)) { struct importer_new_mail *mail = value; for (; mail != NULL; mail = mail->next) { if (!mail->uid_in_local) { msgs_left++; break; } } } hash_table_iterate_deinit(&iter); return msgs_left; } int dsync_mailbox_import_deinit(struct dsync_mailbox_importer **_importer, uint32_t *last_common_uid_r, uint64_t *last_common_modseq_r, bool *changes_during_sync_r) { struct dsync_mailbox_importer *importer = *_importer; unsigned int msgs_left; int ret; *_importer = NULL; *changes_during_sync_r = FALSE; if (!importer->new_uids_assigned) dsync_mailbox_import_assign_new_uids(importer); msgs_left = dsync_mailbox_import_count_missing_imports(importer->import_guids) + dsync_mailbox_import_count_missing_imports(importer->import_uids); if (!importer->failed && msgs_left > 0) { i_error("%s: Remote didn't send %u expected message bodies", mailbox_get_vname(importer->box), msgs_left); } if (importer->search_ctx != NULL) { if 
(mailbox_search_deinit(&importer->search_ctx) < 0) importer->failed = TRUE; } mail_free(&importer->mail); mail_free(&importer->ext_mail); if (dsync_mailbox_import_commit(importer, changes_during_sync_r) < 0) importer->failed = TRUE; hash_table_destroy(&importer->import_guids); hash_table_destroy(&importer->import_uids); array_free(&importer->maybe_expunge_uids); array_free(&importer->maybe_saves); array_free(&importer->wanted_uids); array_free(&importer->newmails); if (array_is_created(&importer->mail_requests)) array_free(&importer->mail_requests); *last_common_uid_r = importer->last_common_uid; if (!*changes_during_sync_r) *last_common_modseq_r = importer->last_common_modseq; else { /* local changes occurred during dsync. we exported changes up to local_initial_highestmodseq, so all of the changes have happened after it. we want the next run to see those changes, so return it as the last common modseq */ *last_common_modseq_r = importer->local_initial_highestmodseq; } ret = importer->failed ? -1 : 0; pool_unref(&importer->pool); return ret; }