# HG changeset patch # User Timo Sirainen # Date 1493544708 -10800 # Node ID 81e013b3207da9670de06909883768e20910cc2d # Parent ba158fa10ff8ad97925ba97ea8fc473279e56a1f dsync: Try to commit transactions every dsync_commit_msgs_interval messages This was first attempted to be implemented by ec0cc8fa647794e44a1afaa448f495a713048dc4, but it was later partially reverted by 5973d496b16721af6d2c1fa90b016aacddf13554. This current commit should fix its problems. diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/doveadm-dsync.c --- a/src/doveadm/doveadm-dsync.c Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/doveadm-dsync.c Sun Apr 30 12:31:48 2017 +0300 @@ -93,6 +93,7 @@ const char *error; unsigned int lock_timeout; + unsigned int import_commit_msgs_interval; unsigned int lock:1; unsigned int purge_remote:1; @@ -587,6 +588,7 @@ set.virtual_all_box = ctx->virtual_all_box; memcpy(set.sync_box_guid, ctx->mailbox_guid, sizeof(set.sync_box_guid)); set.lock_timeout_secs = ctx->lock_timeout; + set.import_commit_msgs_interval = ctx->import_commit_msgs_interval; set.state = ctx->state_input; set.mailbox_alt_char = doveadm_settings->dsync_alt_char[0]; @@ -1107,6 +1109,7 @@ p_array_init(&ctx->namespace_prefixes, ctx->ctx.pool, 4); if ((doveadm_settings->parsed_features & DSYNC_FEATURE_EMPTY_HDR_WORKAROUND) != 0) ctx->empty_hdr_workaround = TRUE; + ctx->import_commit_msgs_interval = doveadm_settings->dsync_commit_msgs_interval; return &ctx->ctx; } diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/doveadm-settings.c --- a/src/doveadm/doveadm-settings.c Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/doveadm-settings.c Sun Apr 30 12:31:48 2017 +0300 @@ -72,6 +72,7 @@ DEF(SET_STR, director_username_hash), DEF(SET_STR, doveadm_api_key), DEF(SET_STR, dsync_features), + DEF(SET_UINT, dsync_commit_msgs_interval), DEF(SET_STR, doveadm_http_rawlog_dir), { SET_STRLIST, "plugin", offsetof(struct doveadm_settings, plugin_envs), NULL }, @@ -95,6 +96,7 @@ .dsync_alt_char = "_", .dsync_remote_cmd = "ssh -l%{login} %{host} doveadm dsync-server -u%u -U", .dsync_features = "", + .dsync_commit_msgs_interval = 100, .ssl_client_ca_dir = "", .ssl_client_ca_file = "", .director_username_hash = "%Lu", diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/doveadm-settings.h --- a/src/doveadm/doveadm-settings.h Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/doveadm-settings.h Sun Apr 30 12:31:48 2017 +0300 @@ -29,6 +29,7 @@ const char *director_username_hash; const char *doveadm_api_key; const char *dsync_features; + unsigned int dsync_commit_msgs_interval; const char *doveadm_http_rawlog_dir; enum dsync_features parsed_features; ARRAY(const char *) plugin_envs; diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/dsync/dsync-brain-mailbox.c --- a/src/doveadm/dsync/dsync-brain-mailbox.c Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/dsync/dsync-brain-mailbox.c Sun Apr 30 12:31:48 2017 +0300 @@ -236,6 +236,7 @@ brain->sync_until_timestamp, brain->sync_max_size, brain->sync_flag, + brain->import_commit_msgs_interval, import_flags); } diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/dsync/dsync-brain-private.h --- a/src/doveadm/dsync/dsync-brain-private.h Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/dsync/dsync-brain-private.h Sun Apr 30 12:31:48 2017 +0300 @@ -62,6 +62,7 @@ uoff_t sync_max_size; const char *sync_flag; char alt_char; + unsigned int import_commit_msgs_interval; unsigned int lock_timeout; int lock_fd; diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/dsync/dsync-brain.c --- a/src/doveadm/dsync/dsync-brain.c Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/dsync/dsync-brain.c Sun Apr 30 12:31:48 2017 +0300 @@ -221,6 +221,7 @@ memcpy(brain->sync_box_guid, set->sync_box_guid, sizeof(brain->sync_box_guid)); brain->lock_timeout = set->lock_timeout_secs; + brain->import_commit_msgs_interval = set->import_commit_msgs_interval; brain->master_brain = TRUE; dsync_brain_set_flags(brain, flags); @@ -260,6 +261,7 @@ ibc_set.sync_type = sync_type; ibc_set.hdr_hash_v2 = TRUE; ibc_set.lock_timeout = set->lock_timeout_secs; + ibc_set.import_commit_msgs_interval = set->import_commit_msgs_interval; /* reverse the backup direction for the slave */ ibc_set.brain_flags = flags & ~(DSYNC_BRAIN_FLAG_BACKUP_SEND | DSYNC_BRAIN_FLAG_BACKUP_RECV); diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/dsync/dsync-brain.h --- a/src/doveadm/dsync/dsync-brain.h Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/dsync/dsync-brain.h Sun Apr 30 12:31:48 2017 +0300 @@ -79,6 +79,9 @@ /* If non-zero, use dsync lock file for this user */ unsigned int lock_timeout_secs; + /* If non-zero, importing will attempt to commit transaction after + saving this many messages. */ + unsigned int import_commit_msgs_interval; /* Input state for DSYNC_BRAIN_SYNC_TYPE_STATE */ const char *state; }; diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/dsync/dsync-ibc-stream.c --- a/src/doveadm/dsync/dsync-ibc-stream.c Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/dsync/dsync-ibc-stream.c Sun Apr 30 12:31:48 2017 +0300 @@ -78,7 +78,7 @@ "send_mail_requests backup_send backup_recv lock_timeout " "no_mail_sync no_mailbox_renames no_backup_overwrite purge_remote " "no_notify sync_since_timestamp sync_max_size sync_flags sync_until_timestamp" - "virtual_all_box empty_hdr_workaround" + "virtual_all_box empty_hdr_workaround import_commit_msgs_interval" }, { .name = "mailbox_state", .chr = 'S', @@ -707,6 +707,10 @@ dsync_serializer_encode_add(encoder, "lock_timeout", t_strdup_printf("%u", set->lock_timeout)); } + if (set->import_commit_msgs_interval > 0) { + dsync_serializer_encode_add(encoder, "import_commit_msgs_interval", + t_strdup_printf("%u", set->import_commit_msgs_interval)); + } if (set->sync_since_timestamp > 0) { dsync_serializer_encode_add(encoder, "sync_since_timestamp", t_strdup_printf("%ld", (long)set->sync_since_timestamp)); @@ -823,6 +827,14 @@ return DSYNC_IBC_RECV_RET_TRYAGAIN; } } + if (dsync_deserializer_decode_try(decoder, "import_commit_msgs_interval", &value)) { + if (str_to_uint(value, &set->import_commit_msgs_interval) < 0 || + set->import_commit_msgs_interval == 0) { + dsync_ibc_input_error(ibc, decoder, + "Invalid import_commit_msgs_interval: %s", value); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + } if (dsync_deserializer_decode_try(decoder, "sync_since_timestamp", &value)) { if (str_to_time(value, &set->sync_since_timestamp) < 0 || set->sync_since_timestamp == 0) { diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/dsync/dsync-ibc.h --- a/src/doveadm/dsync/dsync-ibc.h Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/dsync/dsync-ibc.h Sun Apr 30 12:31:48 2017 +0300 @@ -69,6 +69,7 @@ enum dsync_brain_flags brain_flags; bool hdr_hash_v2; unsigned int lock_timeout; + unsigned int import_commit_msgs_interval; }; void dsync_ibc_init_pipe(struct dsync_ibc **ibc1_r, diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/dsync/dsync-mailbox-import.c --- a/src/doveadm/dsync/dsync-mailbox-import.c Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/dsync/dsync-mailbox-import.c Sun Apr 30 12:31:48 2017 +0300 @@ -44,6 +44,7 @@ unsigned int skip:1; unsigned int expunged:1; unsigned int copy_failed:1; + unsigned int saved:1; }; /* for quickly testing that two-way sync doesn't actually do any unexpected @@ -66,6 +67,7 @@ uoff_t sync_max_size; enum mailbox_transaction_flags transaction_flags; unsigned int hdr_hash_version; + unsigned int commit_msgs_interval; enum mail_flags sync_flag; const char *sync_keyword; @@ -106,6 +108,7 @@ uint32_t prev_uid, next_local_seq, local_uid_next; uint64_t local_initial_highestmodseq, local_initial_highestpvtmodseq; unsigned int import_pos, import_count; + unsigned int first_unsaved_idx, saves_since_commit; enum mail_error mail_error; @@ -222,6 +225,7 @@ time_t sync_until_timestamp, uoff_t sync_max_size, const char *sync_flag, + unsigned int commit_msgs_interval, enum dsync_mailbox_import_flags flags) { struct dsync_mailbox_importer *importer; @@ -257,6 +261,7 @@ else importer->sync_keyword = p_strdup(pool, sync_flag); } + importer->commit_msgs_interval = commit_msgs_interval; importer->transaction_flags = MAILBOX_TRANSACTION_FLAG_SYNC; if ((flags & DSYNC_MAILBOX_IMPORT_FLAG_NO_NOTIFY) != 0) importer->transaction_flags |= MAILBOX_TRANSACTION_FLAG_NO_NOTIFY; @@ -1819,6 +1824,17 @@ return importer->failed ? -1 : 0; } +static int +importer_new_mail_final_uid_cmp(struct importer_new_mail *const *newmail1, + struct importer_new_mail *const *newmail2) +{ + if ((*newmail1)->final_uid < (*newmail2)->final_uid) + return -1; + if ((*newmail1)->final_uid > (*newmail2)->final_uid) + return 1; + return 0; +} + static void dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer) { @@ -1831,6 +1847,7 @@ newmail = *newmailp; if (newmail->skip) { /* already assigned */ + i_assert(newmail->final_uid != 0); continue; } @@ -1858,6 +1875,9 @@ } importer->last_common_uid = common_uid_next-1; importer->new_uids_assigned = TRUE; + /* Sort the newmails by their final_uid. This is used for tracking + whether an intermediate commit is allowed. */ + array_sort(&importer->newmails, importer_new_mail_final_uid_cmp); } static int @@ -1903,6 +1923,45 @@ array_append(&importer->wanted_uids, &uid, 1); } +static void +dsync_mailbox_import_update_first_saved(struct dsync_mailbox_importer *importer) +{ + struct importer_new_mail *const *newmails; + unsigned int count; + + newmails = array_get(&importer->newmails, &count); + while (importer->first_unsaved_idx < count) { + if (!newmails[importer->first_unsaved_idx]->saved) + break; + importer->first_unsaved_idx++; + } +} + +static void +dsync_mailbox_import_saved_newmail(struct dsync_mailbox_importer *importer, + struct importer_new_mail *newmail) +{ + dsync_mailbox_import_saved_uid(importer, newmail->final_uid); + newmail->saved = TRUE; + + dsync_mailbox_import_update_first_saved(importer); + importer->saves_since_commit++; + /* we can commit only if all the upcoming mails will have UIDs that + are larger than we're committing. + + Note that if any existing UIDs have been changed, the new UID is + usually higher than anything that is being saved so we can't do + an intermediate commit. It's too much extra work to try to handle + that situation. So here this never happens, because then + array_count(wanted_uids) is always higher than first_unsaved_idx. */ + if (importer->saves_since_commit >= importer->commit_msgs_interval && + importer->first_unsaved_idx == array_count(&importer->wanted_uids)) { + if (dsync_mailbox_import_commit(importer, FALSE) < 0) + importer->failed = TRUE; + importer->saves_since_commit = 0; + } +} + static bool dsync_msg_change_uid(struct dsync_mailbox_importer *importer, uint32_t old_uid, uint32_t new_uid) @@ -2369,7 +2428,7 @@ } if (ret > 0) { i_assert(save_ctx == NULL); - dsync_mailbox_import_saved_uid(importer, newmail->final_uid); + dsync_mailbox_import_saved_newmail(importer, newmail); return TRUE; } /* fallback to saving from remote stream */ @@ -2449,8 +2508,7 @@ &importer->mail_error)); importer->failed = TRUE; } else { - dsync_mailbox_import_saved_uid(importer, - newmail->final_uid); + dsync_mailbox_import_saved_newmail(importer, newmail); } } return TRUE; diff -r ba158fa10ff8 -r 81e013b3207d src/doveadm/dsync/dsync-mailbox-import.h --- a/src/doveadm/dsync/dsync-mailbox-import.h Fri May 12 12:44:27 2017 +0300 +++ b/src/doveadm/dsync/dsync-mailbox-import.h Sun Apr 30 12:31:48 2017 +0300 @@ -36,6 +36,7 @@ time_t sync_until_timestamp, uoff_t sync_max_size, const char *sync_flag, + unsigned int commit_msgs_interval, enum dsync_mailbox_import_flags flags); int dsync_mailbox_import_attribute(struct dsync_mailbox_importer *importer, const struct dsync_mailbox_attribute *attr);