Mercurial > dovecot > core-2.2
changeset 14261:14ff849dc266
Initial implementation of dsync-based replication.
line wrap: on
line diff
--- a/.hgignore Sun Mar 04 09:39:45 2012 +0200 +++ b/.hgignore Sun Mar 04 09:50:21 2012 +0200 @@ -85,6 +85,8 @@ src/plugins/fts-squat/squat-test src/pop3-login/pop3-login src/pop3/pop3 +src/replication/replicator/replicator +src/replication/aggregator/aggregator src/util/gdbhelper src/util/listview src/util/maildirlock
--- a/configure.in Sun Mar 04 09:39:45 2012 +0200 +++ b/configure.in Sun Mar 04 09:50:21 2012 +0200 @@ -2785,6 +2785,9 @@ src/master/Makefile src/pop3/Makefile src/pop3-login/Makefile +src/replication/Makefile +src/replication/aggregator/Makefile +src/replication/replicator/Makefile src/ssl-params/Makefile src/stats/Makefile src/util/Makefile @@ -2803,6 +2806,7 @@ src/plugins/notify/Makefile src/plugins/quota/Makefile src/plugins/imap-quota/Makefile +src/plugins/replication/Makefile src/plugins/snarf/Makefile src/plugins/stats/Makefile src/plugins/imap-stats/Makefile
--- a/src/Makefile.am Sun Mar 04 09:39:45 2012 +0200 +++ b/src/Makefile.am Sun Mar 04 09:50:21 2012 +0200 @@ -39,6 +39,7 @@ log \ config \ director \ + replication \ util \ doveadm \ ssl-params \
--- a/src/doveadm/dsync/doveadm-dsync.c Sun Mar 04 09:39:45 2012 +0200 +++ b/src/doveadm/dsync/doveadm-dsync.c Sun Mar 04 09:50:21 2012 +0200 @@ -368,6 +368,7 @@ int lock_fd, ret = 0; user->admin = TRUE; + user->dsyncing = TRUE; /* create workers */ worker1 = dsync_worker_init_local(user, ctx->namespace_prefix,
--- a/src/lib-storage/mail-user.h Sun Mar 04 09:39:45 2012 +0200 +++ b/src/lib-storage/mail-user.h Sun Mar 04 09:50:21 2012 +0200 @@ -56,6 +56,8 @@ unsigned int inbox_open_error_logged:1; /* Fuzzy search works for this user (FTS enabled) */ unsigned int fuzzy_search:1; + /* We're running dsync */ + unsigned int dsyncing:1; }; struct mail_user_module_register {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/replication/Makefile.am Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,25 @@ +AM_CPPFLAGS = \ + -I$(top_srcdir)/src/lib \ + -I$(top_srcdir)/src/lib-mail \ + -I$(top_srcdir)/src/lib-imap \ + -I$(top_srcdir)/src/lib-index \ + -I$(top_srcdir)/src/lib-storage \ + -I$(top_srcdir)/src/replication \ + -I$(top_srcdir)/src/plugins/notify + +NOPLUGIN_LDFLAGS = +lib20_replication_plugin_la_LDFLAGS = -module -avoid-version + +module_LTLIBRARIES = \ + lib20_replication_plugin.la + +if DOVECOT_PLUGIN_DEPS +lib20_replication_plugin_la_LIBADD = \ + ../notify/lib15_notify_plugin.la +endif + +lib20_replication_plugin_la_SOURCES = \ + replication-plugin.c + +noinst_HEADERS = \ + replication-plugin.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/replication/replication-plugin.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,353 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "str.h" +#include "strescape.h" +#include "fd-set-nonblock.h" +#include "ioloop.h" +#include "network.h" +#include "write-full.h" +#include "mail-user.h" +#include "mail-storage-private.h" +#include "notify-plugin.h" +#include "replication-common.h" +#include "replication-plugin.h" + +#include <stdlib.h> + +#define REPLICATION_SOCKET_NAME "replication-notify" +#define REPLICATION_FIFO_NAME "replication-notify-fifo" +#define REPLICATION_NOTIFY_DELAY_MSECS 500 +#define REPLICATION_SYNC_TIMEOUT_SECS 10 + +#define REPLICATION_USER_CONTEXT(obj) \ + MODULE_CONTEXT(obj, replication_user_module) + +struct replication_user { + union mail_user_module_context module_ctx; + + const char *fifo_path; + const char *socket_path; + + int fifo_fd; + + struct timeout *to; + enum replication_priority priority; + unsigned int sync_secs; + + bool fifo_failed; +}; + +struct replication_mail_txn_context { + struct mail_user *user; + bool new_messages; +}; + +static MODULE_CONTEXT_DEFINE_INIT(replication_user_module, + &mail_user_module_register); + +static int +replication_fifo_notify(struct mail_user *user, + enum replication_priority priority) +{ + struct replication_user *ruser = REPLICATION_USER_CONTEXT(user); + string_t *str; + ssize_t ret; + + if (ruser->fifo_failed) + return -1; + if (ruser->fifo_fd == -1) { + ruser->fifo_fd = open(ruser->fifo_path, O_WRONLY); + if (ruser->fifo_fd == -1) { + i_error("open(%s) failed: %m", ruser->fifo_path); + ruser->fifo_failed = TRUE; + return -1; + } + fd_set_nonblock(ruser->fifo_fd, TRUE); + } + /* <username> \t <priority> */ + str = t_str_new(256); + str_tabescape_write(str, user->username); + str_append_c(str, '\t'); + switch (priority) { + case REPLICATION_PRIORITY_NONE: + case REPLICATION_PRIORITY_SYNC: + i_unreached(); + case REPLICATION_PRIORITY_LOW: + str_append(str, "low"); + break; + case REPLICATION_PRIORITY_HIGH: + str_append(str, "high"); + break; + } + str_append_c(str, '\n'); + ret = write(ruser->fifo_fd, str_data(str), str_len(str)); + if (ret == 0) { + /* busy, try again later */ + return 0; + } + if (ret != (ssize_t)str_len(str)) { + if (ret < 0) + i_error("write(%s) failed: %m", ruser->fifo_path); + else { + i_error("write(%s) wrote partial data", + ruser->fifo_path); + } + if (close(ruser->fifo_fd) < 0) + i_error("close(%s) failed: %m", ruser->fifo_path); + ruser->fifo_fd = -1; + return -1; + } + return 1; +} + +static void replication_notify_now(struct mail_user *user) +{ + struct replication_user *ruser = REPLICATION_USER_CONTEXT(user); + int ret; + + i_assert(ruser->priority != REPLICATION_PRIORITY_NONE); + i_assert(ruser->priority != REPLICATION_PRIORITY_SYNC); + + if ((ret = replication_fifo_notify(user, ruser->priority)) < 0 && + !ruser->fifo_failed) { + /* retry once, in case replication server was restarted */ + ret = replication_fifo_notify(user, ruser->priority); + } + if (ret != 0) { + timeout_remove(&ruser->to); + ruser->priority = REPLICATION_PRIORITY_NONE; + } +} + +static int replication_notify_sync(struct mail_user *user) +{ + struct replication_user *ruser = REPLICATION_USER_CONTEXT(user); + string_t *str; + char buf[1024]; + int fd; + ssize_t ret; + + fd = net_connect_unix(ruser->socket_path); + if (fd == -1) { + i_error("net_connect_unix(%s) failed: %m", ruser->socket_path); + return -1; + } + net_set_nonblock(fd, FALSE); + + /* <username> \t "sync" */ + str = t_str_new(256); + str_tabescape_write(str, user->username); + str_append(str, "\tsync\n"); + alarm(ruser->sync_secs); + if (write_full(fd, str_data(str), str_len(str)) < 0) { + i_error("write(%s) failed: %m", ruser->socket_path); + ret = -1; + } else { + /* + | - */ + ret = read(fd, buf, sizeof(buf)); + if (ret < 0) { + if (ret != EINTR) { + i_error("read(%s) failed: %m", + ruser->socket_path); + } else { + i_warning("replication(%s): Sync failure: " + "Timeout in %u secs", + user->username, ruser->sync_secs); + } + } else if (ret == 0) { + i_error("read(%s) failed: EOF", ruser->socket_path); + ret = -1; + } else if (buf[0] == '+') { + /* success */ + ret = 0; + } else if (buf[0] == '-') { + /* failure */ + if (buf[ret-1] == '\n') ret--; + i_warning("replication(%s): Sync failure: %s", + user->username, t_strndup(buf+1, ret-1)); + ret = -1; + } else { + i_warning("replication(%s): " + "Remote sent invalid input: %s", + user->username, t_strndup(buf, ret)); + } + } + alarm(0); + if (close(fd) < 0) + i_error("close(%s) failed: %m", ruser->socket_path); + return ret; +} + +static void replication_notify(struct mail_user *user, + enum replication_priority priority) +{ + struct replication_user *ruser = REPLICATION_USER_CONTEXT(user); + + if (user->dsyncing) { + /* we're running dsync, which means that the remote is telling + us about a change. don't trigger a replication back to it */ + return; + } + + if (priority == REPLICATION_PRIORITY_SYNC) { + if (replication_notify_sync(user) == 0) { + timeout_remove(&ruser->to); + ruser->priority = REPLICATION_PRIORITY_NONE; + return; + } + /* sync replication failed, try as "high" via fifo */ + priority = REPLICATION_PRIORITY_HIGH; + } + + if (ruser->priority < priority) + ruser->priority = priority; + if (ruser->to == NULL) { + ruser->to = timeout_add(REPLICATION_NOTIFY_DELAY_MSECS, + replication_notify_now, user); + } +} + +static void * +replication_mail_transaction_begin(struct mailbox_transaction_context *t) +{ + struct replication_mail_txn_context *ctx; + + ctx = i_new(struct replication_mail_txn_context, 1); + ctx->user = t->box->storage->user; + return ctx; +} + +static void replication_mail_save(void *txn, struct mail *mail ATTR_UNUSED) +{ + struct replication_mail_txn_context *ctx = + (struct replication_mail_txn_context *)txn; + + ctx->new_messages = TRUE; +} + +static void replication_mail_copy(void *txn, struct mail *src ATTR_UNUSED, + struct mail *dst ATTR_UNUSED) +{ + struct replication_mail_txn_context *ctx = + (struct replication_mail_txn_context *)txn; + + ctx->new_messages = TRUE; +} + +static void +replication_mail_transaction_commit(void *txn, + struct mail_transaction_commit_changes *changes) +{ + struct replication_mail_txn_context *ctx = + (struct replication_mail_txn_context *)txn; + struct replication_user *ruser = REPLICATION_USER_CONTEXT(ctx->user); + enum replication_priority priority; + + if (ctx->new_messages || changes->changed) { + priority = !ctx->new_messages ? REPLICATION_PRIORITY_LOW : + ruser->sync_secs == 0 ? REPLICATION_PRIORITY_HIGH : + REPLICATION_PRIORITY_SYNC; + replication_notify(ctx->user, priority); + } + i_free(ctx); +} + +static void replication_mailbox_create(struct mailbox *box) +{ + replication_notify(box->storage->user, REPLICATION_PRIORITY_LOW); +} + +static void +replication_mailbox_delete_commit(void *txn ATTR_UNUSED, + struct mailbox *box) +{ + replication_notify(box->storage->user, REPLICATION_PRIORITY_LOW); +} + +static void +replication_mailbox_rename(struct mailbox *src ATTR_UNUSED, + struct mailbox *dest, + bool rename_children ATTR_UNUSED) +{ + replication_notify(dest->storage->user, REPLICATION_PRIORITY_LOW); +} + +static void replication_mailbox_set_subscribed(struct mailbox *box, + bool subscribed ATTR_UNUSED) +{ + replication_notify(box->storage->user, REPLICATION_PRIORITY_LOW); +} + +static void replication_user_deinit(struct mail_user *user) +{ + struct replication_user *ruser = REPLICATION_USER_CONTEXT(user); + + if (ruser->to != NULL) { + replication_notify_now(user); + if (ruser->to != NULL) { + i_warning("%s: Couldn't send final notification " + "due to fifo being busy", ruser->fifo_path); + timeout_remove(&ruser->to); + } + } + + ruser->module_ctx.super.deinit(user); +} + +static void replication_user_created(struct mail_user *user) +{ + struct mail_user_vfuncs *v = user->vlast; + struct replication_user *ruser; + const char *value; + + ruser = p_new(user->pool, struct replication_user, 1); + ruser->module_ctx.super = *v; + user->vlast = &ruser->module_ctx.super; + v->deinit = replication_user_deinit; + MODULE_CONTEXT_SET(user, replication_user_module, ruser); + + ruser->fifo_fd = -1; + ruser->fifo_path = p_strconcat(user->pool, user->set->base_dir, + "/"REPLICATION_FIFO_NAME, NULL); + ruser->socket_path = p_strconcat(user->pool, user->set->base_dir, + "/"REPLICATION_SOCKET_NAME, NULL); + value = mail_user_plugin_getenv(user, "replication_sync_timeout"); + if (value != NULL && str_to_uint(value, &ruser->sync_secs) < 0) { + i_error("replication(%s): " + "Invalid replication_sync_timeout value: %s", + user->username, value); + } +} + +static const struct notify_vfuncs replication_vfuncs = { + .mail_transaction_begin = replication_mail_transaction_begin, + .mail_save = replication_mail_save, + .mail_copy = replication_mail_copy, + .mail_transaction_commit = replication_mail_transaction_commit, + .mailbox_create = replication_mailbox_create, + .mailbox_delete_commit = replication_mailbox_delete_commit, + .mailbox_rename = replication_mailbox_rename, + .mailbox_set_subscribed = replication_mailbox_set_subscribed +}; + +static struct notify_context *replication_ctx; + +static struct mail_storage_hooks replication_mail_storage_hooks = { + .mail_user_created = replication_user_created +}; + +void replication_plugin_init(struct module *module) +{ + replication_ctx = notify_register(&replication_vfuncs); + mail_storage_hooks_add(module, &replication_mail_storage_hooks); +} + +void replication_plugin_deinit(void) +{ + mail_storage_hooks_remove(&replication_mail_storage_hooks); + notify_unregister(replication_ctx); +} + +const char *replication_plugin_dependencies[] = { "notify", NULL };
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/plugins/replication/replication-plugin.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,9 @@ +#ifndef REPLICATION_PLUGIN_H +#define REPLICATION_PLUGIN_H + +extern const char *replication_plugin_dependencies[]; + +void replication_plugin_init(struct module *module); +void replication_plugin_deinit(void); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/Makefile.am Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,4 @@ +SUBDIRS = aggregator replicator + +noinst_HEADERS = \ + replication-common.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/aggregator/Makefile.am Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,26 @@ +pkglibexecdir = $(libexecdir)/dovecot + +pkglibexec_PROGRAMS = aggregator + +AM_CPPFLAGS = \ + -I$(top_srcdir)/src/lib \ + -I$(top_srcdir)/src/lib-settings \ + -I$(top_srcdir)/src/lib-auth \ + -I$(top_srcdir)/src/lib-master \ + -I$(top_srcdir)/src/replication \ + -DPKG_STATEDIR=\""$(statedir)"\" + +aggregator_LDFLAGS = -export-dynamic +aggregator_LDADD = $(LIBDOVECOT) $(MODULE_LIBS) +aggregator_DEPENDENCIES = $(LIBDOVECOT_DEPS) + +aggregator_SOURCES = \ + aggregator.c \ + aggregator-settings.c \ + notify-connection.c \ + replicator-connection.c + +noinst_HEADERS = \ + aggregator-settings.h \ + notify-connection.h \ + replicator-connection.c
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/aggregator/aggregator-settings.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,85 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "buffer.h" +#include "settings-parser.h" +#include "service-settings.h" +#include "aggregator-settings.h" + +/* <settings checks> */ +static struct file_listener_settings aggregator_unix_listeners_array[] = { + { "replication-notify", 0600, "", "" } +}; +static struct file_listener_settings *aggregator_unix_listeners[] = { + &aggregator_unix_listeners_array[0] +}; +static buffer_t aggregator_unix_listeners_buf = { + aggregator_unix_listeners, sizeof(aggregator_unix_listeners), { 0, } +}; + +static struct file_listener_settings aggregator_fifo_listeners_array[] = { + { "replication-notify-fifo", 0600, "", "" } +}; +static struct file_listener_settings *aggregator_fifo_listeners[] = { + &aggregator_fifo_listeners_array[0] +}; +static buffer_t aggregator_fifo_listeners_buf = { + aggregator_fifo_listeners, sizeof(aggregator_fifo_listeners), { 0, } +}; +/* </settings checks> */ + +struct service_settings aggregator_service_settings = { + .name = "aggregator", + .protocol = "", + .type = "", + .executable = "aggregator", + .user = "$default_internal_user", + .group = "", + .privileged_group = "", + .extra_groups = "", + .chroot = ".", + + .drop_priv_before_exec = FALSE, + + .process_min_avail = 0, + .process_limit = 0, + .client_limit = 0, + .service_count = 0, + .idle_kill = 0, + .vsz_limit = (uoff_t)-1, + + .unix_listeners = { { &aggregator_unix_listeners_buf, + sizeof(aggregator_unix_listeners[0]) } }, + .fifo_listeners = { { &aggregator_fifo_listeners_buf, + sizeof(aggregator_fifo_listeners[0]) } }, + .inet_listeners = ARRAY_INIT +}; + +#undef DEF +#define DEF(type, name) \ + { type, #name, offsetof(struct aggregator_settings, name), NULL } + +static const struct setting_define aggregator_setting_defines[] = { + DEF(SET_STR, replicator_host), + DEF(SET_UINT, replicator_port), + + SETTING_DEFINE_LIST_END +}; + +const struct aggregator_settings aggregator_default_settings = { + .replicator_host = "replicator", + .replicator_port = 0 +}; + +const struct setting_parser_info aggregator_setting_parser_info = { + .module_name = "aggregator", + .defines = aggregator_setting_defines, + .defaults = &aggregator_default_settings, + + .type_offset = (size_t)-1, + .struct_size = sizeof(struct aggregator_settings), + + .parent_offset = (size_t)-1 +}; + +const struct aggregator_settings *aggregator_settings;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/aggregator/aggregator-settings.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,12 @@ +#ifndef AGGREGATOR_SETTINGS_H +#define AGGREGATOR_SETTINGS_H + +struct aggregator_settings { + const char *replicator_host; + unsigned int replicator_port; +}; + +extern const struct setting_parser_info aggregator_setting_parser_info; +extern const struct aggregator_settings *aggregator_settings; + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/aggregator/aggregator.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,75 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "restrict-access.h" +#include "master-service.h" +#include "master-service-settings.h" +#include "aggregator-settings.h" +#include "notify-connection.h" +#include "replicator-connection.h" + +struct replicator_connection *replicator; + +static void client_connected(struct master_service_connection *conn) +{ + master_service_client_connection_accept(conn); + notify_connection_create(conn->fd, conn->fifo); +} + +static void main_preinit(void) +{ + struct ip_addr *ips; + unsigned int ips_count; + const struct aggregator_settings *set; + void **sets; + int ret; + + sets = master_service_settings_get_others(master_service); + set = sets[0]; + + if (set->replicator_port != 0) { + ret = net_gethostbyname(set->replicator_host, &ips, &ips_count); + if (ret != 0) { + i_fatal("replicator_host: gethostbyname(%s) failed: %s", + set->replicator_host, net_gethosterror(ret)); + } + replicator = replicator_connection_create_inet(ips, ips_count, + set->replicator_port, + notify_connection_sync_callback); + } else { + replicator = replicator_connection_create_unix(set->replicator_host, + notify_connection_sync_callback); + } +} + +int main(int argc, char *argv[]) +{ + const struct setting_parser_info *set_roots[] = { + &aggregator_setting_parser_info, + NULL + }; + const char *error; + + master_service = master_service_init("aggregator", 0, + &argc, &argv, NULL); + if (master_getopt(master_service) > 0) + return FATAL_DEFAULT; + + if (master_service_settings_read_simple(master_service, set_roots, + &error) < 0) + i_fatal("Error reading configuration: %s", error); + master_service_init_log(master_service, "aggregator: "); + + main_preinit(); + + restrict_access_by_env(NULL, FALSE); + restrict_access_allow_coredumps(TRUE); + master_service_init_finish(master_service); + + master_service_run(master_service, client_connected); + + notify_connections_destroy_all(); + replicator_connection_destroy(&replicator); + master_service_deinit(&master_service); + return 0; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/aggregator/notify-connection.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,154 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "network.h" +#include "istream.h" +#include "ostream.h" +#include "llist.h" +#include "strescape.h" +#include "master-service.h" +#include "replication-common.h" +#include "replicator-connection.h" +#include "notify-connection.h" + +#define MAX_INBUF_SIZE 8192 + +#define CONNECTION_IS_FIFO(conn) \ + ((conn)->output == NULL) + +struct notify_connection { + struct notify_connection *prev, *next; + int refcount; + + int fd; + struct io *io; + struct istream *input; + struct ostream *output; +}; + +static struct notify_connection *conns = NULL; + +static void notify_connection_unref(struct notify_connection *conn); +static void notify_connection_destroy(struct notify_connection *conn); + +static bool notify_input_error(struct notify_connection *conn) +{ + if (CONNECTION_IS_FIFO(conn)) + return TRUE; + notify_connection_destroy(conn); + return FALSE; +} + +void notify_connection_sync_callback(bool success, void *context) +{ + struct notify_connection *conn = context; + + o_stream_send_str(conn->output, success ? "+\n" : "-\n"); + notify_connection_unref(conn); +} + +static int +notify_input_line(struct notify_connection *conn, const char *line) +{ + const char *const *args; + enum replication_priority priority; + + /* <username> \t <priority> */ + args = t_strsplit_tabescaped(line); + if (str_array_length(args) < 2) { + i_error("Client sent invalid input"); + return -1; + } + if (replication_priority_parse(args[1], &priority) < 0) { + i_error("Client sent invalid priority: %s", args[1]); + return -1; + } + if (priority != REPLICATION_PRIORITY_SYNC) + replicator_connection_notify(replicator, args[0], priority); + else { + conn->refcount++; + replicator_connection_notify_sync(replicator, args[0], conn); + } + return 0; +} + +static void notify_input(struct notify_connection *conn) +{ + const char *line; + int ret; + + switch (i_stream_read(conn->input)) { + case -2: + /* buffer full */ + i_error("Client sent too long line"); + notify_input_error(conn); + return; + case -1: + /* disconnected */ + notify_connection_destroy(conn); + return; + } + + while ((line = i_stream_next_line(conn->input)) != NULL) { + T_BEGIN { + ret = notify_input_line(conn, line); + } T_END; + if (ret < 0) { + if (!notify_input_error(conn)) + return; + } + } +} + +void notify_connection_create(int fd, bool fifo) +{ + struct notify_connection *conn; + + conn = i_new(struct notify_connection, 1); + conn->refcount = 1; + conn->fd = fd; + conn->io = io_add(fd, IO_READ, notify_input, conn); + conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE); + if (!fifo) + conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE); + + DLLIST_PREPEND(&conns, conn); +} + +static void notify_connection_unref(struct notify_connection *conn) +{ + i_assert(conn->refcount > 0); + if (--conn->refcount > 0) + return; + + i_stream_destroy(&conn->input); + if (conn->output != NULL) + o_stream_destroy(&conn->output); + i_free(conn); +} + +static void notify_connection_destroy(struct notify_connection *conn) +{ + i_assert(conn->fd != -1); + + if (!CONNECTION_IS_FIFO(conn)) + master_service_client_connection_destroyed(master_service); + + DLLIST_REMOVE(&conns, conn); + + io_remove(&conn->io); + i_stream_close(conn->input); + if (conn->output != NULL) + o_stream_close(conn->output); + net_disconnect(conn->fd); + conn->fd = -1; + + notify_connection_unref(conn); +} + +void notify_connections_destroy_all(void) +{ + while (conns != NULL) + notify_connection_destroy(conns); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/aggregator/notify-connection.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,9 @@ +#ifndef NOTIFY_CONNECTION_H +#define NOTIFY_CONNECTION_H + +void notify_connection_create(int fd, bool fifo); +void notify_connections_destroy_all(void); + +void notify_connection_sync_callback(bool success, void *context); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/aggregator/replicator-connection.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,321 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "network.h" +#include "istream.h" +#include "ostream.h" +#include "buffer.h" +#include "hash.h" +#include "llist.h" +#include "strescape.h" +#include "replicator-connection.h" + +#define MAX_INBUF_SIZE 1024 +#define REPLICATOR_RECONNECT_MSECS 5000 +#define REPLICATOR_MEMBUF_MAX_SIZE 1024*1024 +#define REPLICATOR_HANDSHAKE "VERSION\treplicator-notify\t1\t0\n" + +struct replicator_connection { + char *path; + struct ip_addr *ips; + unsigned int ips_count, ip_idx, port; + + int fd; + struct io *io; + struct istream *input; + struct ostream *output; + struct timeout *to; + + buffer_t *queue[REPLICATION_PRIORITY_SYNC + 1]; + + struct hash_table *requests; + unsigned int request_id_counter; + replicator_sync_callback_t *callback; +}; + +static void replicator_connection_disconnect(struct replicator_connection *conn); + +static int +replicator_input_line(struct replicator_connection *conn, const char *line) +{ + void *context; + unsigned int id; + + /* <+|-> \t <id> */ + if ((line[0] != '+' && line[0] != '-') || line[1] != '\t' || + str_to_uint(line+2, &id) < 0 || id == 0) { + i_error("Replicator sent invalid input: %s", line); + return -1; + } + + context = hash_table_lookup(conn->requests, POINTER_CAST(id)); + if (context == NULL) { + i_error("Replicator sent invalid ID: %u", id); + return -1; + } + hash_table_remove(conn->requests, context); + conn->callback(line[0] == '+', context); + return 0; +} + +static void replicator_input(struct replicator_connection *conn) +{ + const char *line; + + switch (i_stream_read(conn->input)) { + case -2: + /* buffer full */ + i_error("Replicator sent too long line"); + replicator_connection_disconnect(conn); + return; + case -1: + /* disconnected */ + replicator_connection_disconnect(conn); + return; + } + + while ((line = i_stream_next_line(conn->input)) != NULL) + (void)replicator_input_line(conn, line); +} + +static bool +replicator_send_buf(struct replicator_connection *conn, buffer_t *buf) +{ + const unsigned char *data = buf->data; + unsigned int len = IO_BLOCK_SIZE; + + /* try to send about IO_BLOCK_SIZE amount of data, + but only full lines */ + if (len > buf->used) + len = buf->used; + for (;; len++) { + i_assert(len < buf->used); /* there is always LF */ + if (data[len] == '\n') { + len++; + break; + } + } + + if (o_stream_send(conn->output, data, len) < 0) { + replicator_connection_disconnect(conn); + return FALSE; + } + buffer_delete(buf, 0, len); + return TRUE; +} + +static int replicator_output(struct replicator_connection *conn) +{ + enum replication_priority p; + + if (o_stream_flush(conn->output) < 0) { + replicator_connection_disconnect(conn); + return 1; + } + + for (p = REPLICATION_PRIORITY_SYNC;;) { + if (o_stream_get_buffer_used_size(conn->output) > 0) { + o_stream_set_flush_pending(conn->output, TRUE); + break; + } + /* output buffer is empty, send more data */ + if (conn->queue[p]->used > 0) { + if (!replicator_send_buf(conn, conn->queue[p])) + break; + } else { + if (p == REPLICATION_PRIORITY_LOW) + break; + p--; + } + } + return 1; +} + +static void replicator_connection_connect(struct replicator_connection *conn) +{ + unsigned int n; + int fd = -1; + + if (conn->port == 0) { + fd = net_connect_unix(conn->path); + if (fd == -1) + i_error("net_connect_unix(%s) failed: %m", conn->path); + } else { + for (n = 0; n < conn->ips_count; n++) { + unsigned int idx = conn->ip_idx; + + conn->ip_idx = (conn->ip_idx + 1) % conn->ips_count; + fd = net_connect_ip(&conn->ips[idx], conn->port, NULL); + if (fd != -1) + break; + i_error("connect(%s, %u) failed: %m", + net_ip2addr(&conn->ips[idx]), conn->port); + } + } + + if (fd == -1) { + if (conn->to == NULL) { + conn->to = timeout_add(REPLICATOR_RECONNECT_MSECS, + replicator_connection_connect, + conn); + } + return; + } + + if (conn->to != NULL) + timeout_remove(&conn->to); + conn->fd = fd; + conn->io = io_add(fd, IO_READ, replicator_input, conn); + conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE); + conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE); + (void)o_stream_send_str(conn->output, REPLICATOR_HANDSHAKE); + o_stream_set_flush_callback(conn->output, replicator_output, conn); +} + +static void replicator_abort_all_requests(struct replicator_connection *conn) +{ + struct hash_iterate_context *iter; + void *key, *value; + + iter = hash_table_iterate_init(conn->requests); + while (hash_table_iterate(iter, &key, &value)) + conn->callback(FALSE, value); + hash_table_iterate_deinit(&iter); + hash_table_clear(conn->requests, TRUE); +} + +static void replicator_connection_disconnect(struct replicator_connection *conn) +{ + if (conn->fd == -1) + return; + + replicator_abort_all_requests(conn); + io_remove(&conn->io); + i_stream_destroy(&conn->input); + o_stream_destroy(&conn->output); + net_disconnect(conn->fd); +} + +static struct replicator_connection *replicator_connection_create(void) +{ + struct replicator_connection *conn; + unsigned int i; + + conn = i_new(struct replicator_connection, 1); + conn->fd = -1; + conn->requests = hash_table_create(default_pool, default_pool, + 0, NULL, NULL); + for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++) + conn->queue[i] = buffer_create_dynamic(default_pool, 1024); + return conn; +} + +struct replicator_connection * +replicator_connection_create_unix(const char *path, + replicator_sync_callback_t *callback) +{ + struct replicator_connection *conn; + + conn = replicator_connection_create(); + conn->callback = callback; + conn->path = i_strdup(path); + return conn; +} + +struct replicator_connection * +replicator_connection_create_inet(const struct ip_addr *ips, + unsigned int ips_count, unsigned int port, + replicator_sync_callback_t *callback) +{ + struct replicator_connection *conn; + + conn = replicator_connection_create(); + conn->callback = callback; + conn->ips = i_new(struct ip_addr, ips_count); + memcpy(conn->ips, ips, sizeof(*ips) * ips_count); + conn->ips_count = ips_count; + conn->port = port; + return conn; +} + +void replicator_connection_destroy(struct replicator_connection **_conn) +{ + struct replicator_connection *conn = *_conn; + unsigned int i; + + *_conn = NULL; + replicator_connection_disconnect(conn); + + for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++) + buffer_free(&conn->queue[i]); + + if (conn->to != NULL) + timeout_remove(&conn->to); + hash_table_destroy(&conn->requests); + i_free(conn); +} + +static void +replicator_send(struct replicator_connection *conn, + enum replication_priority priority, const char *data) +{ + unsigned int data_len = strlen(data); + + if (conn->fd != -1 && + o_stream_get_buffer_used_size(conn->output) == 0) { + /* we can send data immediately */ + o_stream_send(conn->output, data, data_len); + } else if (conn->queue[priority]->used + data_len >= + REPLICATOR_MEMBUF_MAX_SIZE) { + /* FIXME: compress duplicates, start writing to file */ + } else { + /* queue internally to separate queues */ + buffer_append(conn->queue[priority], data, data_len); + if (conn->output != NULL) + o_stream_set_flush_pending(conn->output, TRUE); + } +} + +void replicator_connection_notify(struct replicator_connection *conn, + const char *username, + enum replication_priority priority) +{ + const char *priority_str = ""; + + replicator_connection_connect(conn); + + switch (priority) { + case REPLICATION_PRIORITY_NONE: + case REPLICATION_PRIORITY_SYNC: + i_unreached(); + case REPLICATION_PRIORITY_LOW: + priority_str = "low"; + break; + case REPLICATION_PRIORITY_HIGH: + priority_str = "high"; + break; + } + + T_BEGIN { + replicator_send(conn, priority, t_strdup_printf( + "U\t%s\t%s\n", str_tabescape(username), priority_str)); + } T_END; +} + +void replicator_connection_notify_sync(struct replicator_connection *conn, + const char *username, void *context) +{ + unsigned int id; + + replicator_connection_connect(conn); + + id = ++conn->request_id_counter; + if (id == 0) id++; + hash_table_insert(conn->requests, POINTER_CAST(id), context); + + T_BEGIN { + replicator_send(conn, REPLICATION_PRIORITY_SYNC, t_strdup_printf( + "U\t%s\tsync\t%u\n", str_tabescape(username), id)); + } T_END; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/aggregator/replicator-connection.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,25 @@ +#ifndef REPLICATOR_CONNECTION_H +#define REPLICATOR_CONNECTION_H + +#include "replication-common.h" + +typedef void replicator_sync_callback_t(bool success, void *context); + +struct replicator_connection * +replicator_connection_create_unix(const char *path, + replicator_sync_callback_t *callback); +struct replicator_connection * +replicator_connection_create_inet(const struct ip_addr *ips, + unsigned int ips_count, unsigned int port, + replicator_sync_callback_t *callback); +void replicator_connection_destroy(struct replicator_connection **conn); + +void replicator_connection_notify(struct replicator_connection *conn, + const char *username, + enum replication_priority priority); +void replicator_connection_notify_sync(struct replicator_connection *conn, + const char *username, void *context); + +extern struct replicator_connection *replicator; + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replication-common.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,30 @@ +#ifndef REPLICATION_COMMON_H +#define REPLICATION_COMMON_H + +enum replication_priority { + /* user is fully replicated, as far as we know */ + REPLICATION_PRIORITY_NONE = 0, + /* flag changes, expunges, etc. */ + REPLICATION_PRIORITY_LOW, + /* new emails */ + REPLICATION_PRIORITY_HIGH, + /* synchronously wait for new emails to be replicated */ + REPLICATION_PRIORITY_SYNC +}; + +static inline int +replication_priority_parse(const char *str, + enum replication_priority *priority_r) +{ + if (strcmp(str, "low") == 0) + *priority_r = REPLICATION_PRIORITY_LOW; + else if (strcmp(str, "high") == 0) + *priority_r = REPLICATION_PRIORITY_HIGH; + else if (strcmp(str, "sync") == 0) + *priority_r = REPLICATION_PRIORITY_SYNC; + else + return -1; + return 0; +} + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/Makefile.am Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,30 @@ +pkglibexecdir = $(libexecdir)/dovecot + +pkglibexec_PROGRAMS = replicator + +AM_CPPFLAGS = \ + -I$(top_srcdir)/src/lib \ + -I$(top_srcdir)/src/lib-settings \ + -I$(top_srcdir)/src/lib-auth \ + -I$(top_srcdir)/src/lib-master \ + -I$(top_srcdir)/src/replication \ + -DPKG_STATEDIR=\""$(statedir)"\" + +replicator_LDFLAGS = -export-dynamic +replicator_LDADD = $(LIBDOVECOT) $(MODULE_LIBS) +replicator_DEPENDENCIES = $(LIBDOVECOT_DEPS) + +replicator_SOURCES = \ + doveadm-connection.c \ + replicator.c \ + replicator-brain.c \ + replicator-queue.c \ + replicator-settings.c \ + notify-connection.c + +noinst_HEADERS = \ + doveadm-connection.h \ + replicator-brain.h \ + replicator-queue.h \ + replicator-settings.h \ + notify-connection.h
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/doveadm-connection.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,194 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "network.h" +#include "istream.h" +#include "ostream.h" +#include "str.h" +#include "strescape.h" +#include "doveadm-connection.h" + +#include <unistd.h> + +#define DOVEADM_FAIL_TIMEOUT_MSECS (1000*5) +#define DOVEADM_HANDSHAKE "VERSION\tdoveadm-server\t1\t0\n" +#define MAX_INBUF_SIZE 1024 + +struct doveadm_connection { + char *path; + int fd; + struct io *io; + struct istream *input; + struct ostream *output; + struct timeout *to; + + doveadm_callback_t *callback; + void *context; + + time_t last_connect_failure; + unsigned int handshaked:1; + unsigned int end_of_print:1; +}; + +struct doveadm_connection *doveadm_connection_init(const char *path) +{ + struct doveadm_connection *conn; + + conn = i_new(struct doveadm_connection, 1); + conn->path = i_strdup(path); + conn->fd = -1; + return conn; +} + +static void doveadm_callback(struct doveadm_connection *conn, + enum doveadm_reply reply) +{ + doveadm_callback_t *callback = conn->callback; + void *context = conn->context; + + if (conn->to != NULL) + timeout_remove(&conn->to); + + conn->callback = NULL; + conn->context = NULL; + callback(reply, context); +} + +static void doveadm_disconnect(struct doveadm_connection *conn) +{ + if (conn->fd == -1) + return; + + io_remove(&conn->io); + o_stream_destroy(&conn->output); + i_stream_destroy(&conn->input); + if (close(conn->fd) < 0) + i_error("close(doveadm) failed: %m"); + conn->fd = -1; + + if (conn->callback != NULL) + doveadm_callback(conn, DOVEADM_REPLY_FAIL); +} + +void doveadm_connection_deinit(struct doveadm_connection **_conn) +{ + struct doveadm_connection *conn = *_conn; + + *_conn = NULL; + + doveadm_disconnect(conn); + i_free(conn->path); + i_free(conn); +} + +static int doveadm_input_line(struct doveadm_connection *conn, const char *line) +{ + if (!conn->handshaked) { + if (strcmp(line, "+") != 0) { + i_error("%s: Unexpected handshake: %s", + conn->path, line); + return -1; + } + conn->handshaked = TRUE; + return 0; + } + if (conn->callback == NULL) { + i_error("%s: Unexpected input: %s", conn->path, line); + return -1; + } + if (!conn->end_of_print) { + if (line[0] == '\0') + conn->end_of_print = TRUE; + return 0; + } + if (line[0] == '+') + doveadm_callback(conn, DOVEADM_REPLY_OK); + else if (line[0] == '-') { + /* FIXME: handle DOVEADM_REPLY_NOUSER */ + doveadm_callback(conn, DOVEADM_REPLY_FAIL); + } else { + i_error("%s: Invalid input: %s", conn->path, line); + return -1; + } + conn->end_of_print = FALSE; + /* FIXME: disconnect after each request for now. + doveadm server's getopt() handling seems to break otherwise */ + doveadm_disconnect(conn); + return 0; +} + +static void doveadm_input(struct doveadm_connection *conn) +{ + const char *line; + + while ((line = i_stream_read_next_line(conn->input)) != NULL) { + if (doveadm_input_line(conn, line) < 0) { + doveadm_disconnect(conn); + return; + } + } + if (conn->input->eof) + doveadm_disconnect(conn); +} + +static int doveadm_connect(struct doveadm_connection *conn) +{ + if (conn->fd != -1) + return 0; + + if (conn->last_connect_failure == ioloop_time) + return -1; + + conn->fd = net_connect_unix(conn->path); + if (conn->fd == -1) { + i_error("net_connect_unix(%s) failed: %m", conn->path); + conn->last_connect_failure = ioloop_time; + return -1; + } + conn->last_connect_failure = 0; + conn->io = io_add(conn->fd, IO_READ, doveadm_input, conn); + conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE, FALSE); + conn->output = o_stream_create_fd(conn->fd, (size_t)-1, FALSE); + o_stream_send_str(conn->output, DOVEADM_HANDSHAKE); + return 0; +} + +static void doveadm_fail_timeout(struct doveadm_connection *conn) +{ + doveadm_callback(conn, DOVEADM_REPLY_FAIL); +} + +void doveadm_connection_sync(struct doveadm_connection *conn, + const char *username, bool full, + doveadm_callback_t *callback, void *context) +{ + string_t *cmd; + + i_assert(callback != NULL); + i_assert(!doveadm_connection_is_busy(conn)); + + conn->callback = callback; + conn->context = context; + + if (doveadm_connect(conn) < 0) { + i_assert(conn->to == NULL); + conn->to = timeout_add(DOVEADM_FAIL_TIMEOUT_MSECS, + doveadm_fail_timeout, conn); + } else { + /* <flags> <username> <command> [<args>] */ + cmd = t_str_new(256); + str_append_c(cmd, '\t'); + str_tabescape_write(cmd, username); + str_append(cmd, "\tsync\t-d"); + if (full) + str_append(cmd, "\t-f"); + str_append_c(cmd, '\n'); + o_stream_send(conn->output, str_data(cmd), str_len(cmd)); + } +} + +bool doveadm_connection_is_busy(struct doveadm_connection *conn) +{ + return conn->callback != NULL; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/doveadm-connection.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,20 @@ +#ifndef DOVEADM_CONNECTION_H +#define DOVEADM_CONNECTION_H + +enum doveadm_reply { + DOVEADM_REPLY_OK, + DOVEADM_REPLY_FAIL, + DOVEADM_REPLY_NOUSER +}; + +typedef void doveadm_callback_t(enum doveadm_reply reply, void *context); + +struct doveadm_connection *doveadm_connection_init(const char *path); +void doveadm_connection_deinit(struct doveadm_connection **conn); + +void doveadm_connection_sync(struct doveadm_connection *conn, + const char *username, bool full, + doveadm_callback_t *callback, void *context); +bool doveadm_connection_is_busy(struct doveadm_connection *conn); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/notify-connection.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,197 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "llist.h" +#include "istream.h" +#include "ostream.h" +#include "strescape.h" +#include "master-service.h" +#include "replicator-queue.h" +#include "notify-connection.h" + +#include <unistd.h> + +#define MAX_INBUF_SIZE (1024*64) +#define NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION 1 +#define NOTIFY_CLIENT_PROTOCOL_MINOR_VERSION 0 + +struct notify_connection { + struct notify_connection *prev, *next; + int refcount; + + int fd; + struct io *io; + struct istream *input; + struct ostream *output; + + struct replicator_queue *queue; + + unsigned int version_received:1; + unsigned int destroyed:1; +}; + +struct notify_sync_request { + struct notify_connection *conn; + unsigned int id; +}; + +static struct notify_connection *connections; + +static void notify_connection_destroy(struct notify_connection *conn); + +static void notify_sync_callback(bool success, void *context) +{ + struct notify_sync_request *request = context; + + o_stream_send_str(request->conn->output, t_strdup_printf( + "%c\t%u\n", success ? '+' : '-', request->id)); + + notify_connection_unref(&request->conn); + i_free(request); +} + +static int +notify_connection_input_line(struct notify_connection *conn, const char *line) +{ + struct notify_sync_request *request; + const char *const *args; + enum replication_priority priority; + unsigned int id; + + /* U \t <username> \t <priority> [\t <sync id>] */ + args = t_strsplit_tabescaped(line); + if (str_array_length(args) < 2) { + i_error("notify client sent invalid input: %s", line); + return -1; + } + if (strcmp(args[0], "U") != 0) { + i_error("notify client sent unknown command: %s", args[0]); + return -1; + } + if (replication_priority_parse(args[2], &priority) < 0) { + i_error("notify client sent invalid priority: %s", args[2]); + return -1; + } + if (priority != REPLICATION_PRIORITY_SYNC) + replicator_queue_add(conn->queue, args[1], priority); + else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) { + i_error("notify client sent invalid sync id: %s", line); + return -1; + } else { + request = i_new(struct notify_sync_request, 1); + request->conn = conn; + request->id = id; + notify_connection_ref(conn); + replicator_queue_add_sync(conn->queue, args[1], + notify_sync_callback, request); + } + return 0; +} + +static void notify_connection_input(struct notify_connection *conn) +{ + const char *line; + int ret; + + switch (i_stream_read(conn->input)) { + case -2: + i_error("BUG: Client connection sent too much data"); + notify_connection_destroy(conn); + return; + case -1: + notify_connection_destroy(conn); + return; + } + + if (!conn->version_received) { + if ((line = i_stream_next_line(conn->input)) == NULL) + return; + + if (!version_string_verify(line, "replicator-notify", + NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION)) { + i_error("Notify client not compatible with this server " + "(mixed old and new binaries?)"); + notify_connection_destroy(conn); + return; + } + conn->version_received = TRUE; + } + + while ((line = i_stream_next_line(conn->input)) != NULL) { + T_BEGIN { + ret = notify_connection_input_line(conn, line); + } T_END; + if (ret < 0) { + notify_connection_destroy(conn); + break; + } + } +} + +struct notify_connection * +notify_connection_create(int fd, struct replicator_queue *queue) +{ + struct notify_connection *conn; + + i_assert(fd >= 0); + + conn = i_new(struct notify_connection, 1); + conn->refcount = 1; + conn->queue = queue; + conn->fd = fd; + conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE); + conn->output = o_stream_create_fd(fd, (size_t)-1, FALSE); + conn->io = io_add(fd, IO_READ, notify_connection_input, conn); + conn->queue = queue; + + DLLIST_PREPEND(&connections, conn); + return conn; +} + +static void notify_connection_destroy(struct notify_connection *conn) +{ + if (conn->destroyed) + return; + conn->destroyed = TRUE; + + DLLIST_REMOVE(&connections, conn); + + io_remove(&conn->io); + i_stream_close(conn->input); + o_stream_close(conn->output); + if (close(conn->fd) < 0) + i_error("close(notify connection) failed: %m"); + conn->fd = -1; + + notify_connection_unref(&conn); + master_service_client_connection_destroyed(master_service); +} + +void notify_connection_ref(struct notify_connection *conn) +{ + i_assert(conn->refcount > 0); + + conn->refcount++; +} + +void notify_connection_unref(struct notify_connection **_conn) +{ + struct notify_connection *conn = *_conn; + + i_assert(conn->refcount > 0); + + *_conn = NULL; + if (--conn->refcount > 0) + return; + + notify_connection_destroy(conn); + i_stream_unref(&conn->input); + o_stream_unref(&conn->output); + i_free(conn); +} + +void notify_connections_destroy_all(void) +{ + while (connections != NULL) + notify_connection_destroy(connections); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/notify-connection.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,13 @@ +#ifndef NOTIFY_CONNECTION_H +#define NOTIFY_CONNECTION_H + +struct replicator_queue; + +struct notify_connection * +notify_connection_create(int fd, struct replicator_queue *queue); +void notify_connection_ref(struct notify_connection *conn); +void notify_connection_unref(struct notify_connection **conn); + +void notify_connections_destroy_all(void); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/replicator-brain.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,164 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "ioloop.h" +#include "doveadm-connection.h" +#include "replicator-settings.h" +#include "replicator-queue.h" +#include "replicator-brain.h" + +struct replicator_sync_context { + struct replicator_brain *brain; + struct replicator_user *user; +}; + +struct replicator_brain { + pool_t pool; + struct replicator_queue *queue; + const struct replicator_settings *set; + struct timeout *to; + + ARRAY_DEFINE(doveadm_conns, struct doveadm_connection *); +}; + +static void replicator_brain_fill(struct replicator_brain *brain); + +static void replicator_brain_queue_changed(void *context) +{ + struct replicator_brain *brain = context; + + replicator_brain_fill(brain); +} + +struct replicator_brain * +replicator_brain_init(struct replicator_queue *queue, + const struct replicator_settings *set) +{ + struct replicator_brain *brain; + pool_t pool; + + pool = pool_alloconly_create("replication brain", 1024); + brain = p_new(pool, struct replicator_brain, 1); + brain->pool = pool; + brain->queue = queue; + brain->set = set; + p_array_init(&brain->doveadm_conns, pool, 16); + replicator_queue_set_change_callback(queue, + replicator_brain_queue_changed, brain); + replicator_brain_fill(brain); + return brain; +} + +void replicator_brain_deinit(struct replicator_brain **_brain) +{ + struct replicator_brain *brain = *_brain; + struct doveadm_connection **connp; + + *_brain = NULL; + + array_foreach_modifiable(&brain->doveadm_conns, connp) + doveadm_connection_deinit(connp); + if (brain->to != NULL) + timeout_remove(&brain->to); + pool_unref(&brain->pool); +} + +static struct doveadm_connection * +get_doveadm_connection(struct replicator_brain *brain) +{ + struct doveadm_connection *const *connp, *conn = NULL; + + array_foreach(&brain->doveadm_conns, connp) { + if (!doveadm_connection_is_busy(*connp)) + return *connp; + } + if (array_count(&brain->doveadm_conns) == + brain->set->replication_max_conns) + return NULL; + + conn = doveadm_connection_init(brain->set->doveadm_socket_path); + array_append(&brain->doveadm_conns, &conn, 1); + return conn; +} + +static void doveadm_sync_callback(enum doveadm_reply reply, void *context) +{ + struct replicator_sync_context *ctx = context; + + if (reply == DOVEADM_REPLY_NOUSER) { + /* user no longer exists, remove from replication */ + replicator_queue_remove(ctx->brain->queue, &ctx->user); + } else { + ctx->user->last_sync_failed = + reply != DOVEADM_REPLY_OK; + replicator_queue_push(ctx->brain->queue, ctx->user); + } + replicator_brain_fill(ctx->brain); + i_free(ctx); +} + +static bool +doveadm_replicate(struct replicator_brain *brain, struct replicator_user *user) +{ + struct replicator_sync_context *ctx; + struct doveadm_connection *conn; + bool full; + + conn = get_doveadm_connection(brain); + if (conn == NULL) + return FALSE; + + full = user->last_full_sync + + brain->set->replication_full_sync_interval < ioloop_time; + /* update the sync times immediately. if the replication fails we still + wouldn't want it to be retried immediately. */ + user->last_fast_sync = ioloop_time; + if (full) + user->last_full_sync = ioloop_time; + /* reset priority also. if more updates arrive during replication + we'll do another replication to make sure nothing gets lost */ + user->priority = REPLICATION_PRIORITY_NONE; + + ctx = i_new(struct replicator_sync_context, 1); + ctx->brain = brain; + ctx->user = user; + doveadm_connection_sync(conn, user->username, full, + doveadm_sync_callback, ctx); + return TRUE; +} + +static void replicator_brain_timeout(struct replicator_brain *brain) +{ + timeout_remove(&brain->to); + replicator_brain_fill(brain); +} + +static bool replicator_brain_fill_next(struct replicator_brain *brain) +{ + struct replicator_user *user; + unsigned int next_secs; + + user = replicator_queue_pop(brain->queue, &next_secs); + if (user == NULL) { + /* nothing more to do */ + if (brain->to != NULL) + timeout_remove(&brain->to); + brain->to = timeout_add(next_secs * 1000, + replicator_brain_timeout, brain); + return FALSE; + } + + if (!doveadm_replicate(brain, user)) { + /* all connections were full, put the user back to queue */ + replicator_queue_push(brain->queue, user); + return FALSE; + } + /* replication started for the user */ + return TRUE; +} + +static void replicator_brain_fill(struct replicator_brain *brain) +{ + while (replicator_brain_fill_next(brain)) ; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/replicator-brain.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,11 @@ +#ifndef REPLICATOR_BRAIN_H +#define REPLICATOR_BRAIN_H + +struct replicator_settings; + +struct replicator_brain * +replicator_brain_init(struct replicator_queue *queue, + const struct replicator_settings *set); +void replicator_brain_deinit(struct replicator_brain **brain); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/replicator-queue.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,363 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "ioloop.h" +#include "istream.h" +#include "ostream.h" +#include "str.h" +#include "strescape.h" +#include "hash.h" +#include "replicator-queue.h" + +#include <unistd.h> +#include <fcntl.h> + +struct replicator_sync_lookup { + struct replicator_user *user; + + replicator_sync_callback_t *callback; + void *context; + + bool wait_for_next_push; +}; + +struct replicator_queue { + struct priorityq *user_queue; + /* username => struct replicator_user* */ + struct hash_table *user_hash; + + ARRAY_DEFINE(sync_lookups, struct replicator_sync_lookup); + + unsigned int full_sync_interval; + + void (*change_callback)(void *context); + void *change_context; +}; + +static int user_priority_cmp(const void *p1, const void *p2) +{ + const struct replicator_user *user1 = p1, *user2 = p2; + + if (user1->priority > user2->priority) + return -1; + if (user1->priority < user2->priority) + return 1; + + if (user1->priority != REPLICATION_PRIORITY_NONE) { + /* there is something to replicate */ + if (user1->last_fast_sync < user2->last_fast_sync) + return -1; + if (user1->last_fast_sync > user2->last_fast_sync) + return 1; + } else { + /* nothing to replicate, but do still periodic full syncs */ + if (user1->last_full_sync < user2->last_full_sync) + return -1; + if (user1->last_full_sync > user2->last_full_sync) + return 1; + } + return 0; +} + +struct replicator_queue *replicator_queue_init(unsigned int full_sync_interval) +{ + struct replicator_queue *queue; + + queue = i_new(struct replicator_queue, 1); + queue->full_sync_interval = full_sync_interval; + queue->user_queue = priorityq_init(user_priority_cmp, 1024); + queue->user_hash = + hash_table_create(default_pool, default_pool, 1024, + str_hash, (hash_cmp_callback_t *)strcmp); + i_array_init(&queue->sync_lookups, 32); + return queue; +} + +void replicator_queue_deinit(struct replicator_queue **_queue) +{ + struct replicator_queue *queue = *_queue; + struct priorityq_item *item; + + *_queue = NULL; + + while ((item = priorityq_pop(queue->user_queue)) != NULL) { + struct replicator_user *user = (struct replicator_user *)item; + replicator_queue_remove(queue, &user); + } + + priorityq_deinit(&queue->user_queue); + hash_table_destroy(&queue->user_hash); + i_assert(array_count(&queue->sync_lookups) == 0); + array_free(&queue->sync_lookups); + i_free(queue); +} + +void replicator_queue_set_change_callback(struct replicator_queue *queue, + void (*callback)(void *context), + void *context) +{ + queue->change_callback = callback; + queue->change_context = context; +} + +static struct replicator_user * +replicator_queue_add_int(struct replicator_queue *queue, const char *username, + enum replication_priority priority) +{ + struct replicator_user *user; + + user = hash_table_lookup(queue->user_hash, username); + if (user == NULL) { + user = i_new(struct replicator_user, 1); + user->username = i_strdup(username); + } else { + if (user->priority > priority) { + /* user already has a higher priority than this */ + return user; + } + if (!user->popped) + priorityq_remove(queue->user_queue, &user->item); + } + user->priority = priority; + user->last_update = ioloop_time; + + if (!user->popped) + priorityq_add(queue->user_queue, &user->item); + return user; +} + +struct replicator_user * +replicator_queue_add(struct replicator_queue *queue, const char *username, + enum replication_priority priority) +{ + struct replicator_user *user; + + user = replicator_queue_add_int(queue, username, priority); + if (queue->change_callback != NULL) + queue->change_callback(queue->change_context); + return user; +} + +void replicator_queue_add_sync(struct replicator_queue *queue, + const char *username, + replicator_sync_callback_t *callback, + void *context) +{ + struct replicator_user *user; + struct replicator_sync_lookup *lookup; + + user = replicator_queue_add_int(queue, username, + REPLICATION_PRIORITY_SYNC); + + lookup = array_append_space(&queue->sync_lookups); + lookup->user = user; + lookup->callback = callback; + lookup->context = context; + lookup->wait_for_next_push = user->popped; + + if (queue->change_callback != NULL) + queue->change_callback(queue->change_context); +} + +void replicator_queue_remove(struct replicator_queue *queue, + struct replicator_user **_user) +{ + struct replicator_user *user = *_user; + + *_user = NULL; + if (!user->popped) + priorityq_remove(queue->user_queue, &user->item); + hash_table_remove(queue->user_hash, user->username); + + i_free(user->username); + i_free(user); + + if (queue->change_callback != NULL) + queue->change_callback(queue->change_context); +} + +struct replicator_user * +replicator_queue_pop(struct replicator_queue *queue, + unsigned int *next_secs_r) +{ + struct priorityq_item *item; + struct replicator_user *user; + + item = priorityq_peek(queue->user_queue); + if (item == NULL) { + /* no users defined. we shouldn't normally get here */ + *next_secs_r = 3600; + return NULL; + } + user = (struct replicator_user *)item; + + if (user->priority == REPLICATION_PRIORITY_NONE && + user->last_full_sync + queue->full_sync_interval > ioloop_time) { + /* we don't want to do a full sync yet */ + *next_secs_r = user->last_full_sync + + queue->full_sync_interval - ioloop_time; + return NULL; + } + priorityq_remove(queue->user_queue, &user->item); + user->popped = TRUE; + return user; +} + +static void +replicator_queue_handle_sync_lookups(struct replicator_queue *queue, + struct replicator_user *user) +{ + struct replicator_sync_lookup *lookups; + ARRAY_DEFINE(callbacks, struct replicator_sync_lookup); + unsigned int i, count; + bool success = !user->last_sync_failed; + + t_array_init(&callbacks, 8); + lookups = array_get_modifiable(&queue->sync_lookups, &count); + for (i = 0; i < count; ) { + if (lookups[i].user != user) + i++; + else if (lookups[i].wait_for_next_push) { + /* another sync request came while user was being + replicated */ + i_assert(user->priority == REPLICATION_PRIORITY_SYNC); + lookups[i].wait_for_next_push = FALSE; + i++; + } else { + array_append(&callbacks, &lookups[i], 1); + array_delete(&queue->sync_lookups, i, 1); + } + } + + array_foreach_modifiable(&callbacks, lookups) + lookups->callback(success, lookups->context); +} + +void replicator_queue_push(struct replicator_queue *queue, + struct replicator_user *user) +{ + priorityq_add(queue->user_queue, &user->item); + user->popped = FALSE; + + T_BEGIN { + replicator_queue_handle_sync_lookups(queue, user); + } T_END; +} + +static int +replicator_queue_import_line(struct replicator_queue *queue, const char *line) +{ + const char *const *args, *username; + unsigned int priority; + struct replicator_user *user, tmp_user; + + /* <user> <priority> <last update> <last fast sync> <last full sync> */ + args = t_strsplit_tabescaped(line); + if (str_array_length(args) < 5) + return -1; + + memset(&tmp_user, 0, sizeof(tmp_user)); + username = args[0]; + if (username[0] == '\0' || + str_to_uint(args[1], &priority) < 0 || + str_to_time(args[2], &tmp_user.last_update) < 0 || + str_to_time(args[3], &tmp_user.last_fast_sync) < 0 || + str_to_time(args[3], &tmp_user.last_full_sync) < 0) + return -1; + tmp_user.priority = priority; + + user = hash_table_lookup(queue->user_hash, username); + if (user != NULL) { + if (user->last_update > tmp_user.last_update) { + /* we already have a newer state */ + return 0; + } + if (user->last_update == tmp_user.last_update) { + /* either one of these could be newer. use the one + with higher priority. */ + if (user->priority > tmp_user.priority) + return 0; + } + } + user = replicator_queue_add(queue, tmp_user.username, + tmp_user.priority); + user->last_update = tmp_user.last_update; + user->last_fast_sync = tmp_user.last_fast_sync; + user->last_full_sync = tmp_user.last_full_sync; + return 0; +} + +int replicator_queue_import(struct replicator_queue *queue, const char *path) +{ + struct istream *input; + const char *line; + int fd, ret = 0; + + fd = open(path, O_RDONLY); + if (fd == -1) { + if (errno == ENOENT) + return 0; + i_error("open(%s) failed: %m", path); + return -1; + } + + input = i_stream_create_fd(fd, (size_t)-1, TRUE); + while ((line = i_stream_read_next_line(input)) != NULL) { + T_BEGIN { + ret = replicator_queue_import_line(queue, line); + } T_END; + if (ret < 0) { + i_error("Invalid replicator db record: %s", line); + break; + } + } + if (input->stream_errno != 0) + ret = -1; + i_stream_destroy(&input); + return ret; +} + +static void +replicator_queue_export_user(struct replicator_user *user, string_t *str) +{ + str_tabescape_write(str, user->username); + str_printfa(str, "\t%d\t%lld\t%lld\t%lld", (int)user->priority, + (long long)user->last_update, + (long long)user->last_fast_sync, + (long long)user->last_full_sync); +} + +int replicator_queue_export(struct replicator_queue *queue, const char *path) +{ + struct ostream *output; + struct priorityq_item *const *items; + unsigned int i, count; + string_t *str; + int fd, ret; + + fd = creat(path, 0600); + if (fd == -1) { + i_error("creat(%s) failed: %m", path); + return -1; + } + output = o_stream_create_fd_file(fd, 0, TRUE); + o_stream_cork(output); + + str = t_str_new(128); + items = priorityq_items(queue->user_queue); + count = priorityq_count(queue->user_queue); + for (i = 0; i < count; i++) { + struct replicator_user *user = + (struct replicator_user *)items[i]; + + str_truncate(str, 0); + replicator_queue_export_user(user, str); + if (o_stream_send(output, str_data(str), str_len(str)) < 0) + break; + } + + ret = output->last_failed_errno != 0 ? -1 : 0; + o_stream_destroy(&output); + return ret; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/replicator-queue.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,60 @@ +#ifndef REPLICATOR_QUEUE_H +#define REPLICATOR_QUEUE_H + +#include "priorityq.h" +#include "replication-common.h" + +struct replicator_user { + struct priorityq_item item; + + char *username; + enum replication_priority priority; + /* last time this user's state was updated */ + time_t last_update; + /* last_fast_run is always >= last_full_run. */ + time_t last_fast_sync, last_full_sync; + + /* User isn't currently in replication queue */ + unsigned int popped:1; + /* Last replication sync failed */ + unsigned int last_sync_failed:1; +}; + +typedef void replicator_sync_callback_t(bool success, void *context); + +struct replicator_queue *replicator_queue_init(unsigned int full_sync_interval); +void replicator_queue_deinit(struct replicator_queue **queue); + +/* Call the specified callback when data is added/removed/moved in queue + via _add(), _add_sync() or _remove() functions (not push/pop). */ +void replicator_queue_set_change_callback(struct replicator_queue *queue, + void (*callback)(void *context), + void *context); + +/* Add a user to queue and return it. If the user already exists, it's updated + only if the new priority is higher. */ +struct replicator_user * +replicator_queue_add(struct replicator_queue *queue, const char *username, + enum replication_priority priority); +void replicator_queue_add_sync(struct replicator_queue *queue, + const char *username, + replicator_sync_callback_t *callback, + void *context); +/* Remove user from replication queue and free it. */ +void replicator_queue_remove(struct replicator_queue *queue, + struct replicator_user **user); + +/* Return the next user from replication queue, and remove it from the queue. + If there's nothing to be replicated currently, returns NULL and sets + next_secs_r to when there should be more work to do. */ +struct replicator_user * +replicator_queue_pop(struct replicator_queue *queue, + unsigned int *next_secs_r); +/* Add user back to queue. */ +void replicator_queue_push(struct replicator_queue *queue, + struct replicator_user *user); + +int replicator_queue_import(struct replicator_queue *queue, const char *path); +int replicator_queue_export(struct replicator_queue *queue, const char *path); + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/replicator-settings.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,80 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "buffer.h" +#include "settings-parser.h" +#include "service-settings.h" +#include "replicator-settings.h" + +/* <settings checks> */ +static struct file_listener_settings replicator_unix_listeners_array[] = { + { "replicator", 0600, "$default_internal_user", "" } +}; +static struct file_listener_settings *replicator_unix_listeners[] = { + &replicator_unix_listeners_array[0] +}; +static buffer_t replicator_unix_listeners_buf = { + replicator_unix_listeners, sizeof(replicator_unix_listeners), { 0, } +}; +/* </settings checks> */ + +struct service_settings replicator_service_settings = { + .name = "replicator", + .protocol = "", + .type = "", + .executable = "replicator", + .user = "", + .group = "", + .privileged_group = "", + .extra_groups = "", + .chroot = "", + + .drop_priv_before_exec = FALSE, + + .process_min_avail = 0, + .process_limit = 1, + .client_limit = 0, + .service_count = 0, + .idle_kill = -1U, + .vsz_limit = (uoff_t)-1, + + .unix_listeners = { { &replicator_unix_listeners_buf, + sizeof(replicator_unix_listeners[0]) } }, + .fifo_listeners = ARRAY_INIT, + .inet_listeners = ARRAY_INIT +}; + +#undef DEF +#define DEF(type, name) \ + { type, #name, offsetof(struct replicator_settings, name), NULL } + +static const struct setting_define replicator_setting_defines[] = { + DEF(SET_STR, auth_socket_path), + DEF(SET_STR, doveadm_socket_path), + + DEF(SET_TIME, replication_full_sync_interval), + DEF(SET_TIME, replication_max_conns), + + SETTING_DEFINE_LIST_END +}; + +const struct replicator_settings replicator_default_settings = { + .auth_socket_path = "auth-userdb", + .doveadm_socket_path = "doveadm-server", + + .replication_full_sync_interval = 60*60*12, + .replication_max_conns = 10 +}; + +const struct setting_parser_info replicator_setting_parser_info = { + .module_name = "replicator", + .defines = replicator_setting_defines, + .defaults = &replicator_default_settings, + + .type_offset = (size_t)-1, + .struct_size = sizeof(struct replicator_settings), + + .parent_offset = (size_t)-1 +}; + +const struct replicator_settings *replicator_settings;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/replicator-settings.h Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,15 @@ +#ifndef REPLICATOR_SETTINGS_H +#define REPLICATOR_SETTINGS_H + +struct replicator_settings { + const char *auth_socket_path; + const char *doveadm_socket_path; + + unsigned int replication_full_sync_interval; + unsigned int replication_max_conns; +}; + +extern const struct setting_parser_info replicator_setting_parser_info; +extern const struct replicator_settings *replicator_settings; + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/replication/replicator/replicator.c Sun Mar 04 09:50:21 2012 +0200 @@ -0,0 +1,117 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "restrict-access.h" +#include "auth-master.h" +#include "master-service.h" +#include "master-service-settings.h" +#include "notify-connection.h" +#include "replicator-brain.h" +#include "replicator-queue.h" +#include "replicator-settings.h" + +#define REPLICATOR_AUTH_SERVICE_NAME "replicator" +#define REPLICATOR_DB_DUMP_INTERVAL_MSECS (1000*60*15) +#define REPLICATOR_DB_PATH PKG_STATEDIR"/replicator.db" + +static struct replicator_queue *queue; +static struct replicator_brain *brain; +static const struct replicator_settings *set; +static struct timeout *to_dump; + +static void client_connected(struct master_service_connection *conn) +{ + master_service_client_connection_accept(conn); + (void)notify_connection_create(conn->fd, queue); +} + +static void replication_add_users(struct replicator_queue *queue) +{ + struct auth_master_connection *auth_conn; + struct auth_master_user_list_ctx *ctx; + struct auth_user_info user_info; + struct replicator_user *user; + const char *username; + + auth_conn = auth_master_init(set->auth_socket_path, + AUTH_MASTER_FLAG_NO_IDLE_TIMEOUT); + + memset(&user_info, 0, sizeof(user_info)); + user_info.service = REPLICATOR_AUTH_SERVICE_NAME; + + /* add all users into replication queue, so that we can start doing + full syncs for everyone whose state can't be found */ + ctx = auth_master_user_list_init(auth_conn, NULL, &user_info); + while ((username = auth_master_user_list_next(ctx)) != NULL) { + user = replicator_queue_add(queue, username, + REPLICATION_PRIORITY_NONE); + user->last_update = 0; + } + if (auth_master_user_list_deinit(&ctx) < 0) + i_error("listing users failed, can't replicate existing data"); + auth_master_deinit(&auth_conn); + + /* add updates from replicator db, if it exists */ + (void)replicator_queue_import(queue, REPLICATOR_DB_PATH); +} + +static void replicator_dump_timeout(void *context ATTR_UNUSED) +{ + (void)replicator_queue_export(queue, REPLICATOR_DB_PATH); +} + +static void main_init(void) +{ + void **sets; + + sets = master_service_settings_get_others(master_service); + set = sets[0]; + + queue = replicator_queue_init(set->replication_full_sync_interval); + replication_add_users(queue); + to_dump = timeout_add(REPLICATOR_DB_DUMP_INTERVAL_MSECS, + replicator_dump_timeout, NULL); + brain = replicator_brain_init(queue, set); +} + +static void main_deinit(void) +{ + notify_connections_destroy_all(); + replicator_brain_deinit(&brain); + timeout_remove(&to_dump); + (void)replicator_queue_export(queue, REPLICATOR_DB_PATH); + replicator_queue_deinit(&queue); +} + +int main(int argc, char *argv[]) +{ + const struct setting_parser_info *set_roots[] = { + &replicator_setting_parser_info, + NULL + }; + const enum master_service_flags service_flags = + MASTER_SERVICE_FLAG_NO_IDLE_DIE; + const char *error; + + master_service = master_service_init("replicator", service_flags, + &argc, &argv, NULL); + if (master_getopt(master_service) > 0) + return FATAL_DEFAULT; + + if (master_service_settings_read_simple(master_service, set_roots, + &error) < 0) + i_fatal("Error reading configuration: %s", error); + master_service_init_log(master_service, "replicator: "); + + restrict_access_by_env(NULL, FALSE); + restrict_access_allow_coredumps(TRUE); + master_service_init_finish(master_service); + + main_init(); + master_service_run(master_service, client_connected); + main_deinit(); + + master_service_deinit(&master_service); + return 0; +}