changeset 14261:14ff849dc266

Initial implementation of dsync-based replication.
author Timo Sirainen <tss@iki.fi>
date Sun, 04 Mar 2012 09:50:21 +0200
parents a452e5f616a2
children 9693521aa153
files .hgignore configure.in src/Makefile.am src/doveadm/dsync/doveadm-dsync.c src/lib-storage/mail-user.h src/plugins/replication/Makefile.am src/plugins/replication/replication-plugin.c src/plugins/replication/replication-plugin.h src/replication/Makefile.am src/replication/aggregator/Makefile.am src/replication/aggregator/aggregator-settings.c src/replication/aggregator/aggregator-settings.h src/replication/aggregator/aggregator.c src/replication/aggregator/notify-connection.c src/replication/aggregator/notify-connection.h src/replication/aggregator/replicator-connection.c src/replication/aggregator/replicator-connection.h src/replication/replication-common.h src/replication/replicator/Makefile.am src/replication/replicator/doveadm-connection.c src/replication/replicator/doveadm-connection.h src/replication/replicator/notify-connection.c src/replication/replicator/notify-connection.h src/replication/replicator/replicator-brain.c src/replication/replicator/replicator-brain.h src/replication/replicator/replicator-queue.c src/replication/replicator/replicator-queue.h src/replication/replicator/replicator-settings.c src/replication/replicator/replicator-settings.h src/replication/replicator/replicator.c
diffstat 30 files changed, 2402 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Sun Mar 04 09:39:45 2012 +0200
+++ b/.hgignore	Sun Mar 04 09:50:21 2012 +0200
@@ -85,6 +85,8 @@
 src/plugins/fts-squat/squat-test
 src/pop3-login/pop3-login
 src/pop3/pop3
+src/replication/replicator/replicator
+src/replication/aggregator/aggregator
 src/util/gdbhelper
 src/util/listview
 src/util/maildirlock
--- a/configure.in	Sun Mar 04 09:39:45 2012 +0200
+++ b/configure.in	Sun Mar 04 09:50:21 2012 +0200
@@ -2785,6 +2785,9 @@
 src/master/Makefile
 src/pop3/Makefile
 src/pop3-login/Makefile
+src/replication/Makefile
+src/replication/aggregator/Makefile
+src/replication/replicator/Makefile
 src/ssl-params/Makefile
 src/stats/Makefile
 src/util/Makefile
@@ -2803,6 +2806,7 @@
 src/plugins/notify/Makefile
 src/plugins/quota/Makefile
 src/plugins/imap-quota/Makefile
+src/plugins/replication/Makefile
 src/plugins/snarf/Makefile
 src/plugins/stats/Makefile
 src/plugins/imap-stats/Makefile
--- a/src/Makefile.am	Sun Mar 04 09:39:45 2012 +0200
+++ b/src/Makefile.am	Sun Mar 04 09:50:21 2012 +0200
@@ -39,6 +39,7 @@
 	log \
 	config \
 	director \
+	replication \
 	util \
 	doveadm \
 	ssl-params \
--- a/src/doveadm/dsync/doveadm-dsync.c	Sun Mar 04 09:39:45 2012 +0200
+++ b/src/doveadm/dsync/doveadm-dsync.c	Sun Mar 04 09:50:21 2012 +0200
@@ -368,6 +368,7 @@
 	int lock_fd, ret = 0;
 
 	user->admin = TRUE;
