changeset 22043:81e013b3207d

dsync: Try to commit transactions every dsync_commit_msgs_interval messages. This was first attempted in ec0cc8fa647794e44a1afaa448f495a713048dc4, but that change was later partially reverted by 5973d496b16721af6d2c1fa90b016aacddf13554. This commit should fix its problems.
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Sun, 30 Apr 2017 12:31:48 +0300
parents ba158fa10ff8
children c73b9f07b067
files src/doveadm/doveadm-dsync.c src/doveadm/doveadm-settings.c src/doveadm/doveadm-settings.h src/doveadm/dsync/dsync-brain-mailbox.c src/doveadm/dsync/dsync-brain-private.h src/doveadm/dsync/dsync-brain.c src/doveadm/dsync/dsync-brain.h src/doveadm/dsync/dsync-ibc-stream.c src/doveadm/dsync/dsync-ibc.h src/doveadm/dsync/dsync-mailbox-import.c src/doveadm/dsync/dsync-mailbox-import.h
diffstat 11 files changed, 89 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/src/doveadm/doveadm-dsync.c	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/doveadm-dsync.c	Sun Apr 30 12:31:48 2017 +0300
@@ -93,6 +93,7 @@
 	const char *error;
 
 	unsigned int lock_timeout;
+	unsigned int import_commit_msgs_interval;
 
 	unsigned int lock:1;
 	unsigned int purge_remote:1;
@@ -587,6 +588,7 @@
 	set.virtual_all_box = ctx->virtual_all_box;
 	memcpy(set.sync_box_guid, ctx->mailbox_guid, sizeof(set.sync_box_guid));
 	set.lock_timeout_secs = ctx->lock_timeout;
+	set.import_commit_msgs_interval = ctx->import_commit_msgs_interval;
 	set.state = ctx->state_input;
 	set.mailbox_alt_char = doveadm_settings->dsync_alt_char[0];
 
@@ -1107,6 +1109,7 @@
 	p_array_init(&ctx->namespace_prefixes, ctx->ctx.pool, 4);
         if ((doveadm_settings->parsed_features & DSYNC_FEATURE_EMPTY_HDR_WORKAROUND) != 0)
                 ctx->empty_hdr_workaround = TRUE;
+	ctx->import_commit_msgs_interval = doveadm_settings->dsync_commit_msgs_interval;
 	return &ctx->ctx;
 }
 
--- a/src/doveadm/doveadm-settings.c	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/doveadm-settings.c	Sun Apr 30 12:31:48 2017 +0300
@@ -72,6 +72,7 @@
 	DEF(SET_STR, director_username_hash),
 	DEF(SET_STR, doveadm_api_key),
 	DEF(SET_STR, dsync_features),
+	DEF(SET_UINT, dsync_commit_msgs_interval),
 	DEF(SET_STR, doveadm_http_rawlog_dir),
 
 	{ SET_STRLIST, "plugin", offsetof(struct doveadm_settings, plugin_envs), NULL },
@@ -95,6 +96,7 @@
 	.dsync_alt_char = "_",
 	.dsync_remote_cmd = "ssh -l%{login} %{host} doveadm dsync-server -u%u -U",
 	.dsync_features = "",
+	.dsync_commit_msgs_interval = 100,
 	.ssl_client_ca_dir = "",
 	.ssl_client_ca_file = "",
 	.director_username_hash = "%Lu",
--- a/src/doveadm/doveadm-settings.h	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/doveadm-settings.h	Sun Apr 30 12:31:48 2017 +0300
@@ -29,6 +29,7 @@
 	const char *director_username_hash;
 	const char *doveadm_api_key;
 	const char *dsync_features;
+	unsigned int dsync_commit_msgs_interval;
 	const char *doveadm_http_rawlog_dir;
 	enum dsync_features parsed_features;
 	ARRAY(const char *) plugin_envs;
--- a/src/doveadm/dsync/dsync-brain-mailbox.c	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/dsync/dsync-brain-mailbox.c	Sun Apr 30 12:31:48 2017 +0300
@@ -236,6 +236,7 @@
 					  brain->sync_until_timestamp,
 					  brain->sync_max_size,
 					  brain->sync_flag,
+					  brain->import_commit_msgs_interval,
 					  import_flags);
 }
 
--- a/src/doveadm/dsync/dsync-brain-private.h	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/dsync/dsync-brain-private.h	Sun Apr 30 12:31:48 2017 +0300
@@ -62,6 +62,7 @@
 	uoff_t sync_max_size;
 	const char *sync_flag;
 	char alt_char;
+	unsigned int import_commit_msgs_interval;
 
 	unsigned int lock_timeout;
 	int lock_fd;
--- a/src/doveadm/dsync/dsync-brain.c	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/dsync/dsync-brain.c	Sun Apr 30 12:31:48 2017 +0300
@@ -221,6 +221,7 @@
 	memcpy(brain->sync_box_guid, set->sync_box_guid,
 	       sizeof(brain->sync_box_guid));
 	brain->lock_timeout = set->lock_timeout_secs;