+	user->dsyncing = TRUE;
 
 	/* create workers */
 	worker1 = dsync_worker_init_local(user, ctx->namespace_prefix,
--- a/src/lib-storage/mail-user.h	Sun Mar 04 09:39:45 2012 +0200
+++ b/src/lib-storage/mail-user.h	Sun Mar 04 09:50:21 2012 +0200
@@ -56,6 +56,8 @@
 	unsigned int inbox_open_error_logged:1;
 	/* Fuzzy search works for this user (FTS enabled) */
 	unsigned int fuzzy_search:1;
+	/* We're running dsync */
+	unsigned int dsyncing:1;
 };
 
 struct mail_user_module_register {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/replication/Makefile.am	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,25 @@
+AM_CPPFLAGS = \
+	-I$(top_srcdir)/src/lib \
+	-I$(top_srcdir)/src/lib-mail \
+	-I$(top_srcdir)/src/lib-imap \
+	-I$(top_srcdir)/src/lib-index \
+	-I$(top_srcdir)/src/lib-storage \
+	-I$(top_srcdir)/src/replication \
+	-I$(top_srcdir)/src/plugins/notify
+
+NOPLUGIN_LDFLAGS =
+lib20_replication_plugin_la_LDFLAGS = -module -avoid-version
+
+module_LTLIBRARIES = \
+	lib20_replication_plugin.la
+
+if DOVECOT_PLUGIN_DEPS
+lib20_replication_plugin_la_LIBADD = \
+	../notify/lib15_notify_plugin.la
+endif
+
+lib20_replication_plugin_la_SOURCES = \
+	replication-plugin.c
+
+noinst_HEADERS = \
+	replication-plugin.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/replication/replication-plugin.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,353 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "str.h"
+#include "strescape.h"
+#include "fd-set-nonblock.h"
+#include "ioloop.h"
+#include "network.h"
+#include "write-full.h"
+#include "mail-user.h"
+#include "mail-storage-private.h"
+#include "notify-plugin.h"
+#include "replication-common.h"
+#include "replication-plugin.h"
+
+#include <stdlib.h>
+
+#define REPLICATION_SOCKET_NAME "replication-notify"
+#define REPLICATION_FIFO_NAME "replication-notify-fifo"
+#define REPLICATION_NOTIFY_DELAY_MSECS 500
+#define REPLICATION_SYNC_TIMEOUT_SECS 10
+
+#define REPLICATION_USER_CONTEXT(obj) \
+	MODULE_CONTEXT(obj, replication_user_module)
+
+/* Per-user state of the replication plugin. It is attached to the mail_user
+   as a module context and looked up with REPLICATION_USER_CONTEXT(). */
+struct replication_user {
+	union mail_user_module_context module_ctx;
+
+	/* <base_dir>/replication-notify-fifo and <base_dir>/replication-notify,
+	   built in replication_user_created() */
+	const char *fifo_path;
+	const char *socket_path;
+
+	/* write end of the notification fifo, -1 while it isn't open */
+	int fifo_fd;
+
+	/* pending delayed notification and the highest priority that has
+	   been requested for it */
+	struct timeout *to;
+	enum replication_priority priority;
+	/* parsed replication_sync_timeout plugin setting. 0 means that new
+	   mails are notified as "high" instead of "sync". */
+	unsigned int sync_secs;
+
+	/* open() of the fifo failed once; it is not attempted again */
+	bool fifo_failed;
+};
+
+/* Per-transaction state created in replication_mail_transaction_begin() and
+   freed in replication_mail_transaction_commit(). */
+struct replication_mail_txn_context {
+	struct mail_user *user;
+	/* set when a mail was saved or copied within the transaction */
+	bool new_messages;
+};
+
+static MODULE_CONTEXT_DEFINE_INIT(replication_user_module,
+				  &mail_user_module_register);
+
+/* Writes one "<username> \t <priority> \n" line to the replication
+   notification fifo, opening the fifo first if it isn't open yet.
+
+   Returns 1 if the whole line was written, 0 if the fifo is currently busy
+   (nothing was written, the caller may retry later) and -1 on error. After a
+   failed open() fifo_failed is set and every later call returns -1
+   immediately. After a failed or partial write the fifo is closed, so the
+   next call reopens it. */
+static int
+replication_fifo_notify(struct mail_user *user,
+			enum replication_priority priority)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+	string_t *str;
+	ssize_t ret;
+
+	if (ruser->fifo_failed)
+		return -1;
+	if (ruser->fifo_fd == -1) {
+		ruser->fifo_fd = open(ruser->fifo_path, O_WRONLY);
+		if (ruser->fifo_fd == -1) {
+			i_error("open(%s) failed: %m", ruser->fifo_path);
+			ruser->fifo_failed = TRUE;
+			return -1;
+		}
+		fd_set_nonblock(ruser->fifo_fd, TRUE);
+	}
+	/* <username> \t <priority> */
+	str = t_str_new(256);
+	str_tabescape_write(str, user->username);
+	str_append_c(str, '\t');
+	switch (priority) {
+	case REPLICATION_PRIORITY_NONE:
+	case REPLICATION_PRIORITY_SYNC:
+		/* only "low" and "high" are ever sent through the fifo */
+		i_unreached();
+	case REPLICATION_PRIORITY_LOW:
+		str_append(str, "low");
+		break;
+	case REPLICATION_PRIORITY_HIGH:
+		str_append(str, "high");
+		break;
+	}
+	str_append_c(str, '\n');
+	ret = write(ruser->fifo_fd, str_data(str), str_len(str));
+	if (ret == (ssize_t)str_len(str))
+		return 1;
+	if (ret == 0 || (ret < 0 && errno == EAGAIN)) {
+		/* busy, try again later. the fd is non-blocking, so a full
+		   fifo is reported as EAGAIN rather than a 0 byte write. */
+		return 0;
+	}
+
+	if (ret < 0)
+		i_error("write(%s) failed: %m", ruser->fifo_path);
+	else {
+		i_error("write(%s) wrote partial data",
+			ruser->fifo_path);
+	}
+	if (close(ruser->fifo_fd) < 0)
+		i_error("close(%s) failed: %m", ruser->fifo_path);
+	ruser->fifo_fd = -1;
+	return -1;
+}
+
+/* Sends the queued notification through the fifo with the currently
+   queued priority. Unless the fifo reported busy, the pending timeout is
+   removed and the queued priority is reset, whether or not sending
+   succeeded. */
+static void replication_notify_now(struct mail_user *user)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+	int ret;
+
+	i_assert(ruser->priority != REPLICATION_PRIORITY_NONE);
+	i_assert(ruser->priority != REPLICATION_PRIORITY_SYNC);
+
+	ret = replication_fifo_notify(user, ruser->priority);
+	if (ret < 0 && !ruser->fifo_failed) {
+		/* retry once, in case replication server was restarted */
+		ret = replication_fifo_notify(user, ruser->priority);
+	}
+	if (ret == 0) {
+		/* fifo is busy: leave the timeout and priority untouched */
+		return;
+	}
+	timeout_remove(&ruser->to);
+	ruser->priority = REPLICATION_PRIORITY_NONE;
+}
+
+/* Requests a synchronous replication of the user through the
+   replication-notify UNIX socket: sends "<username> \t sync \n" and waits
+   for a reply beginning with '+' (success) or '-' (failure, optionally
+   followed by a reason). alarm(sync_secs) is armed around the exchange and
+   a read() that fails with EINTR is reported as a timeout.
+
+   Returns 0 if the remote replied with success, -1 otherwise. */
+static int replication_notify_sync(struct mail_user *user)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+	string_t *str;
+	char buf[1024];
+	int fd;
+	ssize_t ret;
+
+	fd = net_connect_unix(ruser->socket_path);
+	if (fd == -1) {
+		i_error("net_connect_unix(%s) failed: %m", ruser->socket_path);
+		return -1;
+	}
+	net_set_nonblock(fd, FALSE);
+
+	/* <username> \t "sync" */
+	str = t_str_new(256);
+	str_tabescape_write(str, user->username);
+	str_append(str, "\tsync\n");
+	alarm(ruser->sync_secs);
+	if (write_full(fd, str_data(str), str_len(str)) < 0) {
+		i_error("write(%s) failed: %m", ruser->socket_path);
+		ret = -1;
+	} else {
+		/* + | - */
+		ret = read(fd, buf, sizeof(buf));
+		if (ret < 0) {
+			/* the failure reason is in errno, not in read()'s
+			   return value */
+			if (errno != EINTR) {
+				i_error("read(%s) failed: %m",
+					ruser->socket_path);
+			} else {
+				i_warning("replication(%s): Sync failure: "
+					  "Timeout in %u secs",
+					  user->username, ruser->sync_secs);
+			}
+		} else if (ret == 0) {
+			i_error("read(%s) failed: EOF", ruser->socket_path);
+			ret = -1;
+		} else if (buf[0] == '+') {
+			/* success */
+			ret = 0;
+		} else if (buf[0] == '-') {
+			/* failure. log the reason without the trailing LF. */
+			if (buf[ret-1] == '\n') ret--;
+			i_warning("replication(%s): Sync failure: %s",
+				  user->username, t_strndup(buf+1, ret-1));
+			ret = -1;
+		} else {
+			i_warning("replication(%s): "
+				  "Remote sent invalid input: %s",
+				  user->username, t_strndup(buf, ret));
+			/* don't return the byte count as the result */
+			ret = -1;
+		}
+	}
+	alarm(0);
+	if (close(fd) < 0)
+		i_error("close(%s) failed: %m", ruser->socket_path);
+	return ret;
+}
+
+/* Records that the user has changes to replicate.
+
+   Nothing is done while dsync itself is running for the user. A "sync"
+   priority request is first attempted synchronously; if that fails it is
+   downgraded to "high". Other requests only raise the queued priority and
+   make sure a timeout of REPLICATION_NOTIFY_DELAY_MSECS is pending, which
+   sends the notification via replication_notify_now(). */
+static void replication_notify(struct mail_user *user,
+			       enum replication_priority priority)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+
+	if (user->dsyncing) {
+		/* we're running dsync, which means that the remote is telling
+		   us about a change. don't trigger a replication back to it */
+		return;
+	}
+
+	if (priority == REPLICATION_PRIORITY_SYNC) {
+		if (replication_notify_sync(user) == 0) {
+			/* everything is replicated now. a delayed
+			   notification may or may not have been queued
+			   earlier, so only remove the timeout if it exists. */
+			if (ruser->to != NULL)
+				timeout_remove(&ruser->to);
+			ruser->priority = REPLICATION_PRIORITY_NONE;
+			return;
+		}
+		/* sync replication failed, try as "high" via fifo */
+		priority = REPLICATION_PRIORITY_HIGH;
+	}
+
+	if (ruser->priority < priority)
+		ruser->priority = priority;
+	if (ruser->to == NULL) {
+		ruser->to = timeout_add(REPLICATION_NOTIFY_DELAY_MSECS,
+					replication_notify_now, user);
+	}
+}
+
+/* notify vfunc: allocates the per-transaction context and remembers which
+   user the transaction's mailbox belongs to. */
+static void *
+replication_mail_transaction_begin(struct mailbox_transaction_context *t)
+{
+	struct replication_mail_txn_context *ctx =
+		i_new(struct replication_mail_txn_context, 1);
+
+	ctx->user = t->box->storage->user;
+	return ctx;
+}
+
+/* notify vfunc: a mail was saved, so the transaction contains new mail. */
+static void replication_mail_save(void *txn, struct mail *mail ATTR_UNUSED)
+{
+	struct replication_mail_txn_context *ctx = txn;
+
+	ctx->new_messages = TRUE;
+}
+
+/* notify vfunc: a mail was copied, so the transaction contains new mail. */
+static void replication_mail_copy(void *txn, struct mail *src ATTR_UNUSED,
+				  struct mail *dst ATTR_UNUSED)
+{
+	struct replication_mail_txn_context *ctx = txn;
+
+	ctx->new_messages = TRUE;
+}
+
+/* notify vfunc: picks the notification priority for a committed transaction
+   and frees the transaction context. New mails are notified as "sync" when
+   a sync timeout is configured and as "high" otherwise; any other change is
+   notified as "low". A transaction without changes sends nothing. */
+static void
+replication_mail_transaction_commit(void *txn,
+				    struct mail_transaction_commit_changes *changes)
+{
+	struct replication_mail_txn_context *ctx = txn;
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(ctx->user);
+
+	if (ctx->new_messages) {
+		if (ruser->sync_secs == 0) {
+			replication_notify(ctx->user,
+					   REPLICATION_PRIORITY_HIGH);
+		} else {
+			replication_notify(ctx->user,
+					   REPLICATION_PRIORITY_SYNC);
+		}
+	} else if (changes->changed) {
+		replication_notify(ctx->user, REPLICATION_PRIORITY_LOW);
+	}
+	i_free(ctx);
+}
+
+/* notify vfunc: a mailbox creation is replicated with low priority. */
+static void replication_mailbox_create(struct mailbox *box)
+{
+	replication_notify(box->storage->user, REPLICATION_PRIORITY_LOW);
+}
+
+/* notify vfunc: a mailbox deletion is replicated with low priority. */
+static void
+replication_mailbox_delete_commit(void *txn ATTR_UNUSED,
+				  struct mailbox *box)
+{
+	replication_notify(box->storage->user, REPLICATION_PRIORITY_LOW);
+}
+
+/* notify vfunc: a mailbox rename is replicated with low priority. The user
+   is taken from the destination mailbox. */
+static void
+replication_mailbox_rename(struct mailbox *src ATTR_UNUSED,
+			   struct mailbox *dest,
+			   bool rename_children ATTR_UNUSED)
+{
+	replication_notify(dest->storage->user, REPLICATION_PRIORITY_LOW);
+}
+
+/* notify vfunc: a subscription change is replicated with low priority. */
+static void replication_mailbox_set_subscribed(struct mailbox *box,
+					       bool subscribed ATTR_UNUSED)
+{
+	replication_notify(box->storage->user, REPLICATION_PRIORITY_LOW);
+}
+
+/* mail_user deinit override: sends a still pending notification right away,
+   closes the notification fifo and then calls the original deinit. */
+static void replication_user_deinit(struct mail_user *user)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+
+	if (ruser->to != NULL) {
+		replication_notify_now(user);
+		if (ruser->to != NULL) {
+			/* the timeout is left in place only when the fifo
+			   was busy */
+			i_warning("%s: Couldn't send final notification "
+				  "due to fifo being busy", ruser->fifo_path);
+			timeout_remove(&ruser->to);
+		}
+	}
+	/* the fifo is opened per user, so it has to be closed here or the
+	   fd is leaked */
+	if (ruser->fifo_fd != -1) {
+		if (close(ruser->fifo_fd) < 0)
+			i_error("close(%s) failed: %m", ruser->fifo_path);
+		ruser->fifo_fd = -1;
+	}
+
+	ruser->module_ctx.super.deinit(user);
+}
+
+/* mail_user_created hook: allocates the per-user replication state from the
+   user's pool, replaces the user's deinit vfunc with
+   replication_user_deinit() and reads the replication_sync_timeout plugin
+   setting. */
+static void replication_user_created(struct mail_user *user)
+{
+	struct mail_user_vfuncs *v = user->vlast;
+	struct replication_user *ruser;
+	const char *value;
+
+	ruser = p_new(user->pool, struct replication_user, 1);
+	/* keep a copy of the current vfuncs, so that the overridden deinit
+	   can call the previous one */
+	ruser->module_ctx.super = *v;
+	user->vlast = &ruser->module_ctx.super;
+	v->deinit = replication_user_deinit;
+	MODULE_CONTEXT_SET(user, replication_user_module, ruser);
+
+	ruser->fifo_fd = -1;
+	ruser->fifo_path = p_strconcat(user->pool, user->set->base_dir,
+				       "/"REPLICATION_FIFO_NAME, NULL);
+	ruser->socket_path = p_strconcat(user->pool, user->set->base_dir,
+					 "/"REPLICATION_SOCKET_NAME, NULL);
+	/* an invalid value is only logged, the user is still created */
+	value = mail_user_plugin_getenv(user, "replication_sync_timeout");
+	if (value != NULL && str_to_uint(value, &ruser->sync_secs) < 0) {
+		i_error("replication(%s): "
+			"Invalid replication_sync_timeout value: %s",
+			user->username, value);
+	}
+}
+
+static const struct notify_vfuncs replication_vfuncs = {
+	.mail_transaction_begin = replication_mail_transaction_begin,
+	.mail_save = replication_mail_save,
+	.mail_copy = replication_mail_copy,
+	.mail_transaction_commit = replication_mail_transaction_commit,
+	.mailbox_create = replication_mailbox_create,
+	.mailbox_delete_commit = replication_mailbox_delete_commit,
+	.mailbox_rename = replication_mailbox_rename,
+	.mailbox_set_subscribed = replication_mailbox_set_subscribed
+};
+
+static struct notify_context *replication_ctx;
+
+static struct mail_storage_hooks replication_mail_storage_hooks = {
+	.mail_user_created = replication_user_created
+};
+
+/* Plugin entry point: registers the notify callbacks and the mail storage
+   hooks. */
+void replication_plugin_init(struct module *module)
+{
+	replication_ctx = notify_register(&replication_vfuncs);
+	mail_storage_hooks_add(module, &replication_mail_storage_hooks);
+}
+
+/* Plugin exit point: undoes replication_plugin_init() in reverse order. */
+void replication_plugin_deinit(void)
+{
+	mail_storage_hooks_remove(&replication_mail_storage_hooks);
+	notify_unregister(replication_ctx);
+}
+
+const char *replication_plugin_dependencies[] = { "notify", NULL };
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/replication/replication-plugin.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,9 @@
+#ifndef REPLICATION_PLUGIN_H
+#define REPLICATION_PLUGIN_H
+
+extern const char *replication_plugin_dependencies[];
+
+void replication_plugin_init(struct module *module);
+void replication_plugin_deinit(void);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/Makefile.am	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,4 @@
+SUBDIRS = aggregator replicator
+
+noinst_HEADERS = \
+	replication-common.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/aggregator/Makefile.am	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,26 @@
+pkglibexecdir = $(libexecdir)/dovecot
+
+pkglibexec_PROGRAMS = aggregator
+
+AM_CPPFLAGS = \
+	-I$(top_srcdir)/src/lib \
+	-I$(top_srcdir)/src/lib-settings \
+	-I$(top_srcdir)/src/lib-auth \
+	-I$(top_srcdir)/src/lib-master \
+	-I$(top_srcdir)/src/replication \
+	-DPKG_STATEDIR=\""$(statedir)"\"
+
+aggregator_LDFLAGS = -export-dynamic
+aggregator_LDADD = $(LIBDOVECOT) $(MODULE_LIBS)
+aggregator_DEPENDENCIES = $(LIBDOVECOT_DEPS)
+
+aggregator_SOURCES = \
+	aggregator.c \
+	aggregator-settings.c \
+	notify-connection.c \
+	replicator-connection.c
+
+noinst_HEADERS = \
+	aggregator-settings.h \
+	notify-connection.h \
+	replicator-connection.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/aggregator/aggregator-settings.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,85 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "settings-parser.h"
+#include "service-settings.h"
+#include "aggregator-settings.h"
+
+/* <settings checks> */
+static struct file_listener_settings aggregator_unix_listeners_array[] = {
+	{ "replication-notify", 0600, "", "" }
+};
+static struct file_listener_settings *aggregator_unix_listeners[] = {
+	&aggregator_unix_listeners_array[0]
+};
+static buffer_t aggregator_unix_listeners_buf = {
+	aggregator_unix_listeners, sizeof(aggregator_unix_listeners), { 0, }
+};
+
+static struct file_listener_settings aggregator_fifo_listeners_array[] = {
+	{ "replication-notify-fifo", 0600, "", "" }
+};
+static struct file_listener_settings *aggregator_fifo_listeners[] = {
+	&aggregator_fifo_listeners_array[0]
+};
+static buffer_t aggregator_fifo_listeners_buf = {
+	aggregator_fifo_listeners, sizeof(aggregator_fifo_listeners), { 0, }
+};
+/* </settings checks> */
+
+/* Built-in service definition for the aggregator process. By default it
+   listens on the "replication-notify" UNIX socket and the
+   "replication-notify-fifo" fifo and has no inet listeners. */
+struct service_settings aggregator_service_settings = {
+	.name = "aggregator",
+	.protocol = "",
+	.type = "",
+	.executable = "aggregator",
+	.user = "$default_internal_user",
+	.group = "",
+	.privileged_group = "",
+	.extra_groups = "",
+	.chroot = ".",
+
+	.drop_priv_before_exec = FALSE,
+
+	.process_min_avail = 0,
+	.process_limit = 0,
+	.client_limit = 0,
+	.service_count = 0,
+	.idle_kill = 0,
+	.vsz_limit = (uoff_t)-1,
+
+	.unix_listeners = { { &aggregator_unix_listeners_buf,
+			      sizeof(aggregator_unix_listeners[0]) } },
+	.fifo_listeners = { { &aggregator_fifo_listeners_buf,
+			      sizeof(aggregator_fifo_listeners[0]) } },
+	.inet_listeners = ARRAY_INIT
+};
+
+#undef DEF
+#define DEF(type, name) \
+	{ type, #name, offsetof(struct aggregator_settings, name), NULL }
+
+static const struct setting_define aggregator_setting_defines[] = {
+	DEF(SET_STR, replicator_host),
+	DEF(SET_UINT, replicator_port),
+
+	SETTING_DEFINE_LIST_END
+};
+
+/* Defaults: with replicator_port 0 the aggregator connects to the UNIX
+   socket named by replicator_host (see main_preinit()). */
+const struct aggregator_settings aggregator_default_settings = {
+	.replicator_host = "replicator",
+	.replicator_port = 0
+};
+
+const struct setting_parser_info aggregator_setting_parser_info = {
+	.module_name = "aggregator",
+	.defines = aggregator_setting_defines,
+	.defaults = &aggregator_default_settings,
+
+	.type_offset = (size_t)-1,
+	.struct_size = sizeof(struct aggregator_settings),
+
+	.parent_offset = (size_t)-1
+};
+
+const struct aggregator_settings *aggregator_settings;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/aggregator/aggregator-settings.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,12 @@
+#ifndef AGGREGATOR_SETTINGS_H
+#define AGGREGATOR_SETTINGS_H
+
+/* Settings of the aggregator process. */
+struct aggregator_settings {
+	/* host name to resolve when replicator_port != 0, otherwise the
+	   path of the replicator's UNIX socket */
+	const char *replicator_host;
+	/* TCP port of the replicator, 0 = connect via UNIX socket */
+	unsigned int replicator_port;
+};
+
+extern const struct setting_parser_info aggregator_setting_parser_info;
+extern const struct aggregator_settings *aggregator_settings;
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/aggregator/aggregator.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,75 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "restrict-access.h"
+#include "master-service.h"
+#include "master-service-settings.h"
+#include "aggregator-settings.h"
+#include "notify-connection.h"
+#include "replicator-connection.h"
+
+struct replicator_connection *replicator;
+
+/* master service callback for a new connection on one of the listeners:
+   accepts it and starts reading notifications from it. conn->fifo tells
+   whether it arrived through the fifo listener. */
+static void client_connected(struct master_service_connection *conn)
+{
+	master_service_client_connection_accept(conn);
+	notify_connection_create(conn->fd, conn->fifo);
+}
+
+/* Creates the global replicator connection from the settings: a UNIX socket
+   connection when replicator_port is 0, otherwise a TCP connection to the
+   addresses that replicator_host resolves to. A host name that can't be
+   resolved is a fatal error. */
+static void main_preinit(void)
+{
+	const struct aggregator_settings *set;
+	struct ip_addr *ips;
+	unsigned int ips_count;
+	void **sets;
+	int ret;
+
+	sets = master_service_settings_get_others(master_service);
+	set = sets[0];
+
+	if (set->replicator_port == 0) {
+		/* replicator_host is a socket path */
+		replicator = replicator_connection_create_unix(set->replicator_host,
+				notify_connection_sync_callback);
+		return;
+	}
+
+	ret = net_gethostbyname(set->replicator_host, &ips, &ips_count);
+	if (ret != 0) {
+		i_fatal("replicator_host: gethostbyname(%s) failed: %s",
+			set->replicator_host, net_gethosterror(ret));
+	}
+	replicator = replicator_connection_create_inet(ips, ips_count,
+			set->replicator_port,
+			notify_connection_sync_callback);
+}
+
+/* Entry point of the aggregator process: reads the settings, creates the
+   replicator connection, restricts access and then serves notify
+   connections until the master service stops running. */
+int main(int argc, char *argv[])
+{
+	const struct setting_parser_info *set_roots[] = {
+		&aggregator_setting_parser_info,
+		NULL
+	};
+	const char *error;
+
+	master_service = master_service_init("aggregator", 0,
+					     &argc, &argv, NULL);
+	if (master_getopt(master_service) > 0)
+		return FATAL_DEFAULT;
+
+	if (master_service_settings_read_simple(master_service, set_roots,
+						&error) < 0)
+		i_fatal("Error reading configuration: %s", error);
+	master_service_init_log(master_service, "aggregator: ");
+
+	/* the host name is resolved before access gets restricted */
+	main_preinit();
+
+	restrict_access_by_env(NULL, FALSE);
+	restrict_access_allow_coredumps(TRUE);
+	master_service_init_finish(master_service);
+
+	master_service_run(master_service, client_connected);
+
+	/* notify connections are destroyed before the replicator connection
+	   that they are using */
+	notify_connections_destroy_all();
+	replicator_connection_destroy(&replicator);
+	master_service_deinit(&master_service);
+        return 0;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/aggregator/notify-connection.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,154 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "network.h"
+#include "istream.h"
+#include "ostream.h"
+#include "llist.h"
+#include "strescape.h"
+#include "master-service.h"
+#include "replication-common.h"
+#include "replicator-connection.h"
+#include "notify-connection.h"
+
+#define MAX_INBUF_SIZE 8192
+
+#define CONNECTION_IS_FIFO(conn) \
+	((conn)->output == NULL)
+
+/* One client sending replication notifications to the aggregator, either
+   through the fifo or through the UNIX socket. */
+struct notify_connection {
+	/* links in the global "conns" list */
+	struct notify_connection *prev, *next;
+	/* 1 for the connection itself + 1 for each pending sync request */
+	int refcount;
+
+	/* -1 after the connection has been destroyed */
+	int fd;
+	struct io *io;
+	struct istream *input;
+	/* NULL for fifo connections (see CONNECTION_IS_FIFO()) */
+	struct ostream *output;
+};
+
+static struct notify_connection *conns = NULL;
+
+static void notify_connection_unref(struct notify_connection *conn);
+static void notify_connection_destroy(struct notify_connection *conn);
+
+/* Handles invalid input from a client. Fifo connections are kept open and
+   TRUE is returned. Other connections are destroyed and FALSE is returned,
+   after which conn must not be used by the caller. */
+static bool notify_input_error(struct notify_connection *conn)
+{
+	if (!CONNECTION_IS_FIFO(conn)) {
+		notify_connection_destroy(conn);
+		return FALSE;
+	}
+	return TRUE;
+}
+
+/* Called when a sync request made by notify_input_line() has finished.
+   Reports the result to the client as "+" or "-" and drops the reference
+   that was taken for the request. context is the notify_connection. */
+void notify_connection_sync_callback(bool success, void *context)
+{
+	struct notify_connection *conn = context;
+
+	/* fifo connections have no output stream, but nothing prevents a
+	   "sync" request from arriving through the fifo. there's nowhere to
+	   send the reply in that case. */
+	if (conn->output != NULL)
+		o_stream_send_str(conn->output, success ? "+\n" : "-\n");
+	notify_connection_unref(conn);
+}
+
+/* Parses one "<username> \t <priority>" line from a client and forwards it
+   to the replicator. For "sync" requests a reference to the connection is
+   taken; it is released in notify_connection_sync_callback().
+   Returns 0 if the line was valid, -1 if not. */
+static int
+notify_input_line(struct notify_connection *conn, const char *line)
+{
+	const char *const *args;
+	enum replication_priority priority;
+
+	/* <username> \t <priority> */
+	args = t_strsplit_tabescaped(line);
+	if (str_array_length(args) < 2) {
+		i_error("Client sent invalid input");
+		return -1;
+	}
+	if (replication_priority_parse(args[1], &priority) < 0) {
+		i_error("Client sent invalid priority: %s", args[1]);
+		return -1;
+	}
+	if (priority != REPLICATION_PRIORITY_SYNC)
+		replicator_connection_notify(replicator, args[0], priority);
+	else {
+		/* the connection must stay allocated until the reply comes */
+		conn->refcount++;
+		replicator_connection_notify_sync(replicator, args[0], conn);
+	}
+	return 0;
+}
+
+/* IO callback: reads more input from the client and handles every complete
+   line in it. */
+static void notify_input(struct notify_connection *conn)
+{
+	const char *line;
+	int ret;
+
+	switch (i_stream_read(conn->input)) {
+	case -2:
+		/* buffer full */
+		i_error("Client sent too long line");
+		/* NOTE(review): a fifo connection is kept open here and
+		   nothing is skipped from the input - confirm that reading
+		   can still make progress afterwards */
+		notify_input_error(conn);
+		return;
+	case -1:
+		/* disconnected */
+		notify_connection_destroy(conn);
+		return;
+	}
+
+	while ((line = i_stream_next_line(conn->input)) != NULL) {
+		T_BEGIN {
+			ret = notify_input_line(conn, line);
+		} T_END;
+		if (ret < 0) {
+			/* conn was destroyed unless it's a fifo */
+			if (!notify_input_error(conn))
+				return;
+		}
+	}
+}
+
+/* Starts reading notifications from fd and adds the connection to the
+   global connection list. An output stream is created only when the
+   connection isn't a fifo. */
+void notify_connection_create(int fd, bool fifo)
+{
+	struct notify_connection *conn;
+
+	conn = i_new(struct notify_connection, 1);
+	conn->refcount = 1;
+	conn->fd = fd;
+	conn->io = io_add(fd, IO_READ, notify_input, conn);
+	conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE);
+	if (!fifo)
+		conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
+
+	DLLIST_PREPEND(&conns, conn);
+}
+
+/* Drops one reference. When the last reference is gone the streams and the
+   connection itself are freed. */
+static void notify_connection_unref(struct notify_connection *conn)
+{
+	i_assert(conn->refcount > 0);
+
+	conn->refcount--;
+	if (conn->refcount == 0) {
+		i_stream_destroy(&conn->input);
+		if (conn->output != NULL)
+			o_stream_destroy(&conn->output);
+		i_free(conn);
+	}
+}
+
+/* Disconnects the client: removes it from the connection list, stops the
+   IO, closes the streams and the fd and drops the connection's own
+   reference. The memory stays allocated for as long as pending sync
+   requests still hold references. */
+static void notify_connection_destroy(struct notify_connection *conn)
+{
+	i_assert(conn->fd != -1);
+
+	/* must be checked before anything is closed */
+	if (!CONNECTION_IS_FIFO(conn))
+		master_service_client_connection_destroyed(master_service);
+
+	DLLIST_REMOVE(&conns, conn);
+
+	io_remove(&conn->io);
+	i_stream_close(conn->input);
+	if (conn->output != NULL)
+		o_stream_close(conn->output);
+	net_disconnect(conn->fd);
+	conn->fd = -1;
+
+	notify_connection_unref(conn);
+}
+
+/* Destroys every connection in the list. Each destroy removes the list
+   head, so the loop ends when the list is empty. */
+void notify_connections_destroy_all(void)
+{
+	while (conns != NULL)
+		notify_connection_destroy(conns);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/aggregator/notify-connection.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,9 @@
+#ifndef NOTIFY_CONNECTION_H
+#define NOTIFY_CONNECTION_H
+
+void notify_connection_create(int fd, bool fifo);
+void notify_connections_destroy_all(void);
+
+void notify_connection_sync_callback(bool success, void *context);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/aggregator/replicator-connection.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,321 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "network.h"
+#include "istream.h"
+#include "ostream.h"
+#include "buffer.h"
+#include "hash.h"
+#include "llist.h"
+#include "strescape.h"
+#include "replicator-connection.h"
+
+#define MAX_INBUF_SIZE 1024
+#define REPLICATOR_RECONNECT_MSECS 5000
+/* parenthesized, so that the expansion is a single operand everywhere */
+#define REPLICATOR_MEMBUF_MAX_SIZE (1024*1024)
+#define REPLICATOR_HANDSHAKE "VERSION\treplicator-notify\t1\t0\n"
+
+/* The aggregator's connection to the replicator process. */
+struct replicator_connection {
+	/* UNIX socket path, used when port == 0 */
+	char *path;
+	/* addresses to connect to when port != 0. ip_idx is the index of the
+	   next address to try. */
+	struct ip_addr *ips;
+	unsigned int ips_count, ip_idx, port;
+
+	/* -1 when created, before the first successful connect */
+	int fd;
+	struct io *io;
+	struct istream *input;
+	struct ostream *output;
+	/* reconnect timeout, set after a failed connect */
+	struct timeout *to;
+
+	/* lines waiting to be sent, indexed by priority. there is no buffer
+	   at index REPLICATION_PRIORITY_NONE. */
+	buffer_t *queue[REPLICATION_PRIORITY_SYNC + 1];
+
+	/* pending sync requests: request ID -> callback context */
+	struct hash_table *requests;
+	unsigned int request_id_counter;
+	replicator_sync_callback_t *callback;
+};
+
+static void replicator_connection_disconnect(struct replicator_connection *conn);
+
+/* Handles one "<+|-> \t <id>" reply from the replicator: finds the pending
+   sync request by its ID, forgets it and reports the result through the
+   callback. Returns 0 on success, -1 if the line was invalid or the ID
+   isn't a pending request. */
+static int
+replicator_input_line(struct replicator_connection *conn, const char *line)
+{
+	void *context;
+	unsigned int id;
+
+	/* <+|-> \t <id> */
+	if ((line[0] != '+' && line[0] != '-') || line[1] != '\t' ||
+	    str_to_uint(line+2, &id) < 0 || id == 0) {
+		i_error("Replicator sent invalid input: %s", line);
+		return -1;
+	}
+
+	context = hash_table_lookup(conn->requests, POINTER_CAST(id));
+	if (context == NULL) {
+		i_error("Replicator sent invalid ID: %u", id);
+		return -1;
+	}
+	/* the table is keyed by the request ID, not by the context */
+	hash_table_remove(conn->requests, POINTER_CAST(id));
+	conn->callback(line[0] == '+', context);
+	return 0;
+}
+
+/* IO callback: reads replies from the replicator and handles every complete
+   line. The connection is dropped on a too long line or a disconnect. */
+static void replicator_input(struct replicator_connection *conn)
+{
+	const char *line;
+
+	switch (i_stream_read(conn->input)) {
+	case -2:
+		/* buffer full */
+		i_error("Replicator sent too long line");
+		replicator_connection_disconnect(conn);
+		return;
+	case -1:
+		/* disconnected */
+		replicator_connection_disconnect(conn);
+		return;
+	}
+
+	/* invalid lines have already been logged; they are skipped */
+	while ((line = i_stream_next_line(conn->input)) != NULL)
+		(void)replicator_input_line(conn, line);
+}
+
+/* Sends about IO_BLOCK_SIZE bytes from the beginning of a queue buffer to
+   the replicator, extended to the end of the line so that only full lines
+   are sent, and deletes the sent bytes from the buffer. The buffer must be
+   non-empty and end with LF.
+   Returns FALSE if sending failed and the connection was disconnected. */
+static bool
+replicator_send_buf(struct replicator_connection *conn, buffer_t *buf)
+{
+	const unsigned char *data = buf->data;
+	unsigned int len = IO_BLOCK_SIZE;
+
+	/* try to send about IO_BLOCK_SIZE amount of data,
+	   but only full lines */
+	i_assert(buf->used > 0);
+	if (len >= buf->used) {
+		/* everything fits. start looking for the LF from the last
+		   byte instead of from one past the end of the data. */
+		len = buf->used - 1;
+	}
+	for (;; len++) {
+		i_assert(len < buf->used); /* there is always LF */
+		if (data[len] == '\n') {
+			len++;
+			break;
+		}
+	}
+
+	if (o_stream_send(conn->output, data, len) < 0) {
+		replicator_connection_disconnect(conn);
+		return FALSE;
+	}
+	buffer_delete(buf, 0, len);
+	return TRUE;
+}
+
+/* Output stream flush callback. Flushes the stream and, for as long as its
+   buffer is empty, moves queued lines into it. The queues are emptied in
+   order from REPLICATION_PRIORITY_SYNC down to REPLICATION_PRIORITY_LOW.
+   Always returns 1. */
+static int replicator_output(struct replicator_connection *conn)
+{
+	enum replication_priority p;
+
+	if (o_stream_flush(conn->output) < 0) {
+		replicator_connection_disconnect(conn);
+		return 1;
+	}
+
+	for (p = REPLICATION_PRIORITY_SYNC;;) {
+		if (o_stream_get_buffer_used_size(conn->output) > 0) {
+			/* couldn't send everything, continue on next flush */
+			o_stream_set_flush_pending(conn->output, TRUE);
+			break;
+		}
+		/* output buffer is empty, send more data */
+		if (conn->queue[p]->used > 0) {
+			if (!replicator_send_buf(conn, conn->queue[p]))
+				break;
+		} else {
+			/* this queue is empty, go to the next lower one */
+			if (p == REPLICATION_PRIORITY_LOW)
+				break;
+			p--;
+		}
+	}
+	return 1;
+}
+
+/* Connects to the replicator unless already connected. With port == 0 the
+   UNIX socket at conn->path is used, otherwise each IP is tried once,
+   starting from the one after the previously tried IP. If connecting
+   fails, a timeout is added to try again after
+   REPLICATOR_RECONNECT_MSECS. On success the handshake is sent. */
+static void replicator_connection_connect(struct replicator_connection *conn)
+{
+	unsigned int n;
+	int fd = -1;
+
+	if (conn->fd != -1) {
+		/* already connected. this function is called for every
+		   notification, so without this check each call would open
+		   a new socket and leak the previous one. */
+		return;
+	}
+
+	if (conn->port == 0) {
+		fd = net_connect_unix(conn->path);
+		if (fd == -1)
+			i_error("net_connect_unix(%s) failed: %m", conn->path);
+	} else {
+		for (n = 0; n < conn->ips_count; n++) {
+			unsigned int idx = conn->ip_idx;
+
+			conn->ip_idx = (conn->ip_idx + 1) % conn->ips_count;
+			fd = net_connect_ip(&conn->ips[idx], conn->port, NULL);
+			if (fd != -1)
+				break;
+			i_error("connect(%s, %u) failed: %m",
+				net_ip2addr(&conn->ips[idx]), conn->port);
+		}
+	}
+
+	if (fd == -1) {
+		if (conn->to == NULL) {
+			conn->to = timeout_add(REPLICATOR_RECONNECT_MSECS,
+					       replicator_connection_connect,
+					       conn);
+		}
+		return;
+	}
+
+	if (conn->to != NULL)
+		timeout_remove(&conn->to);
+	conn->fd = fd;
+	conn->io = io_add(fd, IO_READ, replicator_input, conn);
+	conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE);
+	conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
+	(void)o_stream_send_str(conn->output, REPLICATOR_HANDSHAKE);
+	o_stream_set_flush_callback(conn->output, replicator_output, conn);
+}
+
+/* Fails every pending sync request through the callback and empties the
+   request table. */
+static void replicator_abort_all_requests(struct replicator_connection *conn)
+{
+	struct hash_iterate_context *iter;
+	void *key, *value;
+
+	iter = hash_table_iterate_init(conn->requests);
+	while (hash_table_iterate(iter, &key, &value))
+		conn->callback(FALSE, value);
+	hash_table_iterate_deinit(&iter);
+	hash_table_clear(conn->requests, TRUE);
+}
+
+/* Closes the current connection, if there is one: pending sync requests
+   are failed and the IO, the streams and the fd are released. */
+static void replicator_connection_disconnect(struct replicator_connection *conn)
+{
+	if (conn->fd == -1)
+		return;
+
+	replicator_abort_all_requests(conn);
+	io_remove(&conn->io);
+	i_stream_destroy(&conn->input);
+	o_stream_destroy(&conn->output);
+	net_disconnect(conn->fd);
+	/* mark the connection closed, so that another disconnect is a no-op
+	   and the next connect attempt isn't skipped */
+	conn->fd = -1;
+}
+
+/* Allocates an unconnected connection with an empty request table and one
+   queue buffer for each priority from LOW to SYNC. The destination and the
+   callback are filled in by the _unix()/_inet() variants. */
+static struct replicator_connection *replicator_connection_create(void)
+{
+	struct replicator_connection *conn;
+	unsigned int i;
+
+	conn = i_new(struct replicator_connection, 1);
+	conn->fd = -1;
+	conn->requests = hash_table_create(default_pool, default_pool,
+					   0, NULL, NULL);
+	for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
+		conn->queue[i] = buffer_create_dynamic(default_pool, 1024);
+	return conn;
+}
+
+/* Creates a connection that will connect to the UNIX socket at path.
+   The path is copied. callback is called with the result of each sync
+   request. */
+struct replicator_connection *
+replicator_connection_create_unix(const char *path,
+				  replicator_sync_callback_t *callback)
+{
+	struct replicator_connection *conn = replicator_connection_create();
+
+	conn->path = i_strdup(path);
+	conn->callback = callback;
+	return conn;
+}
+
+/* Creates a connection that will connect to one of the given IPs at port.
+   The IP array is copied. callback is called with the result of each sync
+   request. */
+struct replicator_connection *
+replicator_connection_create_inet(const struct ip_addr *ips,
+				  unsigned int ips_count, unsigned int port,
+				  replicator_sync_callback_t *callback)
+{
+	struct replicator_connection *conn = replicator_connection_create();
+
+	conn->port = port;
+	conn->ips_count = ips_count;
+	conn->ips = i_new(struct ip_addr, ips_count);
+	memcpy(conn->ips, ips, sizeof(*ips) * ips_count);
+	conn->callback = callback;
+	return conn;
+}
+
+/* Disconnects and frees the connection with everything it owns and sets
+   *_conn to NULL. */
+void replicator_connection_destroy(struct replicator_connection **_conn)
+{
+	struct replicator_connection *conn = *_conn;
+	unsigned int i;
+
+	*_conn = NULL;
+	replicator_connection_disconnect(conn);
+
+	for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
+		buffer_free(&conn->queue[i]);
+
+	if (conn->to != NULL)
+		timeout_remove(&conn->to);
+	hash_table_destroy(&conn->requests);
+	/* copies made by the _create_inet()/_create_unix() variants. only
+	   one of them is set, the other one is NULL. */
+	i_free(conn->ips);
+	i_free(conn->path);
+	i_free(conn);
+}
+
+/* Sends one complete protocol line to the replicator. The line is written
+   immediately if we're connected and the output buffer is empty. Otherwise
+   it's appended to the queue of the given priority, unless the queue would
+   reach REPLICATOR_MEMBUF_MAX_SIZE, in which case the line is currently
+   dropped. */
+static void
+replicator_send(struct replicator_connection *conn,
+		enum replication_priority priority, const char *data)
+{
+	unsigned int data_len = strlen(data);
+
+	if (conn->fd != -1 &&
+	    o_stream_get_buffer_used_size(conn->output) == 0) {
+		/* we can send data immediately */
+		/* NOTE(review): the result of the send isn't checked here */
+		o_stream_send(conn->output, data, data_len);
+	} else if (conn->queue[priority]->used + data_len >=
+		   	REPLICATOR_MEMBUF_MAX_SIZE) {
+		/* FIXME: compress duplicates, start writing to file */
+	} else {
+		/* queue internally to separate queues */
+		buffer_append(conn->queue[priority], data, data_len);
+		if (conn->output != NULL)
+			o_stream_set_flush_pending(conn->output, TRUE);
+	}
+}
+
+/* Tells the replicator that the user has changes with "low" or "high"
+   priority by sending "U \t <username> \t <priority>". The other priorities
+   must not be used with this function. */
+void replicator_connection_notify(struct replicator_connection *conn,
+				  const char *username,
+				  enum replication_priority priority)
+{
+	const char *priority_str = "";
+
+	replicator_connection_connect(conn);
+
+	switch (priority) {
+	case REPLICATION_PRIORITY_NONE:
+	case REPLICATION_PRIORITY_SYNC:
+		i_unreached();
+	case REPLICATION_PRIORITY_LOW:
+		priority_str = "low";
+		break;
+	case REPLICATION_PRIORITY_HIGH:
+		priority_str = "high";
+		break;
+	}
+
+	T_BEGIN {
+		replicator_send(conn, priority, t_strdup_printf(
+			"U\t%s\t%s\n", str_tabescape(username), priority_str));
+	} T_END;
+}
+
+/* Asks the replicator to replicate the user synchronously by sending
+   "U \t <username> \t sync \t <id>". context is remembered under the
+   request ID and is given to the connection's callback when the reply
+   arrives or when the request is aborted. */
+void replicator_connection_notify_sync(struct replicator_connection *conn,
+				       const char *username, void *context)
+{
+	unsigned int id;
+
+	replicator_connection_connect(conn);
+
+	id = ++conn->request_id_counter;
+	if (id == 0) {
+		/* 0 isn't accepted as an ID in replies. skip it by advancing
+		   the counter itself, so that the following request doesn't
+		   get the same ID as this one. */
+		id = ++conn->request_id_counter;
+	}
+	hash_table_insert(conn->requests, POINTER_CAST(id), context);
+
+	T_BEGIN {
+		replicator_send(conn, REPLICATION_PRIORITY_SYNC, t_strdup_printf(
+			"U\t%s\tsync\t%u\n", str_tabescape(username), id));
+	} T_END;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/aggregator/replicator-connection.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,25 @@
+#ifndef REPLICATOR_CONNECTION_H
+#define REPLICATOR_CONNECTION_H
+
+#include "replication-common.h"
+
+/* Callback type given to the create functions below. NOTE(review): it is
+   presumably called once per replicator_connection_notify_sync() request
+   with that request's context - confirm in replicator-connection.c. */
+typedef void replicator_sync_callback_t(bool success, void *context);
+
+/* Create a connection object for a replicator reached via UNIX socket path. */
+struct replicator_connection *
+replicator_connection_create_unix(const char *path,
+				  replicator_sync_callback_t *callback);
+/* Create a connection object for a replicator reached via one of the given
+   IP addresses and port. */
+struct replicator_connection *
+replicator_connection_create_inet(const struct ip_addr *ips,
+				  unsigned int ips_count, unsigned int port,
+				  replicator_sync_callback_t *callback);
+void replicator_connection_destroy(struct replicator_connection **conn);
+
+/* Notify the replicator that username has changed. priority must be
+   REPLICATION_PRIORITY_LOW or REPLICATION_PRIORITY_HIGH. */
+void replicator_connection_notify(struct replicator_connection *conn,
+				  const char *username,
+				  enum replication_priority priority);
+/* Send a "sync" priority request for username. context is remembered under
+   a new request id until the request is answered. */
+void replicator_connection_notify_sync(struct replicator_connection *conn,
+				       const char *username, void *context);
+
+/* Defined elsewhere in the aggregator. */
+extern struct replicator_connection *replicator;
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replication-common.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,30 @@
+#ifndef REPLICATION_COMMON_H
+#define REPLICATION_COMMON_H
+
+/* Replication urgency levels, in ascending order of importance. */
+enum replication_priority {
+	/* user is fully replicated, as far as we know */
+	REPLICATION_PRIORITY_NONE = 0,
+	/* flag changes, expunges, etc. */
+	REPLICATION_PRIORITY_LOW,
+	/* new emails */
+	REPLICATION_PRIORITY_HIGH,
+	/* synchronously wait for new emails to be replicated */
+	REPLICATION_PRIORITY_SYNC
+};
+
+/* Translate the wire name of a priority ("low", "high" or "sync") into its
+   enum value. Returns 0 and sets priority_r on success, -1 (leaving
+   priority_r untouched) if the name is unknown. */
+static inline int
+replication_priority_parse(const char *str,
+			   enum replication_priority *priority_r)
+{
+	static const struct {
+		const char *name;
+		enum replication_priority value;
+	} known[] = {
+		{ "low", REPLICATION_PRIORITY_LOW },
+		{ "high", REPLICATION_PRIORITY_HIGH },
+		{ "sync", REPLICATION_PRIORITY_SYNC }
+	};
+	unsigned int i;
+
+	for (i = 0; i < sizeof(known) / sizeof(known[0]); i++) {
+		if (strcmp(str, known[i].name) == 0) {
+			*priority_r = known[i].value;
+			return 0;
+		}
+	}
+	return -1;
+}
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/Makefile.am	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,30 @@
+# Builds the "replicator" program and installs it into $(libexecdir)/dovecot.
+pkglibexecdir = $(libexecdir)/dovecot
+
+pkglibexec_PROGRAMS = replicator
+
+# src/replication is on the include path for replication-common.h.
+AM_CPPFLAGS = \
+	-I$(top_srcdir)/src/lib \
+	-I$(top_srcdir)/src/lib-settings \
+	-I$(top_srcdir)/src/lib-auth \
+	-I$(top_srcdir)/src/lib-master \
+	-I$(top_srcdir)/src/replication \
+	-DPKG_STATEDIR=\""$(statedir)"\"
+
+replicator_LDFLAGS = -export-dynamic
+replicator_LDADD = $(LIBDOVECOT) $(MODULE_LIBS)
+replicator_DEPENDENCIES = $(LIBDOVECOT_DEPS)
+
+replicator_SOURCES = \
+	doveadm-connection.c \
+	replicator.c \
+	replicator-brain.c \
+	replicator-queue.c \
+	replicator-settings.c \
+	notify-connection.c
+
+# Headers that are part of the sources but are not installed.
+noinst_HEADERS = \
+	doveadm-connection.h \
+	replicator-brain.h \
+	replicator-queue.h \
+	replicator-settings.h \
+	notify-connection.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/doveadm-connection.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,194 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "network.h"
+#include "istream.h"
+#include "ostream.h"
+#include "str.h"
+#include "strescape.h"
+#include "doveadm-connection.h"
+
+#include <unistd.h>
+
+#define DOVEADM_FAIL_TIMEOUT_MSECS (1000*5)
+#define DOVEADM_HANDSHAKE "VERSION\tdoveadm-server\t1\t0\n"
+#define MAX_INBUF_SIZE 1024
+
+/* One client connection to a doveadm server UNIX socket. It handles a single
+   sync request at a time. */
+struct doveadm_connection {
+	/* UNIX socket path given to doveadm_connection_init() */
+	char *path;
+	/* socket fd, or -1 while not connected */
+	int fd;
+	struct io *io;
+	struct istream *input;
+	struct ostream *output;
+	/* delayed failure callback, set when connecting failed */
+	struct timeout *to;
+
+	/* callback and context of the request in progress; callback is NULL
+	   when the connection is idle */
+	doveadm_callback_t *callback;
+	void *context;
+
+	/* ioloop_time of the last failed connect attempt, 0 if the last
+	   attempt succeeded */
+	time_t last_connect_failure;
+	/* the server's "+" handshake reply has been seen */
+	unsigned int handshaked:1;
+	/* the empty line ending the command's printed output has been seen */
+	unsigned int end_of_print:1;
+};
+
+/* Allocate a connection object for the doveadm server socket at path.
+   Nothing is connected yet; that happens when a sync is requested. */
+struct doveadm_connection *doveadm_connection_init(const char *path)
+{
+	struct doveadm_connection *new_conn =
+		i_new(struct doveadm_connection, 1);
+
+	/* -1 = not connected */
+	new_conn->fd = -1;
+	new_conn->path = i_strdup(path);
+	return new_conn;
+}
+
+/* Finish the request in progress by calling its callback with reply.
+   The pending failure timeout (if any) is removed and the callback/context
+   are cleared from conn before the callback runs, so that the connection
+   no longer counts as busy while the callback executes. */
+static void doveadm_callback(struct doveadm_connection *conn,
+			     enum doveadm_reply reply)
+{
+	doveadm_callback_t *callback = conn->callback;
+	void *context = conn->context;
+
+	if (conn->to != NULL)
+		timeout_remove(&conn->to);
+
+	conn->callback = NULL;
+	conn->context = NULL;
+	callback(reply, context);
+}
+
+/* Close the connection to the doveadm server, if there is one, and reset the
+   per-connection protocol state. A request that was still waiting for its
+   reply is finished with DOVEADM_REPLY_FAIL. */
+static void doveadm_disconnect(struct doveadm_connection *conn)
+{
+	if (conn->fd == -1)
+		return;
+
+	io_remove(&conn->io);
+	o_stream_destroy(&conn->output);
+	i_stream_destroy(&conn->input);
+	if (close(conn->fd) < 0)
+		i_error("close(doveadm) failed: %m");
+	conn->fd = -1;
+	/* the next connection sends a new handshake and must wait for its
+	   reply again; don't carry over state from this connection */
+	conn->handshaked = FALSE;
+	conn->end_of_print = FALSE;
+
+	if (conn->callback != NULL)
+		doveadm_callback(conn, DOVEADM_REPLY_FAIL);
+}
+
+/* Disconnect and free the connection. *_conn is set to NULL. */
+void doveadm_connection_deinit(struct doveadm_connection **_conn)
+{
+	struct doveadm_connection *conn = *_conn;
+
+	*_conn = NULL;
+
+	doveadm_disconnect(conn);
+	/* if connecting failed, the delayed failure timeout may still be
+	   registered (doveadm_disconnect() does nothing when fd is -1).
+	   It must not be left pointing to the freed connection. */
+	if (conn->to != NULL)
+		timeout_remove(&conn->to);
+	i_free(conn->path);
+	i_free(conn);
+}
+
+/* Handle one line received from the doveadm server.
+
+   Returns 0 if reading may continue on this connection, 1 if the request
+   was finished and the connection it arrived on was closed (the caller must
+   stop reading), and -1 on a protocol error (the caller disconnects). */
+static int doveadm_input_line(struct doveadm_connection *conn, const char *line)
+{
+	doveadm_callback_t *callback;
+	void *context;
+	enum doveadm_reply reply;
+
+	if (!conn->handshaked) {
+		if (strcmp(line, "+") != 0) {
+			i_error("%s: Unexpected handshake: %s",
+				conn->path, line);
+			return -1;
+		}
+		conn->handshaked = TRUE;
+		return 0;
+	}
+	if (conn->callback == NULL) {
+		i_error("%s: Unexpected input: %s", conn->path, line);
+		return -1;
+	}
+	if (!conn->end_of_print) {
+		/* the command's printed output is skipped; an empty line
+		   ends it */
+		if (line[0] == '\0')
+			conn->end_of_print = TRUE;
+		return 0;
+	}
+	if (line[0] == '+')
+		reply = DOVEADM_REPLY_OK;
+	else if (line[0] == '-') {
+		/* FIXME: handle DOVEADM_REPLY_NOUSER */
+		reply = DOVEADM_REPLY_FAIL;
+	} else {
+		i_error("%s: Invalid input: %s", conn->path, line);
+		return -1;
+	}
+	conn->end_of_print = FALSE;
+
+	/* Detach the finished request before disconnecting, so that the
+	   disconnect doesn't report it as failed. */
+	callback = conn->callback;
+	context = conn->context;
+	conn->callback = NULL;
+	conn->context = NULL;
+	if (conn->to != NULL)
+		timeout_remove(&conn->to);
+
+	/* FIXME: disconnect after each request for now.
+	   doveadm server's getopt() handling seems to break otherwise */
+	doveadm_disconnect(conn);
+
+	/* Call the callback only after the disconnect: it may immediately
+	   start a new request on this connection object, and that request
+	   must get a fresh connection instead of being torn down by us. */
+	callback(reply, context);
+	return 1;
+}
+
+/* I/O handler: read and process all complete lines from the server. */
+static void doveadm_input(struct doveadm_connection *conn)
+{
+	const char *line;
+	int ret;
+
+	while ((line = i_stream_read_next_line(conn->input)) != NULL) {
+		ret = doveadm_input_line(conn, line);
+		if (ret < 0) {
+			doveadm_disconnect(conn);
+			return;
+		}
+		if (ret > 0) {
+			/* the connection we were reading from is gone;
+			   conn->input is either NULL or belongs to a new
+			   connection with its own I/O handler */
+			return;
+		}
+	}
+	if (conn->input->eof)
+		doveadm_disconnect(conn);
+}
+
+/* Make sure the connection to the doveadm server is open and send the
+   handshake on a new connection. Returns 0 if connected, -1 if connecting
+   failed or already failed during the current second. */
+static int doveadm_connect(struct doveadm_connection *conn)
+{
+	int fd;
+
+	if (conn->fd != -1) {
+		/* already connected */
+		return 0;
+	}
+	if (conn->last_connect_failure == ioloop_time) {
+		/* don't retry more than once per second */
+		return -1;
+	}
+
+	fd = net_connect_unix(conn->path);
+	conn->fd = fd;
+	if (fd == -1) {
+		i_error("net_connect_unix(%s) failed: %m", conn->path);
+		conn->last_connect_failure = ioloop_time;
+		return -1;
+	}
+
+	conn->last_connect_failure = 0;
+	conn->io = io_add(fd, IO_READ, doveadm_input, conn);
+	conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE);
+	conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
+	o_stream_send_str(conn->output, DOVEADM_HANDSHAKE);
+	return 0;
+}
+
+/* Timeout handler added when connecting failed: report the pending request
+   as failed. doveadm_callback() also removes this timeout. */
+static void doveadm_fail_timeout(struct doveadm_connection *conn)
+{
+	doveadm_callback(conn, DOVEADM_REPLY_FAIL);
+}
+
+/* Start a "sync -d" command for username on the doveadm server, adding "-f"
+   when full is TRUE. The connection must be idle. callback is called once
+   with the result; if connecting fails, it is called with
+   DOVEADM_REPLY_FAIL after DOVEADM_FAIL_TIMEOUT_MSECS. */
+void doveadm_connection_sync(struct doveadm_connection *conn,
+			     const char *username, bool full,
+			     doveadm_callback_t *callback, void *context)
+{
+	string_t *cmd;
+
+	i_assert(callback != NULL);
+	i_assert(!doveadm_connection_is_busy(conn));
+
+	conn->callback = callback;
+	conn->context = context;
+
+	if (doveadm_connect(conn) < 0) {
+		/* report the failure asynchronously */
+		i_assert(conn->to == NULL);
+		conn->to = timeout_add(DOVEADM_FAIL_TIMEOUT_MSECS,
+				       doveadm_fail_timeout, conn);
+		return;
+	}
+
+	/* <flags> <username> <command> [<args>] */
+	cmd = t_str_new(256);
+	str_append_c(cmd, '\t');
+	str_tabescape_write(cmd, username);
+	str_append(cmd, "\tsync\t-d");
+	if (full)
+		str_append(cmd, "\t-f");
+	str_append_c(cmd, '\n');
+	o_stream_send(conn->output, str_data(cmd), str_len(cmd));
+}
+
+/* Returns TRUE while a request's callback is still waiting to be called. */
+bool doveadm_connection_is_busy(struct doveadm_connection *conn)
+{
+	return conn->callback != NULL;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/doveadm-connection.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,20 @@
+#ifndef DOVEADM_CONNECTION_H
+#define DOVEADM_CONNECTION_H
+
+/* Result of a sync request. */
+enum doveadm_reply {
+	DOVEADM_REPLY_OK,
+	DOVEADM_REPLY_FAIL,
+	/* not produced by doveadm-connection.c yet (see its FIXME) */
+	DOVEADM_REPLY_NOUSER
+};
+
+typedef void doveadm_callback_t(enum doveadm_reply reply, void *context);
+
+/* Create a connection object for the doveadm server UNIX socket at path. */
+struct doveadm_connection *doveadm_connection_init(const char *path);
+void doveadm_connection_deinit(struct doveadm_connection **conn);
+
+/* Start syncing username; full requests a full sync. The connection must not
+   be busy. callback is called once with the result. */
+void doveadm_connection_sync(struct doveadm_connection *conn,
+			     const char *username, bool full,
+			     doveadm_callback_t *callback, void *context);
+/* Returns TRUE if a sync request is still in progress. */
+bool doveadm_connection_is_busy(struct doveadm_connection *conn);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/notify-connection.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,197 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "llist.h"
+#include "istream.h"
+#include "ostream.h"
+#include "strescape.h"
+#include "master-service.h"
+#include "replicator-queue.h"
+#include "notify-connection.h"
+
+#include <unistd.h>
+
+#define MAX_INBUF_SIZE (1024*64)
+#define NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION 1
+#define NOTIFY_CLIENT_PROTOCOL_MINOR_VERSION 0
+
+/* One incoming notify client connection. */
+struct notify_connection {
+	/* links in the global connections list */
+	struct notify_connection *prev, *next;
+	/* one reference is held until the connection is destroyed, plus one
+	   for each unanswered sync request */
+	int refcount;
+
+	int fd;
+	struct io *io;
+	struct istream *input;
+	struct ostream *output;
+
+	/* queue where the received user updates are added */
+	struct replicator_queue *queue;
+
+	/* the client's version line has been received and verified */
+	unsigned int version_received:1;
+	/* notify_connection_destroy() has already run */
+	unsigned int destroyed:1;
+};
+
+/* A sync request waiting for its reply to be sent back to the client. */
+struct notify_sync_request {
+	struct notify_connection *conn;
+	/* request id sent by the client, echoed back in the reply */
+	unsigned int id;
+};
+
+/* All connections that haven't been destroyed yet. */
+static struct notify_connection *connections;
+
+static void notify_connection_destroy(struct notify_connection *conn);
+
+/* Queue callback for a finished sync request: send "+ <id>" or "- <id>" to
+   the client, then drop the request and its connection reference. */
+static void notify_sync_callback(bool success, void *context)
+{
+	struct notify_sync_request *req = context;
+	const char *reply;
+
+	reply = t_strdup_printf("%c\t%u\n", success ? '+' : '-', req->id);
+	o_stream_send_str(req->conn->output, reply);
+
+	notify_connection_unref(&req->conn);
+	i_free(req);
+}
+
+/* Parse one "U" line from a notify client and add the user to the queue.
+   For "sync" priority the request id is remembered so that the result can be
+   reported back. Returns 0 on success, -1 if the line is invalid. */
+static int
+notify_connection_input_line(struct notify_connection *conn, const char *line)
+{
+	struct notify_sync_request *request;
+	const char *const *args;
+	enum replication_priority priority;
+	unsigned int id;
+
+	/* U \t <username> \t <priority> [\t <sync id>] */
+	args = t_strsplit_tabescaped(line);
+	if (str_array_length(args) < 3) {
+		/* command, username and priority are all mandatory;
+		   args[2] is read below */
+		i_error("notify client sent invalid input: %s", line);
+		return -1;
+	}
+	if (strcmp(args[0], "U") != 0) {
+		i_error("notify client sent unknown command: %s", args[0]);
+		return -1;
+	}
+	if (replication_priority_parse(args[2], &priority) < 0) {
+		i_error("notify client sent invalid priority: %s", args[2]);
+		return -1;
+	}
+	if (priority != REPLICATION_PRIORITY_SYNC)
+		replicator_queue_add(conn->queue, args[1], priority);
+	else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
+		i_error("notify client sent invalid sync id: %s", line);
+		return -1;
+	} else {
+		request = i_new(struct notify_sync_request, 1);
+		request->conn = conn;
+		request->id = id;
+		/* keep the connection alive until the reply is sent */
+		notify_connection_ref(conn);
+		replicator_queue_add_sync(conn->queue, args[1],
+					  notify_sync_callback, request);
+	}
+	return 0;
+}
+
+/* I/O handler: read what the client sent, verify its version line if that
+   hasn't been done yet and process all complete lines. The connection is
+   destroyed on read failure, version mismatch or invalid input. */
+static void notify_connection_input(struct notify_connection *conn)
+{
+	const char *line;
+	ssize_t nread;
+	int ret;
+
+	nread = i_stream_read(conn->input);
+	if (nread == -2) {
+		i_error("BUG: Client connection sent too much data");
+		notify_connection_destroy(conn);
+		return;
+	}
+	if (nread == -1) {
+		notify_connection_destroy(conn);
+		return;
+	}
+
+	if (!conn->version_received) {
+		line = i_stream_next_line(conn->input);
+		if (line == NULL)
+			return;
+
+		if (!version_string_verify(line, "replicator-notify",
+				NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION)) {
+			i_error("Notify client not compatible with this server "
+				"(mixed old and new binaries?)");
+			notify_connection_destroy(conn);
+			return;
+		}
+		conn->version_received = TRUE;
+	}
+
+	for (;;) {
+		line = i_stream_next_line(conn->input);
+		if (line == NULL)
+			break;
+
+		T_BEGIN {
+			ret = notify_connection_input_line(conn, line);
+		} T_END;
+		if (ret < 0) {
+			notify_connection_destroy(conn);
+			break;
+		}
+	}
+}
+
+/* Create a notify connection for an accepted client fd. User updates that
+   are received get added to queue. The new connection is also linked into
+   the global connections list. */
+struct notify_connection *
+notify_connection_create(int fd, struct replicator_queue *queue)
+{
+	struct notify_connection *new_conn;
+
+	i_assert(fd >= 0);
+
+	new_conn = i_new(struct notify_connection, 1);
+	new_conn->refcount = 1;
+	new_conn->fd = fd;
+	new_conn->queue = queue;
+	new_conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE);
+	new_conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
+	new_conn->io = io_add(fd, IO_READ, notify_connection_input, new_conn);
+
+	DLLIST_PREPEND(&connections, new_conn);
+	return new_conn;
+}
+
+/* Close the client connection: unlink it from the connections list, stop
+   I/O, close the streams and the fd, and drop the reference that was taken
+   at creation. The struct itself is freed by notify_connection_unref() once
+   no sync requests reference it anymore. Calling this again is a no-op. */
+static void notify_connection_destroy(struct notify_connection *conn)
+{
+	if (conn->destroyed)
+		return;
+	conn->destroyed = TRUE;
+
+	DLLIST_REMOVE(&connections, conn);
+
+	io_remove(&conn->io);
+	i_stream_close(conn->input);
+	o_stream_close(conn->output);
+	if (close(conn->fd) < 0)
+		i_error("close(notify connection) failed: %m");
+	conn->fd = -1;
+
+	/* conn may be freed by this call; it isn't used below */
+	notify_connection_unref(&conn);
+	master_service_client_connection_destroyed(master_service);
+}
+
+/* Add a reference to the connection. */
+void notify_connection_ref(struct notify_connection *conn)
+{
+	i_assert(conn->refcount > 0);
+
+	conn->refcount++;
+}
+
+/* Drop one reference and set *_conn to NULL. When the last reference is
+   gone, the connection is destroyed (if it wasn't already) and freed. */
+void notify_connection_unref(struct notify_connection **_conn)
+{
+	struct notify_connection *conn = *_conn;
+
+	i_assert(conn->refcount > 0);
+
+	*_conn = NULL;
+	conn->refcount--;
+	if (conn->refcount == 0) {
+		notify_connection_destroy(conn);
+		i_stream_unref(&conn->input);
+		o_stream_unref(&conn->output);
+		i_free(conn);
+	}
+}
+
+/* Destroy every connection in the connections list. Each destroy call
+   unlinks the list head, so the loop ends when the list is empty. */
+void notify_connections_destroy_all(void)
+{
+	while (connections != NULL)
+		notify_connection_destroy(connections);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/notify-connection.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,13 @@
+#ifndef NOTIFY_CONNECTION_H
+#define NOTIFY_CONNECTION_H
+
+struct replicator_queue;
+
+/* Create a connection for an accepted notify client fd. Received user
+   updates are added to queue. */
+struct notify_connection *
+notify_connection_create(int fd, struct replicator_queue *queue);
+void notify_connection_ref(struct notify_connection *conn);
+/* Drops a reference and sets *conn to NULL. */
+void notify_connection_unref(struct notify_connection **conn);
+
+/* Destroy all existing notify connections. */
+void notify_connections_destroy_all(void);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/replicator-brain.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,164 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "ioloop.h"
+#include "doveadm-connection.h"
+#include "replicator-settings.h"
+#include "replicator-queue.h"
+#include "replicator-brain.h"
+
+/* State of one running doveadm sync, passed to doveadm_sync_callback(). */
+struct replicator_sync_context {
+	struct replicator_brain *brain;
+	/* user that was popped from the queue for this sync */
+	struct replicator_user *user;
+};
+
+/* The brain takes users from the replication queue and runs syncs for them
+   via a limited set of doveadm connections. */
+struct replicator_brain {
+	pool_t pool;
+	struct replicator_queue *queue;
+	const struct replicator_settings *set;
+	/* wakes the brain up when the queue has nothing to do right now */
+	struct timeout *to;
+
+	/* at most set->replication_max_conns connections */
+	ARRAY_DEFINE(doveadm_conns, struct doveadm_connection *);
+
+	/* replicator_brain_deinit() is running: no new syncs may be
+	   started */
+	unsigned int deinitializing:1;
+};
+
+static void replicator_brain_fill(struct replicator_brain *brain);
+
+/* Queue change callback: see if there is new work to start. */
+static void replicator_brain_queue_changed(void *context)
+{
+	struct replicator_brain *brain = context;
+
+	if (!brain->deinitializing)
+		replicator_brain_fill(brain);
+}
+
+/* Create a brain working on queue with the given settings. The brain
+   registers itself as the queue's change callback and immediately starts
+   whatever work the queue already has. */
+struct replicator_brain *
+replicator_brain_init(struct replicator_queue *queue,
+		      const struct replicator_settings *set)
+{
+	struct replicator_brain *brain;
+	pool_t pool;
+
+	pool = pool_alloconly_create("replication brain", 1024);
+	brain = p_new(pool, struct replicator_brain, 1);
+	brain->pool = pool;
+	brain->queue = queue;
+	brain->set = set;
+	p_array_init(&brain->doveadm_conns, pool, 16);
+	replicator_queue_set_change_callback(queue,
+		replicator_brain_queue_changed, brain);
+	replicator_brain_fill(brain);
+	return brain;
+}
+
+/* Close all doveadm connections and free the brain. Syncs still in progress
+   are reported as failed via their callbacks. */
+void replicator_brain_deinit(struct replicator_brain **_brain)
+{
+	struct replicator_brain *brain = *_brain;
+	struct doveadm_connection **connp;
+
+	*_brain = NULL;
+
+	/* Deinitializing a busy connection calls doveadm_sync_callback().
+	   It must not try to start new syncs: the array being iterated
+	   here already contains NULL entries and must not be appended to. */
+	brain->deinitializing = TRUE;
+	array_foreach_modifiable(&brain->doveadm_conns, connp)
+		doveadm_connection_deinit(connp);
+	if (brain->to != NULL)
+		timeout_remove(&brain->to);
+	pool_unref(&brain->pool);
+}
+
+/* Return an idle doveadm connection, creating a new one if the connection
+   limit allows it. Returns NULL if all connections are busy. */
+static struct doveadm_connection *
+get_doveadm_connection(struct replicator_brain *brain)
+{
+	struct doveadm_connection *const *connp, *conn = NULL;
+
+	array_foreach(&brain->doveadm_conns, connp) {
+		if (!doveadm_connection_is_busy(*connp))
+			return *connp;
+	}
+	if (array_count(&brain->doveadm_conns) ==
+	    brain->set->replication_max_conns)
+		return NULL;
+
+	conn = doveadm_connection_init(brain->set->doveadm_socket_path);
+	array_append(&brain->doveadm_conns, &conn, 1);
+	return conn;
+}
+
+/* A sync finished: update the user's state, return the user to the queue
+   and start more work unless the brain is being deinitialized. */
+static void doveadm_sync_callback(enum doveadm_reply reply, void *context)
+{
+	struct replicator_sync_context *ctx = context;
+
+	if (reply == DOVEADM_REPLY_NOUSER) {
+		/* user no longer exists, remove from replication */
+		replicator_queue_remove(ctx->brain->queue, &ctx->user);
+	} else {
+		ctx->user->last_sync_failed =
+			reply != DOVEADM_REPLY_OK;
+		replicator_queue_push(ctx->brain->queue, ctx->user);
+	}
+	if (!ctx->brain->deinitializing)
+		replicator_brain_fill(ctx->brain);
+	i_free(ctx);
+}
+
+/* Start replicating user on an idle doveadm connection. Returns FALSE
+   (leaving the user untouched) if no connection is available, TRUE once the
+   sync has been started. */
+static bool
+doveadm_replicate(struct replicator_brain *brain, struct replicator_user *user)
+{
+	struct doveadm_connection *conn = get_doveadm_connection(brain);
+	struct replicator_sync_context *sync_ctx;
+	bool full_sync;
+
+	if (conn == NULL)
+		return FALSE;
+
+	/* a full sync is done when the previous one is older than the
+	   configured interval */
+	full_sync = user->last_full_sync +
+		brain->set->replication_full_sync_interval < ioloop_time;
+
+	/* update the sync times immediately. if the replication fails we still
+	   wouldn't want it to be retried immediately. */
+	if (full_sync)
+		user->last_full_sync = ioloop_time;
+	user->last_fast_sync = ioloop_time;
+	/* reset priority also. if more updates arrive during replication
+	   we'll do another replication to make sure nothing gets lost */
+	user->priority = REPLICATION_PRIORITY_NONE;
+
+	sync_ctx = i_new(struct replicator_sync_context, 1);
+	sync_ctx->user = user;
+	sync_ctx->brain = brain;
+	doveadm_connection_sync(conn, user->username, full_sync,
+				doveadm_sync_callback, sync_ctx);
+	return TRUE;
+}
+
+/* Wakeup timer fired: remove the timer and look for work again. */
+static void replicator_brain_timeout(struct replicator_brain *brain)
+{
+	timeout_remove(&brain->to);
+	replicator_brain_fill(brain);
+}
+
+/* Try to start replication for the next user in the queue. Returns TRUE if
+   a sync was started, FALSE if there is nothing to do right now or no
+   connection is available. */
+static bool replicator_brain_fill_next(struct replicator_brain *brain)
+{
+	struct replicator_user *user;
+	unsigned int next_secs;
+
+	if (get_doveadm_connection(brain) == NULL) {
+		/* All connections are busy. Don't pop the user at all:
+		   pushing it back would run the queue's sync lookup
+		   handling and tell waiting sync clients that replication
+		   finished although it never started. We get called again
+		   when one of the running syncs finishes. */
+		return FALSE;
+	}
+
+	user = replicator_queue_pop(brain->queue, &next_secs);
+	if (user == NULL) {
+		/* nothing more to do */
+		if (brain->to != NULL)
+			timeout_remove(&brain->to);
+		brain->to = timeout_add(next_secs * 1000,
+					replicator_brain_timeout, brain);
+		return FALSE;
+	}
+
+	if (!doveadm_replicate(brain, user)) {
+		/* all connections were full, put the user back to queue.
+		   not expected to happen after the check above. */
+		replicator_queue_push(brain->queue, user);
+		return FALSE;
+	}
+	/* replication started for the user */
+	return TRUE;
+}
+
+/* Start syncs until replicator_brain_fill_next() reports that no more can
+   be started. */
+static void replicator_brain_fill(struct replicator_brain *brain)
+{
+	while (replicator_brain_fill_next(brain)) ;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/replicator-brain.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,11 @@
+#ifndef REPLICATOR_BRAIN_H
+#define REPLICATOR_BRAIN_H
+
+struct replicator_settings;
+
+/* Create a brain that replicates the users in queue. It registers itself as
+   the queue's change callback. */
+struct replicator_brain *
+replicator_brain_init(struct replicator_queue *queue,
+		      const struct replicator_settings *set);
+/* Close the brain's doveadm connections and free it. */
+void replicator_brain_deinit(struct replicator_brain **brain);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/replicator-queue.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,363 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "ioloop.h"
+#include "istream.h"
+#include "ostream.h"
+#include "str.h"
+#include "strescape.h"
+#include "hash.h"
+#include "replicator-queue.h"
+
+#include <unistd.h>
+#include <fcntl.h>
+
+/* A caller waiting for a user's replication to finish. */
+struct replicator_sync_lookup {
+	struct replicator_user *user;
+
+	replicator_sync_callback_t *callback;
+	void *context;
+
+	/* The request arrived while the user was already popped (being
+	   replicated). The callback is delayed until the push after the
+	   next one. */
+	bool wait_for_next_push;
+};
+
+struct replicator_queue {
+	/* users ordered by user_priority_cmp() */
+	struct priorityq *user_queue;
+	/* username => struct replicator_user* */
+	struct hash_table *user_hash;
+
+	ARRAY_DEFINE(sync_lookups, struct replicator_sync_lookup);
+
+	/* seconds between a user's periodic full syncs */
+	unsigned int full_sync_interval;
+
+	/* called by the _add(), _add_sync() and _remove() functions */
+	void (*change_callback)(void *context);
+	void *change_context;
+};
+
+/* Priority queue ordering: users with higher priority come first. Among
+   equal priorities the one synced longest ago comes first, judged by the
+   last fast sync when there is something to replicate and by the last full
+   sync otherwise. */
+static int user_priority_cmp(const void *p1, const void *p2)
+{
+	const struct replicator_user *a = p1, *b = p2;
+	time_t a_stamp, b_stamp;
+
+	if (a->priority != b->priority)
+		return a->priority > b->priority ? -1 : 1;
+
+	if (a->priority != REPLICATION_PRIORITY_NONE) {
+		/* there is something to replicate */
+		a_stamp = a->last_fast_sync;
+		b_stamp = b->last_fast_sync;
+	} else {
+		/* nothing to replicate, but do still periodic full syncs */
+		a_stamp = a->last_full_sync;
+		b_stamp = b->last_full_sync;
+	}
+
+	if (a_stamp < b_stamp)
+		return -1;
+	if (a_stamp > b_stamp)
+		return 1;
+	return 0;
+}
+
+/* Create an empty replication queue. full_sync_interval is the number of
+   seconds between a user's periodic full syncs. */
+struct replicator_queue *replicator_queue_init(unsigned int full_sync_interval)
+{
+	struct replicator_queue *new_queue =
+		i_new(struct replicator_queue, 1);
+
+	new_queue->full_sync_interval = full_sync_interval;
+	new_queue->user_queue = priorityq_init(user_priority_cmp, 1024);
+	new_queue->user_hash =
+		hash_table_create(default_pool, default_pool, 1024,
+				  str_hash, (hash_cmp_callback_t *)strcmp);
+	i_array_init(&new_queue->sync_lookups, 32);
+	return new_queue;
+}
+
+/* Free the queue and all users that are still queued in it. There must be
+   no sync lookups waiting anymore. */
+void replicator_queue_deinit(struct replicator_queue **_queue)
+{
+	struct replicator_queue *queue = *_queue;
+	struct priorityq_item *item;
+
+	*_queue = NULL;
+
+	/* nobody should be told about the removals below; the callback's
+	   context may not even exist anymore */
+	queue->change_callback = NULL;
+	queue->change_context = NULL;
+
+	while ((item = priorityq_pop(queue->user_queue)) != NULL) {
+		struct replicator_user *user = (struct replicator_user *)item;
+
+		/* priorityq_pop() already took the user out of the priority
+		   queue. Mark it so that replicator_queue_remove() doesn't
+		   try to remove it from there a second time. */
+		user->popped = TRUE;
+		replicator_queue_remove(queue, &user);
+	}
+
+	priorityq_deinit(&queue->user_queue);
+	hash_table_destroy(&queue->user_hash);
+	i_assert(array_count(&queue->sync_lookups) == 0);
+	array_free(&queue->sync_lookups);
+	i_free(queue);
+}
+
+/* Set the function (and its context) that is called whenever the queue is
+   changed via replicator_queue_add(), replicator_queue_add_sync() or
+   replicator_queue_remove(). Replaces any earlier callback. */
+void replicator_queue_set_change_callback(struct replicator_queue *queue,
+					  void (*callback)(void *context),
+					  void *context)
+{
+	queue->change_callback = callback;
+	queue->change_context = context;
+}
+
+/* Add username to the queue with the given priority, or raise the priority
+   of an already known user. A known user whose priority is already higher
+   is returned unchanged. The change callback is not called. */
+static struct replicator_user *
+replicator_queue_add_int(struct replicator_queue *queue, const char *username,
+			 enum replication_priority priority)
+{
+	struct replicator_user *user;
+
+	user = hash_table_lookup(queue->user_hash, username);
+	if (user == NULL) {
+		user = i_new(struct replicator_user, 1);
+		user->username = i_strdup(username);
+		/* remember the user, so that later lookups find it instead
+		   of creating duplicates. the key is owned by the user
+		   struct and removed in replicator_queue_remove(). */
+		hash_table_insert(queue->user_hash, user->username, user);
+	} else {
+		if (user->priority > priority) {
+			/* user already has a higher priority than this */
+			return user;
+		}
+		/* the priority is part of the sort key: take the user out
+		   of the priority queue while changing it */
+		if (!user->popped)
+			priorityq_remove(queue->user_queue, &user->item);
+	}
+	user->priority = priority;
+	user->last_update = ioloop_time;
+
+	if (!user->popped)
+		priorityq_add(queue->user_queue, &user->item);
+	return user;
+}
+
+/* Add or update the user via replicator_queue_add_int() and then call the
+   change callback, if one is set. Returns the user. */
+struct replicator_user *
+replicator_queue_add(struct replicator_queue *queue, const char *username,
+		     enum replication_priority priority)
+{
+	struct replicator_user *user;
+
+	user = replicator_queue_add_int(queue, username, priority);
+	if (queue->change_callback != NULL)
+		queue->change_callback(queue->change_context);
+	return user;
+}
+
+/* Queue username with "sync" priority and register callback to be called
+   when the user is pushed back to the queue after replication. If the user
+   is being replicated right now, the callback waits for the replication
+   after the current one. */
+void replicator_queue_add_sync(struct replicator_queue *queue,
+			       const char *username,
+			       replicator_sync_callback_t *callback,
+			       void *context)
+{
+	struct replicator_sync_lookup *new_lookup;
+	struct replicator_user *user =
+		replicator_queue_add_int(queue, username,
+					 REPLICATION_PRIORITY_SYNC);
+
+	new_lookup = array_append_space(&queue->sync_lookups);
+	new_lookup->callback = callback;
+	new_lookup->context = context;
+	new_lookup->user = user;
+	new_lookup->wait_for_next_push = user->popped;
+
+	if (queue->change_callback != NULL)
+		queue->change_callback(queue->change_context);
+}
+
+/* Remove the user from the priority queue (unless it's popped) and from the
+   username hash, free it and set *_user to NULL. The change callback is
+   called afterwards. */
+void replicator_queue_remove(struct replicator_queue *queue,
+			     struct replicator_user **_user)
+{
+	struct replicator_user *user = *_user;
+
+	*_user = NULL;
+	if (!user->popped)
+		priorityq_remove(queue->user_queue, &user->item);
+	/* remove from the hash while the username is still allocated */
+	hash_table_remove(queue->user_hash, user->username);
+
+	i_free(user->username);
+	i_free(user);
+
+	if (queue->change_callback != NULL)
+		queue->change_callback(queue->change_context);
+}
+
+/* Take the most urgent user out of the queue and mark it popped. Returns
+   NULL if there is nothing to replicate right now; in that case next_secs_r
+   is set to the number of seconds after which to look again. */
+struct replicator_user *
+replicator_queue_pop(struct replicator_queue *queue,
+		     unsigned int *next_secs_r)
+{
+	struct replicator_user *first;
+
+	first = (struct replicator_user *)priorityq_peek(queue->user_queue);
+	if (first == NULL) {
+		/* no users defined. we shouldn't normally get here */
+		*next_secs_r = 3600;
+		return NULL;
+	}
+
+	if (first->priority == REPLICATION_PRIORITY_NONE &&
+	    first->last_full_sync + queue->full_sync_interval > ioloop_time) {
+		/* we don't want to do a full sync yet */
+		*next_secs_r = first->last_full_sync +
+			queue->full_sync_interval - ioloop_time;
+		return NULL;
+	}
+
+	priorityq_remove(queue->user_queue, &first->item);
+	first->popped = TRUE;
+	return first;
+}
+
+/* Called when user is pushed back to the queue. Lookups that were waiting
+   for this user are removed and their callbacks called with the result of
+   the user's last sync. Lookups that arrived while the user was being
+   replicated are only marked to be handled on the next push. */
+static void
+replicator_queue_handle_sync_lookups(struct replicator_queue *queue,
+				     struct replicator_user *user)
+{
+	struct replicator_sync_lookup *lookups;
+	ARRAY_DEFINE(callbacks, struct replicator_sync_lookup);
+	unsigned int i, count;
+	bool success = !user->last_sync_failed;
+
+	t_array_init(&callbacks, 8);
+	lookups = array_get_modifiable(&queue->sync_lookups, &count);
+	for (i = 0; i < count; ) {
+		if (lookups[i].user != user)
+			i++;
+		else if (lookups[i].wait_for_next_push) {
+			/* another sync request came while user was being
+			   replicated */
+			i_assert(user->priority == REPLICATION_PRIORITY_SYNC);
+			lookups[i].wait_for_next_push = FALSE;
+			i++;
+		} else {
+			array_append(&callbacks, &lookups[i], 1);
+			array_delete(&queue->sync_lookups, i, 1);
+			/* the array is one element shorter now. refresh the
+			   pointer and the count, so that we never look at
+			   stale data past the end of the array. index i now
+			   holds the next element. */
+			lookups = array_get_modifiable(&queue->sync_lookups,
+						       &count);
+		}
+	}
+
+	/* call the callbacks only after the lookups array is in its final
+	   state */
+	array_foreach_modifiable(&callbacks, lookups)
+		lookups->callback(success, lookups->context);
+}
+
+/* Put a popped user back into the priority queue, clear its popped flag and
+   handle the sync lookups that were waiting for it. */
+void replicator_queue_push(struct replicator_queue *queue,
+			   struct replicator_user *user)
+{
+	priorityq_add(queue->user_queue, &user->item);
+	user->popped = FALSE;
+
+	T_BEGIN {
+		replicator_queue_handle_sync_lookups(queue, user);
+	} T_END;
+}
+
+/* Import one record written by replicator_queue_export(). The record is
+   ignored if the queue already has newer state for the user. Returns 0 on
+   success, -1 if the record is invalid. */
+static int
+replicator_queue_import_line(struct replicator_queue *queue, const char *line)
+{
+	const char *const *args, *username;
+	unsigned int priority;
+	struct replicator_user *user, tmp_user;
+
+	/* <user> <priority> <last update> <last fast sync> <last full sync> */
+	args = t_strsplit_tabescaped(line);
+	if (str_array_length(args) < 5)
+		return -1;
+
+	memset(&tmp_user, 0, sizeof(tmp_user));
+	username = args[0];
+	if (username[0] == '\0' ||
+	    str_to_uint(args[1], &priority) < 0 ||
+	    str_to_time(args[2], &tmp_user.last_update) < 0 ||
+	    str_to_time(args[3], &tmp_user.last_fast_sync) < 0 ||
+	    str_to_time(args[4], &tmp_user.last_full_sync) < 0)
+		return -1;
+	if (priority > REPLICATION_PRIORITY_SYNC) {
+		/* not a valid enum replication_priority value */
+		return -1;
+	}
+	tmp_user.priority = priority;
+
+	user = hash_table_lookup(queue->user_hash, username);
+	if (user != NULL) {
+		if (user->last_update > tmp_user.last_update) {
+			/* we already have a newer state */
+			return 0;
+		}
+		if (user->last_update == tmp_user.last_update) {
+			/* either one of these could be newer. use the one
+			   with higher priority. */
+			if (user->priority > tmp_user.priority)
+				return 0;
+		}
+	}
+	user = replicator_queue_add(queue, username, tmp_user.priority);
+	user->last_update = tmp_user.last_update;
+	user->last_fast_sync = tmp_user.last_fast_sync;
+	user->last_full_sync = tmp_user.last_full_sync;
+	if (!user->popped) {
+		/* the sync timestamps are part of the sort key and were
+		   changed after the user was queued: re-sort the user */
+		priorityq_remove(queue->user_queue, &user->item);
+		priorityq_add(queue->user_queue, &user->item);
+	}
+	return 0;
+}
+
+/* Import users from the file at path, one record per line. A missing file
+   is not an error. Importing stops at the first invalid record. Returns 0
+   on success, -1 on an open/read error or an invalid record. */
+int replicator_queue_import(struct replicator_queue *queue, const char *path)
+{
+	struct istream *input;
+	const char *line;
+	int fd, ret = 0;
+
+	fd = open(path, O_RDONLY);
+	if (fd == -1 && errno == ENOENT)
+		return 0;
+	if (fd == -1) {
+		i_error("open(%s) failed: %m", path);
+		return -1;
+	}
+
+	input = i_stream_create_fd(fd, (size_t)-1, TRUE);
+	for (;;) {
+		line = i_stream_read_next_line(input);
+		if (line == NULL)
+			break;
+
+		T_BEGIN {
+			ret = replicator_queue_import_line(queue, line);
+		} T_END;
+		if (ret < 0) {
+			i_error("Invalid replicator db record: %s", line);
+			break;
+		}
+	}
+	if (input->stream_errno != 0)
+		ret = -1;
+	i_stream_destroy(&input);
+	return ret;
+}
+
+/* Append the user's record to str as one line:
+   <user> <priority> <last update> <last fast sync> <last full sync>
+   The fields are separated by tabs and the line ends with a newline, which
+   is what replicator_queue_import() expects to read back. */
+static void
+replicator_queue_export_user(struct replicator_user *user, string_t *str)
+{
+	str_tabescape_write(str, user->username);
+	str_printfa(str, "\t%d\t%lld\t%lld\t%lld\n", (int)user->priority,
+		    (long long)user->last_update,
+		    (long long)user->last_fast_sync,
+		    (long long)user->last_full_sync);
+}
+
+/* Write the users that are currently in the priority queue to the file at
+   path, creating or truncating it with mode 0600. Users that are popped at
+   the moment aren't in the priority queue and so aren't written. Returns 0
+   on success, -1 on failure. */
+int replicator_queue_export(struct replicator_queue *queue, const char *path)
+{
+	struct ostream *output;
+	struct priorityq_item *const *items;
+	unsigned int i, count;
+	string_t *str;
+	int fd, ret;
+
+	fd = creat(path, 0600);
+	if (fd == -1) {
+		i_error("creat(%s) failed: %m", path);
+		return -1;
+	}
+	output = o_stream_create_fd_file(fd, 0, TRUE);
+	o_stream_cork(output);
+
+	str = t_str_new(128);
+	items = priorityq_items(queue->user_queue);
+	count = priorityq_count(queue->user_queue);
+	for (i = 0; i < count; i++) {
+		struct replicator_user *user =
+			(struct replicator_user *)items[i];
+
+		/* the same string buffer is reused for every record */
+		str_truncate(str, 0);
+		replicator_queue_export_user(user, str);
+		if (o_stream_send(output, str_data(str), str_len(str)) < 0)
+			break;
+	}
+
+	/* NOTE(review): the error state is read before o_stream_destroy();
+	   confirm that a failure while flushing corked data can't be missed */
+	ret = output->last_failed_errno != 0 ? -1 : 0;
+	o_stream_destroy(&output);
+	return ret;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/replicator-queue.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,60 @@
+#ifndef REPLICATOR_QUEUE_H
+#define REPLICATOR_QUEUE_H
+
+#include "priorityq.h"
+#include "replication-common.h"
+
+struct replicator_user {
+	/* must be the first member: queue items are cast to users */
+	struct priorityq_item item;
+
+	char *username;
+	enum replication_priority priority;
+	/* last time this user's state was updated */
+	time_t last_update;
+	/* last_fast_sync is always >= last_full_sync. */
+	time_t last_fast_sync, last_full_sync;
+
+	/* User isn't currently in replication queue */
+	unsigned int popped:1;
+	/* Last replication sync failed */
+	unsigned int last_sync_failed:1;
+};
+
+/* Called when a replicator_queue_add_sync() request has been handled.
+   success tells whether the user's last sync succeeded. */
+typedef void replicator_sync_callback_t(bool success, void *context);
+
+/* full_sync_interval is the number of seconds between a user's periodic
+   full syncs. */
+struct replicator_queue *replicator_queue_init(unsigned int full_sync_interval);
+void replicator_queue_deinit(struct replicator_queue **queue);
+
+/* Call the specified callback when data is added/removed/moved in queue
+   via _add(), _add_sync() or _remove() functions (not push/pop). */
+void replicator_queue_set_change_callback(struct replicator_queue *queue,
+					  void (*callback)(void *context),
+					  void *context);
+
+/* Add a user to queue and return it. If the user already exists, it's updated
+   only if the new priority is higher. */
+struct replicator_user *
+replicator_queue_add(struct replicator_queue *queue, const char *username,
+		     enum replication_priority priority);
+/* Add a user to queue with "sync" priority. The callback is called when the
+   user is pushed back to the queue after it has been replicated. */
+void replicator_queue_add_sync(struct replicator_queue *queue,
+			       const char *username,
+			       replicator_sync_callback_t *callback,
+			       void *context);
+/* Remove user from replication queue and free it. */
+void replicator_queue_remove(struct replicator_queue *queue,
+			     struct replicator_user **user);
+
+/* Return the next user from replication queue, and remove it from the queue.
+   If there's nothing to be replicated currently, returns NULL and sets
+   next_secs_r to when there should be more work to do. */
+struct replicator_user *
+replicator_queue_pop(struct replicator_queue *queue,
+		     unsigned int *next_secs_r);
+/* Add user back to queue. */
+void replicator_queue_push(struct replicator_queue *queue,
+			   struct replicator_user *user);
+
+/* Read/write the queue's users from/to the file at path. Both return 0 on
+   success and -1 on failure. */
+int replicator_queue_import(struct replicator_queue *queue, const char *path);
+int replicator_queue_export(struct replicator_queue *queue, const char *path);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/replicator-settings.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,80 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "settings-parser.h"
+#include "service-settings.h"
+#include "replicator-settings.h"
+
+/* <settings checks> */
+/* The unix socket listener of this service: "replicator" with mode 0600.
+   NOTE(review): the last two fields are presumably the owner user and
+   group - confirm against struct file_listener_settings. */
+static struct file_listener_settings replicator_unix_listeners_array[] = {
+	{ "replicator", 0600, "$default_internal_user", "" }
+};
+/* Array of pointers to the listener entries */
+static struct file_listener_settings *replicator_unix_listeners[] = {
+	&replicator_unix_listeners_array[0]
+};
+/* Statically initialized buffer wrapping the pointer array and its size, so
+   that replicator_service_settings.unix_listeners can reference it */
+static buffer_t replicator_unix_listeners_buf = {
+	replicator_unix_listeners, sizeof(replicator_unix_listeners), { 0, }
+};
+/* </settings checks> */
+
+/* Built-in service definition for "replicator". process_limit is 1 and the
+   only listener is the unix socket wrapped by replicator_unix_listeners_buf;
+   there are no fifo or inet listeners. */
+struct service_settings replicator_service_settings = {
+	.name = "replicator",
+	.protocol = "",
+	.type = "",
+	.executable = "replicator",
+	.user = "",
+	.group = "",
+	.privileged_group = "",
+	.extra_groups = "",
+	.chroot = "",
+
+	.drop_priv_before_exec = FALSE,
+
+	.process_min_avail = 0,
+	.process_limit = 1,
+	.client_limit = 0,
+	.service_count = 0,
+	/* NOTE(review): -1U / (uoff_t)-1 are presumably the "no limit"
+	   values for these two - confirm against service-settings.h */
+	.idle_kill = -1U,
+	.vsz_limit = (uoff_t)-1,
+
+	/* array whose element size is that of one listener pointer */
+	.unix_listeners = { { &replicator_unix_listeners_buf,
+			      sizeof(replicator_unix_listeners[0]) } },
+	.fifo_listeners = ARRAY_INIT,
+	.inet_listeners = ARRAY_INIT
+};
+
+/* DEF(type, name) expands to a setting_define entry: the given type, the
+   field name as the string key, and that field's offset inside
+   struct replicator_settings. The last member is always NULL here. */
+#undef DEF
+#define DEF(type, name) \
+	{ type, #name, offsetof(struct replicator_settings, name), NULL }
+
+/* Settings understood by the replicator process. Each entry's type must
+   match both the C type and the meaning of the corresponding field in
+   struct replicator_settings. */
+static const struct setting_define replicator_setting_defines[] = {
+	DEF(SET_STR, auth_socket_path),
+	DEF(SET_STR, doveadm_socket_path),
+
+	DEF(SET_TIME, replication_full_sync_interval),
+	/* a plain connection count, not a duration: it must not be parsed
+	   as SET_TIME, which would accept values such as "10 mins" */
+	DEF(SET_UINT, replication_max_conns),
+
+	SETTING_DEFINE_LIST_END
+};
+
+/* Default values; referenced via replicator_setting_parser_info.defaults */
+const struct replicator_settings replicator_default_settings = {
+	.auth_socket_path = "auth-userdb",
+	.doveadm_socket_path = "doveadm-server",
+
+	/* 60*60*12 = 43200, i.e. 12 hours if the unit is seconds
+	   (NOTE(review): presumably so, as this is a SET_TIME setting) */
+	.replication_full_sync_interval = 60*60*12,
+	.replication_max_conns = 10
+};
+
+/* Parser description of struct replicator_settings: ties together the module
+   name, the setting definitions, their defaults and the struct size.
+   main() in replicator.c passes this as its only settings root. */
+const struct setting_parser_info replicator_setting_parser_info = {
+	.module_name = "replicator",
+	.defines = replicator_setting_defines,
+	.defaults = &replicator_default_settings,
+
+	/* NOTE(review): (size_t)-1 is presumably the "not used" marker for
+	   type_offset and parent_offset - confirm in settings-parser.h */
+	.type_offset = (size_t)-1,
+	.struct_size = sizeof(struct replicator_settings),
+
+	.parent_offset = (size_t)-1
+};
+
+/* Global settings pointer declared extern in replicator-settings.h. It is
+   only defined here; nothing in this file assigns it. */
+const struct replicator_settings *replicator_settings;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/replicator-settings.h	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,15 @@
+#ifndef REPLICATOR_SETTINGS_H
+#define REPLICATOR_SETTINGS_H
+
+/* Settings of the replicator process */
+struct replicator_settings {
+	/* given to auth_master_init() in replicator.c when listing users;
+	   default "auth-userdb" */
+	const char *auth_socket_path;
+	/* default "doveadm-server" */
+	const char *doveadm_socket_path;
+
+	/* given to replicator_queue_init(); default 60*60*12 */
+	unsigned int replication_full_sync_interval;
+	/* default 10 */
+	unsigned int replication_max_conns;
+};
+
+extern const struct setting_parser_info replicator_setting_parser_info;
+extern const struct replicator_settings *replicator_settings;
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/replication/replicator/replicator.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,117 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "restrict-access.h"
+#include "auth-master.h"
+#include "master-service.h"
+#include "master-service-settings.h"
+#include "notify-connection.h"
+#include "replicator-brain.h"
+#include "replicator-queue.h"
+#include "replicator-settings.h"
+
+/* service name put into auth_user_info when listing users */
+#define REPLICATOR_AUTH_SERVICE_NAME "replicator"
+/* interval of the periodic queue export: 1000*60*15 ms = 15 minutes */
+#define REPLICATOR_DB_DUMP_INTERVAL_MSECS (1000*60*15)
+/* file the queue is imported from at startup and exported to afterwards */
+#define REPLICATOR_DB_PATH PKG_STATEDIR"/replicator.db"
+
+/* queue, brain and to_dump are created in main_init() and torn down in
+   main_deinit() */
+static struct replicator_queue *queue;
+static struct replicator_brain *brain;
+/* replicator settings, looked up from master_service in main_init() */
+static const struct replicator_settings *set;
+/* timeout that periodically exports the queue to REPLICATOR_DB_PATH */
+static struct timeout *to_dump;
+
+/* Connection callback given to master_service_run(): accept the connection
+   and create a notify connection for its fd, handing it the global queue.
+   The created connection object isn't kept here. */
+static void client_connected(struct master_service_connection *conn)
+{
+	master_service_client_connection_accept(conn);
+	(void)notify_connection_create(conn->fd, queue);
+}
+
+/* Fill the queue at startup: first every user the auth process can list,
+   then whatever was saved earlier into the replicator db. */
+static void replication_add_users(struct replicator_queue *queue)
+{
+	struct auth_master_connection *conn;
+	struct auth_master_user_list_ctx *list_ctx;
+	struct auth_user_info info;
+	const char *name;
+
+	memset(&info, 0, sizeof(info));
+	info.service = REPLICATOR_AUTH_SERVICE_NAME;
+
+	conn = auth_master_init(set->auth_socket_path,
+				AUTH_MASTER_FLAG_NO_IDLE_TIMEOUT);
+
+	/* everybody goes into the queue with last_update=0, so that a full
+	   sync gets started for each user whose state can't be found */
+	list_ctx = auth_master_user_list_init(conn, NULL, &info);
+	for (;;) {
+		struct replicator_user *user;
+
+		name = auth_master_user_list_next(list_ctx);
+		if (name == NULL)
+			break;
+		user = replicator_queue_add(queue, name,
+					    REPLICATION_PRIORITY_NONE);
+		user->last_update = 0;
+	}
+	if (auth_master_user_list_deinit(&list_ctx) < 0)
+		i_error("listing users failed, can't replicate existing data");
+	auth_master_deinit(&conn);
+
+	/* the db may not exist yet, so importing it is only best effort */
+	(void)replicator_queue_import(queue, REPLICATOR_DB_PATH);
+}
+
+/* Timeout callback (see to_dump): write the current queue to
+   REPLICATOR_DB_PATH. A failed export is ignored here. */
+static void replicator_dump_timeout(void *context ATTR_UNUSED)
+{
+	(void)replicator_queue_export(queue, REPLICATOR_DB_PATH);
+}
+
+/* Set up the process state: settings, the queue (filled from auth and the
+   replicator db), the periodic db dump and finally the brain. */
+static void main_init(void)
+{
+	/* main() registers a single settings root, which is looked up here
+	   as the first entry */
+	set = master_service_settings_get_others(master_service)[0];
+
+	queue = replicator_queue_init(set->replication_full_sync_interval);
+	replication_add_users(queue);
+
+	to_dump = timeout_add(REPLICATOR_DB_DUMP_INTERVAL_MSECS,
+			      replicator_dump_timeout, NULL);
+	brain = replicator_brain_init(queue, set);
+}
+
+/* Tear everything down in order: notify connections, the brain and the dump
+   timeout first, then export the queue one last time before freeing it. */
+static void main_deinit(void)
+{
+	notify_connections_destroy_all();
+	replicator_brain_deinit(&brain);
+	timeout_remove(&to_dump);
+	/* final dump; a failure is ignored */
+	(void)replicator_queue_export(queue, REPLICATOR_DB_PATH);
+	replicator_queue_deinit(&queue);
+}
+
+/* Entry point of the replicator process: initialize the master service,
+   read settings, restrict access, then run the service loop between
+   main_init() and main_deinit(). */
+int main(int argc, char *argv[])
+{
+	const struct setting_parser_info *set_roots[] = {
+		&replicator_setting_parser_info,
+		NULL
+	};
+	const char *error;
+
+	master_service = master_service_init("replicator",
+					     MASTER_SERVICE_FLAG_NO_IDLE_DIE,
+					     &argc, &argv, NULL);
+	if (master_getopt(master_service) > 0)
+		return FATAL_DEFAULT;
+
+	/* settings have to be readable before anything else is set up */
+	if (master_service_settings_read_simple(master_service, set_roots,
+						&error) < 0)
+		i_fatal("Error reading configuration: %s", error);
+	master_service_init_log(master_service, "replicator: ");
+
+	restrict_access_by_env(NULL, FALSE);
+	restrict_access_allow_coredumps(TRUE);
+	master_service_init_finish(master_service);
+
+	main_init();
+	master_service_run(master_service, client_connected);
+	main_deinit();
+
+	master_service_deinit(&master_service);
+	return 0;
+}