+	brain->import_commit_msgs_interval = set->import_commit_msgs_interval;
 	brain->master_brain = TRUE;
 	dsync_brain_set_flags(brain, flags);
 
@@ -260,6 +261,7 @@
 	ibc_set.sync_type = sync_type;
 	ibc_set.hdr_hash_v2 = TRUE;
 	ibc_set.lock_timeout = set->lock_timeout_secs;
+	ibc_set.import_commit_msgs_interval = set->import_commit_msgs_interval;
 	/* reverse the backup direction for the slave */
 	ibc_set.brain_flags = flags & ~(DSYNC_BRAIN_FLAG_BACKUP_SEND |
 					DSYNC_BRAIN_FLAG_BACKUP_RECV);
--- a/src/doveadm/dsync/dsync-brain.h	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/dsync/dsync-brain.h	Sun Apr 30 12:31:48 2017 +0300
@@ -79,6 +79,9 @@
 
 	/* If non-zero, use dsync lock file for this user */
 	unsigned int lock_timeout_secs;
+	/* If non-zero, importing will attempt to commit transaction after
+	   saving this many messages. */
+	unsigned int import_commit_msgs_interval;
 	/* Input state for DSYNC_BRAIN_SYNC_TYPE_STATE */
 	const char *state;
 };
--- a/src/doveadm/dsync/dsync-ibc-stream.c	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/dsync/dsync-ibc-stream.c	Sun Apr 30 12:31:48 2017 +0300
@@ -78,7 +78,7 @@
 	  	"send_mail_requests backup_send backup_recv lock_timeout "
 	  	"no_mail_sync no_mailbox_renames no_backup_overwrite purge_remote "
 		"no_notify sync_since_timestamp sync_max_size sync_flags sync_until_timestamp"
-	  	"virtual_all_box empty_hdr_workaround"
+	  	"virtual_all_box empty_hdr_workaround import_commit_msgs_interval"
 	},
 	{ .name = "mailbox_state",
 	  .chr = 'S',
@@ -707,6 +707,10 @@
 		dsync_serializer_encode_add(encoder, "lock_timeout",
 			t_strdup_printf("%u", set->lock_timeout));
 	}
+	if (set->import_commit_msgs_interval > 0) {
+		dsync_serializer_encode_add(encoder, "import_commit_msgs_interval",
+			t_strdup_printf("%u", set->import_commit_msgs_interval));
+	}
 	if (set->sync_since_timestamp > 0) {
 		dsync_serializer_encode_add(encoder, "sync_since_timestamp",
 			t_strdup_printf("%ld", (long)set->sync_since_timestamp));
@@ -823,6 +827,14 @@
 			return DSYNC_IBC_RECV_RET_TRYAGAIN;
 		}
 	}
+	if (dsync_deserializer_decode_try(decoder, "import_commit_msgs_interval", &value)) {
+		if (str_to_uint(value, &set->import_commit_msgs_interval) < 0 ||
+		    set->import_commit_msgs_interval == 0) {
+			dsync_ibc_input_error(ibc, decoder,
+				"Invalid import_commit_msgs_interval: %s", value);
+			return DSYNC_IBC_RECV_RET_TRYAGAIN;
+		}
+	}
 	if (dsync_deserializer_decode_try(decoder, "sync_since_timestamp", &value)) {
 		if (str_to_time(value, &set->sync_since_timestamp) < 0 ||
 		    set->sync_since_timestamp == 0) {
--- a/src/doveadm/dsync/dsync-ibc.h	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/dsync/dsync-ibc.h	Sun Apr 30 12:31:48 2017 +0300
@@ -69,6 +69,7 @@
 	enum dsync_brain_flags brain_flags;
 	bool hdr_hash_v2;
 	unsigned int lock_timeout;
+	unsigned int import_commit_msgs_interval;
 };
 
 void dsync_ibc_init_pipe(struct dsync_ibc **ibc1_r,
--- a/src/doveadm/dsync/dsync-mailbox-import.c	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/dsync/dsync-mailbox-import.c	Sun Apr 30 12:31:48 2017 +0300
@@ -44,6 +44,7 @@
 	unsigned int skip:1;
 	unsigned int expunged:1;
 	unsigned int copy_failed:1;
+	unsigned int saved:1;
 };
 
 /* for quickly testing that two-way sync doesn't actually do any unexpected
@@ -66,6 +67,7 @@
 	uoff_t sync_max_size;
 	enum mailbox_transaction_flags transaction_flags;
 	unsigned int hdr_hash_version;
+	unsigned int commit_msgs_interval;
 
 	enum mail_flags sync_flag;
 	const char *sync_keyword;
@@ -106,6 +108,7 @@
 	uint32_t prev_uid, next_local_seq, local_uid_next;
 	uint64_t local_initial_highestmodseq, local_initial_highestpvtmodseq;
 	unsigned int import_pos, import_count;
+	unsigned int first_unsaved_idx, saves_since_commit;
 
 	enum mail_error mail_error;
 
@@ -222,6 +225,7 @@
 			  time_t sync_until_timestamp,
 			  uoff_t sync_max_size,
 			  const char *sync_flag,
+			  unsigned int commit_msgs_interval,
 			  enum dsync_mailbox_import_flags flags)
 {
 	struct dsync_mailbox_importer *importer;
@@ -257,6 +261,7 @@
 		else
 			importer->sync_keyword = p_strdup(pool, sync_flag);
 	}
+	importer->commit_msgs_interval = commit_msgs_interval;
 	importer->transaction_flags = MAILBOX_TRANSACTION_FLAG_SYNC;
 	if ((flags & DSYNC_MAILBOX_IMPORT_FLAG_NO_NOTIFY) != 0)
 		importer->transaction_flags |= MAILBOX_TRANSACTION_FLAG_NO_NOTIFY;
@@ -1819,6 +1824,17 @@
 	return importer->failed ? -1 : 0;
 }
 
+/* array_sort() comparator: orders newmails by ascending final_uid. */
+static int
+importer_new_mail_final_uid_cmp(struct importer_new_mail *const *newmail1,
+				struct importer_new_mail *const *newmail2)
+{
+	const struct importer_new_mail *lhs = *newmail1, *rhs = *newmail2;
+
+	return lhs->final_uid < rhs->final_uid ? -1 :
+		(lhs->final_uid > rhs->final_uid ? 1 : 0);
+}
+
 static void
 dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer)
 {
@@ -1831,6 +1847,7 @@
 		newmail = *newmailp;
 		if (newmail->skip) {
 			/* already assigned */
+			i_assert(newmail->final_uid != 0);
 			continue;
 		}
 
@@ -1858,6 +1875,9 @@
 	}
 	importer->last_common_uid = common_uid_next-1;
 	importer->new_uids_assigned = TRUE;
+	/* Sort the newmails by their final_uid. This is used for tracking
+	   whether an intermediate commit is allowed. */
+	array_sort(&importer->newmails, importer_new_mail_final_uid_cmp);
 }
 
 static int
@@ -1903,6 +1923,45 @@
 	array_append(&importer->wanted_uids, &uid, 1);
 }
 
+/* Advance first_unsaved_idx past the leading newmails that are already saved. */
+static void
+dsync_mailbox_import_update_first_saved(struct dsync_mailbox_importer *importer)
+{
+	struct importer_new_mail *const *mails;
+	unsigned int mail_count, idx;
+
+	mails = array_get(&importer->newmails, &mail_count);
+	idx = importer->first_unsaved_idx;
+	while (idx < mail_count && mails[idx]->saved)
+		idx++;
+	importer->first_unsaved_idx = idx;
+}
+
+/* Record that newmail was saved and try an intermediate commit. An interval
+   of 0 disables intermediate commits; a failed commit marks the importer failed. */
+static void
+dsync_mailbox_import_saved_newmail(struct dsync_mailbox_importer *importer,
+				   struct importer_new_mail *newmail)
+{
+	dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
+	newmail->saved = TRUE;
+	dsync_mailbox_import_update_first_saved(importer);
+	importer->saves_since_commit++;
+	/* We can commit only if all the upcoming mails will have UIDs that
+	   are larger than the ones we're committing. If any existing UIDs have
+	   been changed, the new UID is usually higher than anything being saved,
+	   so we can't do an intermediate commit, and handling that is too much
+	   extra work. That case never gets here, because then
+	   array_count(wanted_uids) is always higher than first_unsaved_idx. */
+	if (importer->commit_msgs_interval > 0 &&
+	    importer->saves_since_commit >= importer->commit_msgs_interval &&
+	    importer->first_unsaved_idx == array_count(&importer->wanted_uids)) {
+		if (dsync_mailbox_import_commit(importer, FALSE) < 0)
+			importer->failed = TRUE;
+		importer->saves_since_commit = 0;
+	}
+}
+
 static bool
 dsync_msg_change_uid(struct dsync_mailbox_importer *importer,
 		     uint32_t old_uid, uint32_t new_uid)
@@ -2369,7 +2428,7 @@
 	}
 	if (ret > 0) {
 		i_assert(save_ctx == NULL);
-		dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
+		dsync_mailbox_import_saved_newmail(importer, newmail);
 		return TRUE;
 	}
 	/* fallback to saving from remote stream */
@@ -2449,8 +2508,7 @@
 								&importer->mail_error));
 			importer->failed = TRUE;
 		} else {
-			dsync_mailbox_import_saved_uid(importer,
-						       newmail->final_uid);
+			dsync_mailbox_import_saved_newmail(importer, newmail);
 		}
 	}
 	return TRUE;
--- a/src/doveadm/dsync/dsync-mailbox-import.h	Fri May 12 12:44:27 2017 +0300
+++ b/src/doveadm/dsync/dsync-mailbox-import.h	Sun Apr 30 12:31:48 2017 +0300
@@ -36,6 +36,7 @@
 			  time_t sync_until_timestamp,
 			  uoff_t sync_max_size,
 			  const char *sync_flag,
+			  unsigned int commit_msgs_interval,
 			  enum dsync_mailbox_import_flags flags);
 int dsync_mailbox_import_attribute(struct dsync_mailbox_importer *importer,
 				   const struct dsync_mailbox_attribute *attr);