Mercurial > dovecot > core-2.2
changeset 15037:920756cd29b8
dsync: Renamed "slave" to "ibc" (= inter-brain communicator)
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Fri, 07 Sep 2012 16:43:44 +0300 |
parents | 5943cace4e05 |
children | 92cc78c5dc77 |
files | src/doveadm/dsync/Makefile.am src/doveadm/dsync/doveadm-dsync.c src/doveadm/dsync/dsync-brain-mailbox-tree.c src/doveadm/dsync/dsync-brain-mailbox.c src/doveadm/dsync/dsync-brain-mails.c src/doveadm/dsync/dsync-brain-private.h src/doveadm/dsync/dsync-brain.c src/doveadm/dsync/dsync-brain.h src/doveadm/dsync/dsync-ibc-pipe.c src/doveadm/dsync/dsync-ibc-private.h src/doveadm/dsync/dsync-ibc-stream.c src/doveadm/dsync/dsync-ibc.c src/doveadm/dsync/dsync-ibc.h src/doveadm/dsync/dsync-slave-pipe.c src/doveadm/dsync/dsync-slave-private.h src/doveadm/dsync/dsync-slave-stream.c src/doveadm/dsync/dsync-slave.c src/doveadm/dsync/dsync-slave.h |
diffstat | 18 files changed, 2502 insertions(+), 2512 deletions(-) [+] |
line wrap: on
line diff
--- a/src/doveadm/dsync/Makefile.am Fri Sep 07 16:19:35 2012 +0300 +++ b/src/doveadm/dsync/Makefile.am Fri Sep 07 16:43:44 2012 +0300 @@ -27,9 +27,9 @@ dsync-mailbox-tree-fill.c \ dsync-mailbox-tree-sync.c \ dsync-serializer.c \ - dsync-slave.c \ - dsync-slave-stream.c \ - dsync-slave-pipe.c \ + dsync-ibc.c \ + dsync-ibc-stream.c \ + dsync-ibc-pipe.c \ dsync-transaction-log-scan.c noinst_HEADERS = \ @@ -46,8 +46,8 @@ dsync-mailbox-tree-private.h \ dsync-serializer.h \ dsync-deserializer.h \ - dsync-slave.h \ - dsync-slave-private.h \ + dsync-ibc.h \ + dsync-ibc-private.h \ dsync-transaction-log-scan.h test_programs = \
--- a/src/doveadm/dsync/doveadm-dsync.c Fri Sep 07 16:19:35 2012 +0300 +++ b/src/doveadm/dsync/doveadm-dsync.c Fri Sep 07 16:43:44 2012 +0300 @@ -15,7 +15,7 @@ #include "doveadm-settings.h" #include "doveadm-mail.h" #include "dsync-brain.h" -#include "dsync-slave.h" +#include "dsync-ibc.h" #include "doveadm-dsync.h" #include <stdio.h> @@ -228,7 +228,7 @@ static int cmd_dsync_run_local(struct dsync_cmd_context *ctx, struct mail_user *user, - struct dsync_brain *brain, struct dsync_slave *slave2) + struct dsync_brain *brain, struct dsync_ibc *ibc2) { struct dsync_brain *brain2; struct mail_user *user2; @@ -267,7 +267,7 @@ "points to same directory: %s", path1); } - brain2 = dsync_brain_slave_init(user2, slave2); + brain2 = dsync_brain_slave_init(user2, ibc2); brain1_running = brain2_running = TRUE; changed1 = changed2 = TRUE; @@ -312,7 +312,7 @@ cmd_dsync_run(struct doveadm_mail_cmd_context *_ctx, struct mail_user *user) { struct dsync_cmd_context *ctx = (struct dsync_cmd_context *)_ctx; - struct dsync_slave *slave, *slave2 = NULL; + struct dsync_ibc *ibc, *ibc2 = NULL; struct dsync_brain *brain; struct mail_namespace *sync_ns = NULL; int ret = 0; @@ -330,26 +330,26 @@ } if (!ctx->remote) - dsync_slave_init_pipe(&slave, &slave2); + dsync_ibc_init_pipe(&ibc, &ibc2); else { string_t *temp_prefix = t_str_new(64); mail_user_set_get_temp_prefix(temp_prefix, user->set); - slave = dsync_slave_init_stream(ctx->fd_in, ctx->fd_out, - ctx->remote_name, - str_c(temp_prefix)); + ibc = dsync_ibc_init_stream(ctx->fd_in, ctx->fd_out, + ctx->remote_name, + str_c(temp_prefix)); } if (doveadm_debug || doveadm_verbose) { // FIXME } - brain = dsync_brain_master_init(user, slave, sync_ns, + brain = dsync_brain_master_init(user, ibc, sync_ns, ctx->sync_type, DSYNC_BRAIN_FLAG_MAILS_HAVE_GUIDS | DSYNC_BRAIN_FLAG_SEND_REQUESTS, ""); if (!ctx->remote) { - if (cmd_dsync_run_local(ctx, user, brain, slave2) < 0) + if (cmd_dsync_run_local(ctx, user, brain, ibc2) < 0) _ctx->exit_code = EX_TEMPFAIL; } else { cmd_dsync_run_remote(user); @@ -357,9 +357,9 @@ if (dsync_brain_deinit(&brain) < 0) _ctx->exit_code = EX_TEMPFAIL; - dsync_slave_deinit(&slave); - if (slave2 != NULL) - dsync_slave_deinit(&slave2); + dsync_ibc_deinit(&ibc); + if (ibc2 != NULL) + dsync_ibc_deinit(&ibc2); if (ctx->io_err != NULL) io_remove(&ctx->io_err); if (ctx->fd_err != -1) @@ -507,7 +507,7 @@ cmd_dsync_server_run(struct doveadm_mail_cmd_context *_ctx ATTR_UNUSED, struct mail_user *user) { - struct dsync_slave *slave; + struct dsync_ibc *ibc; struct dsync_brain *brain; string_t *temp_prefix; @@ -520,13 +520,13 @@ temp_prefix = t_str_new(64); mail_user_set_get_temp_prefix(temp_prefix, user->set); - slave = dsync_slave_init_stream(STDIN_FILENO, STDOUT_FILENO, - "local", str_c(temp_prefix)); - brain = dsync_brain_slave_init(user, slave); + ibc = dsync_ibc_init_stream(STDIN_FILENO, STDOUT_FILENO, + "local", str_c(temp_prefix)); + brain = dsync_brain_slave_init(user, ibc); io_loop_run(current_ioloop); - dsync_slave_deinit(&slave); + dsync_ibc_deinit(&ibc); return dsync_brain_deinit(&brain); }
--- a/src/doveadm/dsync/dsync-brain-mailbox-tree.c Fri Sep 07 16:19:35 2012 +0300 +++ b/src/doveadm/dsync/dsync-brain-mailbox-tree.c Fri Sep 07 16:43:44 2012 +0300 @@ -5,7 +5,7 @@ #include "settings-parser.h" #include "mail-namespace.h" #include "doveadm-settings.h" -#include "dsync-slave.h" +#include "dsync-ibc.h" #include "dsync-mailbox-tree.h" #include "dsync-brain-private.h" @@ -94,7 +94,7 @@ void dsync_brain_send_mailbox_tree(struct dsync_brain *brain) { struct dsync_mailbox_node *node; - enum dsync_slave_send_ret ret; + enum dsync_ibc_send_ret ret; const char *full_name; char sep[2]; @@ -105,14 +105,14 @@ const char *const *parts; parts = t_strsplit(full_name, sep); - ret = dsync_slave_send_mailbox_tree_node(brain->slave, - parts, node); + ret = dsync_ibc_send_mailbox_tree_node(brain->ibc, + parts, node); } T_END; - if (ret == DSYNC_SLAVE_SEND_RET_FULL) + if (ret == DSYNC_IBC_SEND_RET_FULL) return; } dsync_mailbox_tree_iter_deinit(&brain->local_tree_iter); - dsync_slave_send_end_of_list(brain->slave); + dsync_ibc_send_end_of_list(brain->ibc); brain->state = DSYNC_STATE_SEND_MAILBOX_TREE_DELETES; } @@ -124,8 +124,8 @@ deletes = dsync_mailbox_tree_get_deletes(brain->local_mailbox_tree, &count); - dsync_slave_send_mailbox_deletes(brain->slave, deletes, count, - brain->hierarchy_sep); + dsync_ibc_send_mailbox_deletes(brain->ibc, deletes, count, + brain->hierarchy_sep); brain->state = DSYNC_STATE_RECV_MAILBOX_TREE; } @@ -289,12 +289,12 @@ struct dsync_mailbox_node *node; const char *const *parts, *name; struct mail_namespace *ns; - enum dsync_slave_recv_ret ret; + enum dsync_ibc_recv_ret ret; char sep[2]; bool changed = FALSE; - while ((ret = dsync_slave_recv_mailbox_tree_node(brain->slave, &parts, - &remote_node)) > 0) { + while ((ret = dsync_ibc_recv_mailbox_tree_node(brain->ibc, &parts, + &remote_node)) > 0) { if (dsync_get_mailbox_name(brain, parts, &name, &ns) < 0) { sep[0] = brain->hierarchy_sep; sep[1] = '\0'; i_error("Couldn't find namespace for mailbox %s", @@ -306,7 +306,7 @@ node->ns = ns; dsync_mailbox_node_copy_data(node, remote_node); } - if (ret == DSYNC_SLAVE_RECV_RET_FINISHED) { + if (ret == DSYNC_IBC_RECV_RET_FINISHED) { if (dsync_mailbox_tree_build_guid_hash(brain->remote_mailbox_tree) < 0) brain->failed = TRUE; @@ -369,8 +369,8 @@ unsigned int i, count; char sep; - if (dsync_slave_recv_mailbox_deletes(brain->slave, &deletes, &count, - &sep) == 0) + if (dsync_ibc_recv_mailbox_deletes(brain->ibc, &deletes, &count, + &sep) == 0) return FALSE; /* apply remote's mailbox deletions based on our local tree */
--- a/src/doveadm/dsync/dsync-brain-mailbox.c Fri Sep 07 16:19:35 2012 +0300 +++ b/src/doveadm/dsync/dsync-brain-mailbox.c Fri Sep 07 16:43:44 2012 +0300 @@ -6,7 +6,7 @@ #include "mail-cache-private.h" #include "mail-namespace.h" #include "mail-storage-private.h" -#include "dsync-slave.h" +#include "dsync-ibc.h" #include "dsync-mailbox-tree.h" #include "dsync-mailbox-import.h" #include "dsync-mailbox-export.h" @@ -370,12 +370,12 @@ if (!dsync_brain_next_mailbox(brain, &box, &dsync_box)) { brain->state = DSYNC_STATE_DONE; - dsync_slave_send_end_of_list(brain->slave); + dsync_ibc_send_end_of_list(brain->ibc); return; } /* start exporting this mailbox (wait for remote to start importing) */ - dsync_slave_send_mailbox(brain->slave, &dsync_box); + dsync_ibc_send_mailbox(brain->ibc, &dsync_box); (void)dsync_brain_sync_mailbox_init(brain, box, &dsync_box, DSYNC_BOX_STATE_MAILBOX); brain->state = DSYNC_STATE_SYNC_MAILS; @@ -526,7 +526,7 @@ i_assert(!brain->master_brain); i_assert(brain->box == NULL); - if ((ret = dsync_slave_recv_mailbox(brain->slave, &dsync_box)) == 0) + if ((ret = dsync_ibc_recv_mailbox(brain->ibc, &dsync_box)) == 0) return FALSE; if (ret < 0) { brain->state = DSYNC_STATE_DONE; @@ -563,13 +563,13 @@ memcpy(delete_box.mailbox_guid, dsync_box->mailbox_guid, sizeof(delete_box.mailbox_guid)); delete_box.mailbox_lost = TRUE; - dsync_slave_send_mailbox(brain->slave, &delete_box); + dsync_ibc_send_mailbox(brain->ibc, &delete_box); return TRUE; } i_assert(local_dsync_box.uid_validity != 0); i_assert(memcmp(dsync_box->mailbox_guid, local_dsync_box.mailbox_guid, sizeof(dsync_box->mailbox_guid)) == 0); - dsync_slave_send_mailbox(brain->slave, &local_dsync_box); + dsync_ibc_send_mailbox(brain->ibc, &local_dsync_box); dsync_brain_mailbox_update_pre(brain, box, &local_dsync_box, dsync_box);
--- a/src/doveadm/dsync/dsync-brain-mails.c Fri Sep 07 16:19:35 2012 +0300 +++ b/src/doveadm/dsync/dsync-brain-mails.c Fri Sep 07 16:43:44 2012 +0300 @@ -2,7 +2,7 @@ #include "lib.h" #include "istream.h" -#include "dsync-slave.h" +#include "dsync-ibc.h" #include "dsync-mail.h" #include "dsync-mailbox-import.h" #include "dsync-mailbox-export.h" @@ -11,13 +11,13 @@ static bool dsync_brain_master_sync_recv_mailbox(struct dsync_brain *brain) { const struct dsync_mailbox *dsync_box; - enum dsync_slave_recv_ret ret; + enum dsync_ibc_recv_ret ret; i_assert(brain->master_brain); - if ((ret = dsync_slave_recv_mailbox(brain->slave, &dsync_box)) == 0) + if ((ret = dsync_ibc_recv_mailbox(brain->ibc, &dsync_box)) == 0) return FALSE; - if (ret == DSYNC_SLAVE_RECV_RET_FINISHED) { + if (ret == DSYNC_IBC_RECV_RET_FINISHED) { i_error("Remote sent end-of-list instead of a mailbox"); brain->failed = TRUE; return TRUE; @@ -55,11 +55,11 @@ static bool dsync_brain_recv_mail_change(struct dsync_brain *brain) { const struct dsync_mail_change *change; - enum dsync_slave_recv_ret ret; + enum dsync_ibc_recv_ret ret; - if ((ret = dsync_slave_recv_change(brain->slave, &change)) == 0) + if ((ret = dsync_ibc_recv_change(brain->ibc, &change)) == 0) return FALSE; - if (ret == DSYNC_SLAVE_RECV_RET_FINISHED) { + if (ret == DSYNC_IBC_RECV_RET_FINISHED) { dsync_mailbox_import_changes_finish(brain->box_importer); brain->box_recv_state = brain->guid_requests ? DSYNC_BOX_STATE_MAIL_REQUESTS : DSYNC_BOX_STATE_MAILS; @@ -74,10 +74,10 @@ const struct dsync_mail_change *change; while ((change = dsync_mailbox_export_next(brain->box_exporter)) != NULL) { - if (dsync_slave_send_change(brain->slave, change) == 0) + if (dsync_ibc_send_change(brain->ibc, change) == 0) return; } - dsync_slave_send_end_of_list(brain->slave); + dsync_ibc_send_end_of_list(brain->ibc); brain->box_send_state = brain->guid_requests ? DSYNC_BOX_STATE_MAIL_REQUESTS : DSYNC_BOX_STATE_MAILS; } @@ -85,13 +85,13 @@ static bool dsync_brain_recv_mail_request(struct dsync_brain *brain) { const struct dsync_mail_request *request; - enum dsync_slave_recv_ret ret; + enum dsync_ibc_recv_ret ret; i_assert(brain->guid_requests); - if ((ret = dsync_slave_recv_mail_request(brain->slave, &request)) == 0) + if ((ret = dsync_ibc_recv_mail_request(brain->ibc, &request)) == 0) return FALSE; - if (ret == DSYNC_SLAVE_RECV_RET_FINISHED) { + if (ret == DSYNC_IBC_RECV_RET_FINISHED) { brain->box_recv_state = DSYNC_BOX_STATE_MAILS; return TRUE; } @@ -106,11 +106,11 @@ i_assert(brain->guid_requests); while ((request = dsync_mailbox_import_next_request(brain->box_importer)) != NULL) { - if (dsync_slave_send_mail_request(brain->slave, request) == 0) + if (dsync_ibc_send_mail_request(brain->ibc, request) == 0) return; } if (brain->box_recv_state > DSYNC_BOX_STATE_CHANGES) { - dsync_slave_send_end_of_list(brain->slave); + dsync_ibc_send_end_of_list(brain->ibc); brain->box_send_state = DSYNC_BOX_STATE_MAILS; } } @@ -155,17 +155,17 @@ if (changes_during_sync) brain->changes_during_sync = TRUE; } - dsync_slave_send_mailbox_state(brain->slave, &state); + dsync_ibc_send_mailbox_state(brain->ibc, &state); } static bool dsync_brain_recv_mail(struct dsync_brain *brain) { struct dsync_mail *mail; - enum dsync_slave_recv_ret ret; + enum dsync_ibc_recv_ret ret; - if ((ret = dsync_slave_recv_mail(brain->slave, &mail)) == 0) + if ((ret = dsync_ibc_recv_mail(brain->ibc, &mail)) == 0) return FALSE; - if (ret == DSYNC_SLAVE_RECV_RET_FINISHED) { + if (ret == DSYNC_IBC_RECV_RET_FINISHED) { brain->box_recv_state = DSYNC_BOX_STATE_RECV_LAST_COMMON; dsync_brain_sync_half_finished(brain); return TRUE; @@ -183,7 +183,7 @@ while ((mail = dsync_mailbox_export_next_mail(brain->box_exporter)) != NULL) { changed = TRUE; - if (dsync_slave_send_mail(brain->slave, mail) == 0) + if (dsync_ibc_send_mail(brain->ibc, mail) == 0) return TRUE; } if (brain->guid_requests && @@ -193,7 +193,7 @@ } brain->box_send_state = DSYNC_BOX_STATE_DONE; - dsync_slave_send_end_of_list(brain->slave); + dsync_ibc_send_end_of_list(brain->ibc); dsync_brain_sync_half_finished(brain); return TRUE; @@ -201,12 +201,12 @@ static bool dsync_brain_recv_last_common(struct dsync_brain *brain) { - enum dsync_slave_recv_ret ret; + enum dsync_ibc_recv_ret ret; struct dsync_mailbox_state state; - if ((ret = dsync_slave_recv_mailbox_state(brain->slave, &state)) == 0) + if ((ret = dsync_ibc_recv_mailbox_state(brain->ibc, &state)) == 0) return FALSE; - if (ret == DSYNC_SLAVE_RECV_RET_FINISHED) { + if (ret == DSYNC_IBC_RECV_RET_FINISHED) { i_error("Remote sent end-of-list instead of a mailbox state"); brain->failed = TRUE; return TRUE; @@ -260,7 +260,7 @@ changed = TRUE; break; case DSYNC_BOX_STATE_MAILS: - if (!dsync_slave_is_send_queue_full(brain->slave)) { + if (!dsync_ibc_is_send_queue_full(brain->ibc)) { if (dsync_brain_send_mail(brain)) changed = TRUE; }
--- a/src/doveadm/dsync/dsync-brain-private.h Fri Sep 07 16:19:35 2012 +0300 +++ b/src/doveadm/dsync/dsync-brain-private.h Fri Sep 07 16:43:44 2012 +0300 @@ -34,7 +34,7 @@ struct dsync_brain { pool_t pool; struct mail_user *user; - struct dsync_slave *slave; + struct dsync_ibc *ibc; struct mail_namespace *sync_ns; enum dsync_brain_sync_type sync_type;
--- a/src/doveadm/dsync/dsync-brain.c Fri Sep 07 16:19:35 2012 +0300 +++ b/src/doveadm/dsync/dsync-brain.c Fri Sep 07 16:43:44 2012 +0300 @@ -5,7 +5,7 @@ #include "hash.h" #include "mail-namespace.h" #include "dsync-mailbox-tree.h" -#include "dsync-slave.h" +#include "dsync-ibc.h" #include "dsync-brain-private.h" static void dsync_brain_run_io(void *context) @@ -13,7 +13,7 @@ struct dsync_brain *brain = context; bool changed, try_pending; - if (dsync_slave_has_failed(brain->slave)) { + if (dsync_ibc_has_failed(brain->ibc)) { io_loop_stop(current_ioloop); brain->failed = TRUE; return; @@ -28,7 +28,7 @@ if (changed) try_pending = TRUE; else if (try_pending) { - if (dsync_slave_has_pending_data(brain->slave)) + if (dsync_ibc_has_pending_data(brain->ibc)) changed = TRUE; try_pending = FALSE; } @@ -36,7 +36,7 @@ } static struct dsync_brain * -dsync_brain_common_init(struct mail_user *user, struct dsync_slave *slave) +dsync_brain_common_init(struct mail_user *user, struct dsync_ibc *ibc) { struct dsync_brain *brain; pool_t pool; @@ -45,7 +45,7 @@ brain = p_new(pool, struct dsync_brain, 1); brain->pool = pool; brain->user = user; - brain->slave = slave; + brain->ibc = ibc; brain->sync_type = DSYNC_BRAIN_SYNC_TYPE_UNKNOWN; hash_table_create(&brain->remote_mailbox_states, brain->pool, 0, guid_128_hash, guid_128_cmp); @@ -54,20 +54,20 @@ } struct dsync_brain * -dsync_brain_master_init(struct mail_user *user, struct dsync_slave *slave, +dsync_brain_master_init(struct mail_user *user, struct dsync_ibc *ibc, struct mail_namespace *sync_ns, enum dsync_brain_sync_type sync_type, enum dsync_brain_flags flags, const char *state) { - struct dsync_slave_settings slave_set; + struct dsync_ibc_settings ibc_set; struct dsync_brain *brain; const char *error; i_assert(sync_type != DSYNC_BRAIN_SYNC_TYPE_UNKNOWN); i_assert(sync_type != DSYNC_BRAIN_SYNC_TYPE_STATE || *state != '\0'); - brain = dsync_brain_common_init(user, slave); + brain = dsync_brain_common_init(user, ibc); brain->sync_type = sync_type; if (sync_ns != NULL) brain->sync_ns = sync_ns; @@ -91,26 +91,26 @@ } dsync_brain_mailbox_trees_init(brain); - memset(&slave_set, 0, sizeof(slave_set)); - slave_set.sync_ns_prefix = sync_ns == NULL ? NULL : sync_ns->prefix; - slave_set.sync_type = sync_type; - slave_set.guid_requests = brain->guid_requests; - slave_set.mails_have_guids = brain->mails_have_guids; - dsync_slave_send_handshake(slave, &slave_set); + memset(&ibc_set, 0, sizeof(ibc_set)); + ibc_set.sync_ns_prefix = sync_ns == NULL ? NULL : sync_ns->prefix; + ibc_set.sync_type = sync_type; + ibc_set.guid_requests = brain->guid_requests; + ibc_set.mails_have_guids = brain->mails_have_guids; + dsync_ibc_send_handshake(ibc, &ibc_set); - dsync_slave_set_io_callback(slave, dsync_brain_run_io, brain); + dsync_ibc_set_io_callback(ibc, dsync_brain_run_io, brain); return brain; } struct dsync_brain * -dsync_brain_slave_init(struct mail_user *user, struct dsync_slave *slave) +dsync_brain_slave_init(struct mail_user *user, struct dsync_ibc *ibc) { struct dsync_brain *brain; - brain = dsync_brain_common_init(user, slave); + brain = dsync_brain_common_init(user, ibc); brain->state = DSYNC_STATE_SLAVE_RECV_HANDSHAKE; - dsync_slave_set_io_callback(slave, dsync_brain_run_io, brain); + dsync_ibc_set_io_callback(ibc, dsync_brain_run_io, brain); return brain; } @@ -121,7 +121,7 @@ *_brain = NULL; - if (dsync_slave_has_failed(brain->slave) || + if (dsync_ibc_has_failed(brain->ibc) || brain->state != DSYNC_STATE_DONE) brain->failed = TRUE; @@ -139,27 +139,27 @@ static bool dsync_brain_slave_recv_handshake(struct dsync_brain *brain) { - const struct dsync_slave_settings *slave_set; + const struct dsync_ibc_settings *ibc_set; i_assert(!brain->master_brain); - if (dsync_slave_recv_handshake(brain->slave, &slave_set) == 0) + if (dsync_ibc_recv_handshake(brain->ibc, &ibc_set) == 0) return FALSE; - if (slave_set->sync_ns_prefix != NULL) { + if (ibc_set->sync_ns_prefix != NULL) { brain->sync_ns = mail_namespace_find(brain->user->namespaces, - slave_set->sync_ns_prefix); + ibc_set->sync_ns_prefix); if (brain->sync_ns == NULL) { i_error("Requested sync namespace prefix=%s doesn't exist", - slave_set->sync_ns_prefix); + ibc_set->sync_ns_prefix); brain->failed = TRUE; return TRUE; } } i_assert(brain->sync_type == DSYNC_BRAIN_SYNC_TYPE_UNKNOWN); - brain->sync_type = slave_set->sync_type; - brain->guid_requests = slave_set->guid_requests; - brain->mails_have_guids = slave_set->mails_have_guids; + brain->sync_type = ibc_set->sync_type; + brain->guid_requests = ibc_set->guid_requests; + brain->mails_have_guids = ibc_set->mails_have_guids; dsync_brain_mailbox_trees_init(brain); @@ -174,18 +174,18 @@ { const struct dsync_mailbox_state *states; unsigned int count; - enum dsync_slave_send_ret ret = DSYNC_SLAVE_SEND_RET_OK; + enum dsync_ibc_send_ret ret = DSYNC_IBC_SEND_RET_OK; i_assert(brain->master_brain); states = array_get(&brain->mailbox_states, &count); while (brain->mailbox_state_idx < count) { - if (ret == DSYNC_SLAVE_SEND_RET_FULL) + if (ret == DSYNC_IBC_SEND_RET_FULL) return; - ret = dsync_slave_send_mailbox_state(brain->slave, + ret = dsync_ibc_send_mailbox_state(brain->ibc, &states[brain->mailbox_state_idx++]); } - dsync_slave_send_end_of_list(brain->slave); + dsync_ibc_send_end_of_list(brain->ibc); brain->state = DSYNC_STATE_SEND_MAILBOX_TREE; brain->mailbox_state_idx = 0; } @@ -193,16 +193,16 @@ static bool dsync_brain_slave_recv_last_common(struct dsync_brain *brain) { struct dsync_mailbox_state state; - enum dsync_slave_recv_ret ret; + enum dsync_ibc_recv_ret ret; bool changed = FALSE; i_assert(!brain->master_brain); - while ((ret = dsync_slave_recv_mailbox_state(brain->slave, &state)) > 0) { + while ((ret = dsync_ibc_recv_mailbox_state(brain->ibc, &state)) > 0) { array_append(&brain->mailbox_states, &state, 1); changed = TRUE; } - if (ret == DSYNC_SLAVE_RECV_RET_FINISHED) { + if (ret == DSYNC_IBC_RECV_RET_FINISHED) { brain->state = DSYNC_STATE_SEND_MAILBOX_TREE; changed = TRUE; } @@ -267,7 +267,7 @@ *changed_r = FALSE; - if (dsync_slave_has_failed(brain->slave)) { + if (dsync_ibc_has_failed(brain->ibc)) { brain->failed = TRUE; return FALSE; } @@ -276,7 +276,7 @@ ret = dsync_brain_run_real(brain, changed_r); } T_END; if (!brain->failed) - dsync_slave_flush(brain->slave); + dsync_ibc_flush(brain->ibc); return ret; }
--- a/src/doveadm/dsync/dsync-brain.h Fri Sep 07 16:19:35 2012 +0300 +++ b/src/doveadm/dsync/dsync-brain.h Fri Sep 07 16:43:44 2012 +0300 @@ -3,7 +3,7 @@ struct mail_namespace; struct mail_user; -struct dsync_slave; +struct dsync_ibc; enum dsync_brain_flags { DSYNC_BRAIN_FLAG_MAILS_HAVE_GUIDS = 0x01, @@ -23,13 +23,13 @@ }; struct dsync_brain * -dsync_brain_master_init(struct mail_user *user, struct dsync_slave *slave, +dsync_brain_master_init(struct mail_user *user, struct dsync_ibc *ibc, struct mail_namespace *sync_ns, enum dsync_brain_sync_type sync_type, enum dsync_brain_flags flags, const char *state); struct dsync_brain * -dsync_brain_slave_init(struct mail_user *user, struct dsync_slave *slave); +dsync_brain_slave_init(struct mail_user *user, struct dsync_ibc *ibc); /* Returns 0 if everything was successful, -1 if syncing failed in some way */ int dsync_brain_deinit(struct dsync_brain **brain);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/doveadm/dsync/dsync-ibc-pipe.c Fri Sep 07 16:43:44 2012 +0300 @@ -0,0 +1,482 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "istream.h" +#include "dsync-mail.h" +#include "dsync-mailbox.h" +#include "dsync-mailbox-state.h" +#include "dsync-mailbox-tree.h" +#include "dsync-ibc-private.h" + +enum item_type { + ITEM_END_OF_LIST, + ITEM_HANDSHAKE, + ITEM_MAILBOX_STATE, + ITEM_MAILBOX_TREE_NODE, + ITEM_MAILBOX_DELETE, + ITEM_MAILBOX, + ITEM_MAIL_CHANGE, + ITEM_MAIL_REQUEST, + ITEM_MAIL +}; + +struct item { + enum item_type type; + pool_t pool; + + union { + struct dsync_ibc_settings set; + struct dsync_mailbox_state state; + struct dsync_mailbox_node node; + guid_128_t mailbox_guid; + struct dsync_mailbox dsync_box; + struct dsync_mail_change change; + struct dsync_mail_request request; + struct dsync_mail mail; + struct { + const struct dsync_mailbox_delete *deletes; + unsigned int count; + char hierarchy_sep; + } mailbox_delete; + } u; +}; + +struct dsync_ibc_pipe { + struct dsync_ibc ibc; + + ARRAY(pool_t) pools; + ARRAY(struct item) item_queue; + struct dsync_ibc_pipe *remote; + + pool_t pop_pool; + struct item pop_item; +}; + +static pool_t dsync_ibc_pipe_get_pool(struct dsync_ibc_pipe *pipe) +{ + pool_t *pools, ret; + unsigned int count; + + pools = array_get_modifiable(&pipe->pools, &count); + if (count == 0) + return pool_alloconly_create("pipe item pool", 128); + + ret = pools[count-1]; + array_delete(&pipe->pools, count-1, 1); + p_clear(ret); + return ret; +} + +static struct item * ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_pipe_push_item(struct dsync_ibc_pipe *pipe, enum item_type type) +{ + struct item *item; + + item = array_append_space(&pipe->item_queue); + item->type = type; + + switch (type) { + case ITEM_END_OF_LIST: + case ITEM_MAILBOX_STATE: + case ITEM_MAILBOX_DELETE: + break; + case ITEM_HANDSHAKE: + case ITEM_MAILBOX: + case ITEM_MAILBOX_TREE_NODE: + case ITEM_MAIL_CHANGE: + case ITEM_MAIL_REQUEST: + case ITEM_MAIL: + item->pool = dsync_ibc_pipe_get_pool(pipe); + break; + } + return item; +} + +static struct item * +dsync_ibc_pipe_pop_item(struct dsync_ibc_pipe *pipe, enum item_type type) +{ + struct item *item; + + if (array_count(&pipe->item_queue) == 0) + return NULL; + + item = array_idx_modifiable(&pipe->item_queue, 0); + i_assert(item->type == type); + pipe->pop_item = *item; + array_delete(&pipe->item_queue, 0, 1); + item = NULL; + + if (pipe->pop_pool != NULL) + pool_unref(&pipe->pop_pool); + pipe->pop_pool = pipe->pop_item.pool; + return &pipe->pop_item; +} + +static bool dsync_ibc_pipe_try_pop_eol(struct dsync_ibc_pipe *pipe) +{ + const struct item *item; + + if (array_count(&pipe->item_queue) == 0) + return FALSE; + + item = array_idx(&pipe->item_queue, 0); + if (item->type != ITEM_END_OF_LIST) + return FALSE; + + array_delete(&pipe->item_queue, 0, 1); + return TRUE; +} + +static void dsync_ibc_pipe_deinit(struct dsync_ibc *ibc) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + pool_t *poolp; + + if (pipe->remote != NULL) { + i_assert(pipe->remote->remote == pipe); + pipe->remote->remote = NULL; + } + + if (pipe->pop_pool != NULL) + pool_unref(&pipe->pop_pool); + array_foreach_modifiable(&pipe->pools, poolp) + pool_unref(poolp); + array_free(&pipe->pools); + array_free(&pipe->item_queue); + i_free(pipe); +} + +static void +dsync_ibc_pipe_send_handshake(struct dsync_ibc *ibc, + const struct dsync_ibc_settings *set) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + item = dsync_ibc_pipe_push_item(pipe->remote, ITEM_HANDSHAKE); + item->u.set = *set; + item->u.set.sync_ns_prefix = p_strdup(item->pool, set->sync_ns_prefix); +} + +static enum dsync_ibc_recv_ret +dsync_ibc_pipe_recv_handshake(struct dsync_ibc *ibc, + const struct dsync_ibc_settings **set_r) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + item = dsync_ibc_pipe_pop_item(pipe, ITEM_HANDSHAKE); + if (item == NULL) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + + *set_r = &item->u.set; + return DSYNC_IBC_RECV_RET_OK; +} + +static bool dsync_ibc_pipe_is_send_queue_full(struct dsync_ibc *ibc) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + + return array_count(&pipe->remote->item_queue) > 0; +} + +static bool dsync_ibc_pipe_has_pending_data(struct dsync_ibc *ibc) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + + return array_count(&pipe->item_queue) > 0; +} + +static void +dsync_ibc_pipe_send_end_of_list(struct dsync_ibc *ibc) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + + dsync_ibc_pipe_push_item(pipe->remote, ITEM_END_OF_LIST); +} + +static void +dsync_ibc_pipe_send_mailbox_state(struct dsync_ibc *ibc, + const struct dsync_mailbox_state *state) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + item = dsync_ibc_pipe_push_item(pipe->remote, ITEM_MAILBOX_STATE); + item->u.state = *state; +} + +static enum dsync_ibc_recv_ret +dsync_ibc_pipe_recv_mailbox_state(struct dsync_ibc *ibc, + struct dsync_mailbox_state *state_r) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + if (dsync_ibc_pipe_try_pop_eol(pipe)) + return DSYNC_IBC_RECV_RET_FINISHED; + + item = dsync_ibc_pipe_pop_item(pipe, ITEM_MAILBOX_STATE); + if (item == NULL) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + + *state_r = item->u.state; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_pipe_send_mailbox_tree_node(struct dsync_ibc *ibc, + const char *const *name, + const struct dsync_mailbox_node *node) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + item = dsync_ibc_pipe_push_item(pipe->remote, ITEM_MAILBOX_TREE_NODE); + + /* a little bit kludgy way to send it */ + item->u.node.name = (void *)p_strarray_dup(item->pool, name); + dsync_mailbox_node_copy_data(&item->u.node, node); +} + +static enum dsync_ibc_recv_ret +dsync_ibc_pipe_recv_mailbox_tree_node(struct dsync_ibc *ibc, + const char *const **name_r, + const struct dsync_mailbox_node **node_r) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + if (dsync_ibc_pipe_try_pop_eol(pipe)) + return DSYNC_IBC_RECV_RET_FINISHED; + + item = dsync_ibc_pipe_pop_item(pipe, ITEM_MAILBOX_TREE_NODE); + if (item == NULL) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + + *name_r = (void *)item->u.node.name; + item->u.node.name = NULL; + + *node_r = &item->u.node; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_pipe_send_mailbox_deletes(struct dsync_ibc *ibc, + const struct dsync_mailbox_delete *deletes, + unsigned int count, char hierarchy_sep) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + item = dsync_ibc_pipe_push_item(pipe->remote, ITEM_MAILBOX_DELETE); + + /* we'll assume that the deletes are permanent. this works for now.. */ + /* a little bit kludgy way to send it */ + item->u.mailbox_delete.deletes = deletes; + item->u.mailbox_delete.count = count; + item->u.mailbox_delete.hierarchy_sep = hierarchy_sep; +} + +static enum dsync_ibc_recv_ret +dsync_ibc_pipe_recv_mailbox_deletes(struct dsync_ibc *ibc, + const struct dsync_mailbox_delete **deletes_r, + unsigned int *count_r, + char *hierarchy_sep_r) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + if (dsync_ibc_pipe_try_pop_eol(pipe)) + return DSYNC_IBC_RECV_RET_FINISHED; + + item = dsync_ibc_pipe_pop_item(pipe, ITEM_MAILBOX_DELETE); + if (item == NULL) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + + *deletes_r = item->u.mailbox_delete.deletes; + *count_r = item->u.mailbox_delete.count; + *hierarchy_sep_r = item->u.mailbox_delete.hierarchy_sep; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_pipe_send_mailbox(struct dsync_ibc *ibc, + const struct dsync_mailbox *dsync_box) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + const struct mailbox_cache_field *cf; + struct mailbox_cache_field *ncf; + + item = dsync_ibc_pipe_push_item(pipe->remote, ITEM_MAILBOX); + item->u.dsync_box = *dsync_box; + p_array_init(&item->u.dsync_box.cache_fields, item->pool, + array_count(&dsync_box->cache_fields)); + array_foreach(&dsync_box->cache_fields, cf) { + ncf = array_append_space(&item->u.dsync_box.cache_fields); + ncf->name = p_strdup(item->pool, cf->name); + ncf->decision = cf->decision; + ncf->last_used = cf->last_used; + } +} + +static enum dsync_ibc_recv_ret +dsync_ibc_pipe_recv_mailbox(struct dsync_ibc *ibc, + const struct dsync_mailbox **dsync_box_r) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + if (dsync_ibc_pipe_try_pop_eol(pipe)) + return DSYNC_IBC_RECV_RET_FINISHED; + + item = dsync_ibc_pipe_pop_item(pipe, ITEM_MAILBOX); + if (item == NULL) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + + *dsync_box_r = &item->u.dsync_box; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_pipe_send_change(struct dsync_ibc *ibc, + const struct dsync_mail_change *change) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + item = dsync_ibc_pipe_push_item(pipe->remote, ITEM_MAIL_CHANGE); + dsync_mail_change_dup(item->pool, change, &item->u.change); +} + +static enum dsync_ibc_recv_ret +dsync_ibc_pipe_recv_change(struct dsync_ibc *ibc, + const struct dsync_mail_change **change_r) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + if (dsync_ibc_pipe_try_pop_eol(pipe)) + return DSYNC_IBC_RECV_RET_FINISHED; + + item = dsync_ibc_pipe_pop_item(pipe, ITEM_MAIL_CHANGE); + if (item == NULL) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + + *change_r = &item->u.change; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_pipe_send_mail_request(struct dsync_ibc *ibc, + const struct dsync_mail_request *request) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + item = dsync_ibc_pipe_push_item(pipe->remote, ITEM_MAIL_REQUEST); + item->u.request.guid = p_strdup(item->pool, request->guid); + item->u.request.uid = request->uid; +} + +static enum dsync_ibc_recv_ret +dsync_ibc_pipe_recv_mail_request(struct dsync_ibc *ibc, + const struct dsync_mail_request **request_r) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + if (dsync_ibc_pipe_try_pop_eol(pipe)) + return DSYNC_IBC_RECV_RET_FINISHED; + + item = dsync_ibc_pipe_pop_item(pipe, ITEM_MAIL_REQUEST); + if (item == NULL) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + + *request_r = &item->u.request; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_pipe_send_mail(struct dsync_ibc *ibc, const struct dsync_mail *mail) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + item = dsync_ibc_pipe_push_item(pipe->remote, ITEM_MAIL); + item->u.mail.guid = p_strdup(item->pool, mail->guid); + item->u.mail.pop3_uidl = p_strdup(item->pool, mail->pop3_uidl); + item->u.mail.pop3_order = mail->pop3_order; + item->u.mail.received_date = mail->received_date; + if (mail->input != NULL) { + item->u.mail.input = mail->input; + i_stream_ref(mail->input); + } +} + +static enum dsync_ibc_recv_ret +dsync_ibc_pipe_recv_mail(struct dsync_ibc *ibc, struct dsync_mail **mail_r) +{ + struct dsync_ibc_pipe *pipe = (struct dsync_ibc_pipe *)ibc; + struct item *item; + + if (dsync_ibc_pipe_try_pop_eol(pipe)) + return DSYNC_IBC_RECV_RET_FINISHED; + + item = dsync_ibc_pipe_pop_item(pipe, ITEM_MAIL); + if (item == NULL) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + + *mail_r = &item->u.mail; + return DSYNC_IBC_RECV_RET_OK; +} + +static const struct dsync_ibc_vfuncs dsync_ibc_pipe_vfuncs = { + dsync_ibc_pipe_deinit, + dsync_ibc_pipe_send_handshake, + dsync_ibc_pipe_recv_handshake, + dsync_ibc_pipe_send_end_of_list, + dsync_ibc_pipe_send_mailbox_state, + dsync_ibc_pipe_recv_mailbox_state, + dsync_ibc_pipe_send_mailbox_tree_node, + dsync_ibc_pipe_recv_mailbox_tree_node, + dsync_ibc_pipe_send_mailbox_deletes, + dsync_ibc_pipe_recv_mailbox_deletes, + dsync_ibc_pipe_send_mailbox, + dsync_ibc_pipe_recv_mailbox, + dsync_ibc_pipe_send_change, + dsync_ibc_pipe_recv_change, + dsync_ibc_pipe_send_mail_request, + dsync_ibc_pipe_recv_mail_request, + dsync_ibc_pipe_send_mail, + dsync_ibc_pipe_recv_mail, + NULL, + dsync_ibc_pipe_is_send_queue_full, + dsync_ibc_pipe_has_pending_data +}; + +static struct dsync_ibc_pipe * +dsync_ibc_pipe_alloc(void) +{ + struct dsync_ibc_pipe *pipe; + + pipe = i_new(struct dsync_ibc_pipe, 1); + pipe->ibc.v = dsync_ibc_pipe_vfuncs; + i_array_init(&pipe->pools, 4); + i_array_init(&pipe->item_queue, 4); + return pipe; +} + +void dsync_ibc_init_pipe(struct dsync_ibc **ibc1_r, struct dsync_ibc **ibc2_r) +{ + struct dsync_ibc_pipe *pipe1, *pipe2; + + pipe1 = dsync_ibc_pipe_alloc(); + pipe2 = dsync_ibc_pipe_alloc(); + pipe1->remote = pipe2; + pipe2->remote = pipe1; + *ibc1_r = &pipe1->ibc; + *ibc2_r = &pipe2->ibc; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/doveadm/dsync/dsync-ibc-private.h Fri Sep 07 16:43:44 2012 +0300 @@ -0,0 +1,78 @@ +#ifndef DSYNC_IBC_PRIVATE_H +#define DSYNC_IBC_PRIVATE_H + +#include "dsync-ibc.h" + +struct dsync_ibc_vfuncs { + void (*deinit)(struct dsync_ibc *ibc); + + void (*send_handshake)(struct dsync_ibc *ibc, + const struct dsync_ibc_settings *set); + enum dsync_ibc_recv_ret + (*recv_handshake)(struct dsync_ibc *ibc, + const struct dsync_ibc_settings **set_r); + + void (*send_end_of_list)(struct dsync_ibc *ibc); + + void (*send_mailbox_state)(struct dsync_ibc *ibc, + const struct dsync_mailbox_state *state); + enum dsync_ibc_recv_ret + (*recv_mailbox_state)(struct dsync_ibc *ibc, + struct dsync_mailbox_state *state_r); + + void (*send_mailbox_tree_node)(struct dsync_ibc *ibc, + const char *const *name, + const struct dsync_mailbox_node *node); + enum dsync_ibc_recv_ret + (*recv_mailbox_tree_node)(struct dsync_ibc *ibc, + const char *const **name_r, + const struct dsync_mailbox_node **node_r); + + void (*send_mailbox_deletes)(struct dsync_ibc *ibc, + const struct dsync_mailbox_delete *deletes, + unsigned int count, char hierarchy_sep); + enum dsync_ibc_recv_ret + (*recv_mailbox_deletes)(struct dsync_ibc *ibc, + const struct dsync_mailbox_delete **deletes_r, + unsigned int *count_r, + char *hierarchy_sep_r); + + void (*send_mailbox)(struct dsync_ibc *ibc, + const struct dsync_mailbox *dsync_box); + enum dsync_ibc_recv_ret + (*recv_mailbox)(struct dsync_ibc *ibc, + const struct dsync_mailbox **dsync_box_r); + + void (*send_change)(struct dsync_ibc *ibc, + const struct dsync_mail_change *change); + enum dsync_ibc_recv_ret + (*recv_change)(struct dsync_ibc *ibc, + const struct dsync_mail_change **change_r); + + void (*send_mail_request)(struct dsync_ibc *ibc, + const struct dsync_mail_request *request); + enum dsync_ibc_recv_ret + (*recv_mail_request)(struct dsync_ibc *ibc, + const struct dsync_mail_request **request_r); + + void (*send_mail)(struct dsync_ibc *ibc, + const struct dsync_mail *mail); + enum dsync_ibc_recv_ret + (*recv_mail)(struct dsync_ibc *ibc, + struct dsync_mail **mail_r); + + void (*flush)(struct dsync_ibc *ibc); + bool (*is_send_queue_full)(struct dsync_ibc *ibc); + bool (*has_pending_data)(struct dsync_ibc *ibc); +}; + +struct dsync_ibc { + struct dsync_ibc_vfuncs v; + + io_callback_t *io_callback; + void *io_context; + + unsigned int failed:1; +}; + +#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/doveadm/dsync/dsync-ibc-stream.c Fri Sep 07 16:43:44 2012 +0300 @@ -0,0 +1,1517 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "fd-set-nonblock.h" +#include "safe-mkstemp.h" +#include "ioloop.h" +#include "istream.h" +#include "istream-seekable.h" +#include "istream-dot.h" +#include "ostream.h" +#include "str.h" +#include "strescape.h" +#include "master-service.h" +#include "mail-cache.h" +#include "mail-storage-private.h" +#include "dsync-serializer.h" +#include "dsync-deserializer.h" +#include "dsync-mail.h" +#include "dsync-mailbox.h" +#include "dsync-mailbox-state.h" +#include "dsync-mailbox-tree.h" +#include "dsync-ibc-private.h" + +#include <stdlib.h> + +#define DSYNC_IBC_STREAM_TIMEOUT_MSECS (60*10*1000) +#define DSYNC_IBC_STREAM_OUTBUF_THROTTLE_SIZE (1024*128) + +#define DSYNC_PROTOCOL_VERSION_MAJOR 3 +#define DSYNC_HANDSHAKE_VERSION "VERSION\tdsync\t3\t0\n" + +enum item_type { + ITEM_NONE, + + ITEM_HANDSHAKE, + ITEM_MAILBOX_STATE, + ITEM_MAILBOX_TREE_NODE, + ITEM_MAILBOX_DELETE, + ITEM_MAILBOX, + + ITEM_MAIL_CHANGE, + ITEM_MAIL_REQUEST, + ITEM_MAIL, + + ITEM_MAILBOX_CACHE_FIELD, + + ITEM_END_OF_LIST +}; + +#define END_OF_LIST_LINE "." +static const struct { + /* full human readable name of the item */ + const char *name; + /* unique character identifying the item */ + char chr; + const char *required_keys; + const char *optional_keys; +} items[ITEM_END_OF_LIST+1] = { + { NULL, '\0', NULL, NULL }, + { .name = "handshake", + .chr = 'H', + .optional_keys = "sync_ns_prefix sync_type " + "guid_requests mails_have_guids" + }, + { .name = "mailbox_state", + .chr = 'S', + .required_keys = "mailbox_guid last_uidvalidity last_common_uid " + "last_common_modseq" + }, + { .name = "mailbox_tree_node", + .chr = 'N', + .required_keys = "name existence", + .optional_keys = "mailbox_guid uid_validity " + "last_renamed_or_created subscribed last_subscription_change" + }, + { .name = "mailbox_delete", + .chr = 'D', + .required_keys = "hierarchy_sep", + .optional_keys = "mailboxes dirs" + }, + { .name = "mailbox", + .chr = 'B', + .required_keys = "mailbox_guid uid_validity uid_next " + "messages_count first_recent_uid highest_modseq", + .optional_keys = "cache_fields" + }, + { .name = "mail_change", + .chr = 'C', + .required_keys = "type uid", + .optional_keys = "guid hdr_hash modseq save_timestamp " + "add_flags remove_flags final_flags " + "keywords_reset keyword_changes" + }, + { .name = "mail_request", + .chr = 'R', + .optional_keys = "guid uid" + }, + { .name = "mail", + .chr = 'M', + .optional_keys = "guid uid pop3_uidl pop3_order received_date stream" + }, + { .name = "mailbox_cache_field", + .chr = 'c', + .required_keys = "name decision", + .optional_keys = "last_used" + }, + + { "end_of_list", '\0', NULL, NULL } +}; + +struct dsync_ibc_stream { + struct dsync_ibc ibc; + + char *name, *temp_path_prefix; + int fd_in, fd_out; + struct istream *input; + struct ostream *output; + struct io *io; + struct timeout *to; + + struct dsync_serializer *serializers[ITEM_END_OF_LIST]; + struct dsync_deserializer *deserializers[ITEM_END_OF_LIST]; + + pool_t ret_pool; + struct dsync_deserializer_decoder *cur_decoder; + + struct istream *mail_output, *mail_input; + struct dsync_mail *cur_mail; + char mail_output_last; + + unsigned int version_received:1; + unsigned int handshake_received:1; + unsigned int has_pending_data:1; +}; + +static void dsync_ibc_stream_stop(struct dsync_ibc_stream *ibc) +{ + i_stream_close(ibc->input); + o_stream_close(ibc->output); + io_loop_stop(current_ioloop); +} + +static int dsync_ibc_stream_read_mail_stream(struct dsync_ibc_stream *ibc) +{ + if (i_stream_read(ibc->mail_input) < 0) { + if (ibc->mail_input->stream_errno != 0) { + errno = ibc->mail_input->stream_errno; + i_error("dsync(%s): read() failed: %m", ibc->name); + dsync_ibc_stream_stop(ibc); + return -1; + } + /* finished reading the mail stream */ + i_assert(ibc->mail_input->eof); + i_stream_seek(ibc->mail_input, 0); + ibc->mail_input = NULL; + return 1; + } + i_stream_skip(ibc->mail_input, i_stream_get_data_size(ibc->mail_input)); + return 0; +} + +static void dsync_ibc_stream_input(struct dsync_ibc_stream *ibc) +{ + if (ibc->mail_input != NULL) { + if (dsync_ibc_stream_read_mail_stream(ibc) == 0) + return; + } + ibc->ibc.io_callback(ibc->ibc.io_context); +} + +static int dsync_ibc_stream_send_mail_stream(struct dsync_ibc_stream *ibc) +{ + const unsigned char *data; + unsigned char add; + size_t i, size; + int ret; + + while ((ret = i_stream_read_data(ibc->mail_output, + &data, &size, 0)) > 0) { + add = '\0'; + for (i = 0; i < size; i++) { + if (data[i] == '\n') { + if ((i == 0 && ibc->mail_output_last != '\r') || + (i > 0 && data[i-1] != '\r')) { + /* missing CR */ + add = '\r'; + break; + } + } else if (data[i] == '.' && + ((i == 0 && ibc->mail_output_last == '\n') || + (i > 0 && data[i-1] == '\n'))) { + /* escape the dot */ + add = '.'; + break; + } + } + + if (i > 0) { + o_stream_nsend(ibc->output, data, i); + ibc->mail_output_last = data[i-1]; + i_stream_skip(ibc->mail_output, i); + } + + if (o_stream_get_buffer_used_size(ibc->output) >= 4096) { + if ((ret = o_stream_flush(ibc->output)) < 0) { + dsync_ibc_stream_stop(ibc); + return -1; + } + if (ret == 0) { + /* continue later */ + o_stream_set_flush_pending(ibc->output, TRUE); + return 0; + } + } + + if (add != '\0') { + o_stream_nsend(ibc->output, &add, 1); + ibc->mail_output_last = add; + } + } + i_assert(ret == -1); + + if (ibc->mail_output->stream_errno != 0) { + i_error("dsync(%s): read(%s) failed: %m", + ibc->name, i_stream_get_name(ibc->mail_output)); + dsync_ibc_stream_stop(ibc); + return -1; + } + + /* finished sending the stream */ + o_stream_nsend_str(ibc->output, "\r\n.\r\n"); + i_stream_unref(&ibc->mail_output); + return 1; +} + +static int dsync_ibc_stream_output(struct dsync_ibc_stream *ibc) +{ + struct ostream *output = ibc->output; + int ret; + + if ((ret = o_stream_flush(output)) < 0) + ret = 1; + else if (ibc->mail_output != NULL) { + if (dsync_ibc_stream_send_mail_stream(ibc) < 0) + ret = 1; + } + timeout_reset(ibc->to); + + if (!dsync_ibc_is_send_queue_full(&ibc->ibc)) + ibc->ibc.io_callback(ibc->ibc.io_context); + return ret; +} + +static void dsync_ibc_stream_timeout(struct dsync_ibc_stream *ibc) +{ + i_error("dsync(%s): I/O has stalled, no activity for %u seconds", + ibc->name, DSYNC_IBC_STREAM_TIMEOUT_MSECS/1000); + dsync_ibc_stream_stop(ibc); +} + +static void dsync_ibc_stream_init(struct dsync_ibc_stream *ibc) +{ + unsigned int i; + + fd_set_nonblock(ibc->fd_in, TRUE); + fd_set_nonblock(ibc->fd_out, TRUE); + + ibc->input = i_stream_create_fd(ibc->fd_in, (size_t)-1, FALSE); + ibc->output = o_stream_create_fd(ibc->fd_out, (size_t)-1, FALSE); + ibc->io = io_add(ibc->fd_in, IO_READ, dsync_ibc_stream_input, ibc); + o_stream_set_no_error_handling(ibc->output, TRUE); + o_stream_set_flush_callback(ibc->output, dsync_ibc_stream_output, ibc); + ibc->to = timeout_add(DSYNC_IBC_STREAM_TIMEOUT_MSECS, + dsync_ibc_stream_timeout, ibc); + o_stream_cork(ibc->output); + o_stream_nsend_str(ibc->output, DSYNC_HANDSHAKE_VERSION); + + /* initialize serializers and send their headers to remote */ + for (i = 1; i < ITEM_END_OF_LIST; i++) T_BEGIN { + const char *keys; + + keys = items[i].required_keys == NULL ? items[i].optional_keys : + t_strconcat(items[i].required_keys, " ", + items[i].optional_keys, NULL); + if (keys != NULL) { + i_assert(items[i].chr != '\0'); + + ibc->serializers[i] = + dsync_serializer_init(t_strsplit_spaces(keys, " ")); + o_stream_nsend(ibc->output, &items[i].chr, 1); + o_stream_nsend_str(ibc->output, + dsync_serializer_encode_header_line(ibc->serializers[i])); + } + } T_END; + o_stream_nsend_str(ibc->output, ".\n"); + + dsync_ibc_flush(&ibc->ibc); +} + +static void dsync_ibc_stream_deinit(struct dsync_ibc *_ibc) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + + if (ibc->cur_decoder != NULL) + dsync_deserializer_decode_finish(&ibc->cur_decoder); + if (ibc->mail_output != NULL) + i_stream_unref(&ibc->mail_output); + + timeout_remove(&ibc->to); + if (ibc->io != NULL) + io_remove(&ibc->io); + i_stream_destroy(&ibc->input); + o_stream_destroy(&ibc->output); + if (close(ibc->fd_in) < 0) + i_error("close(%s) failed: %m", ibc->name); + if (ibc->fd_in != ibc->fd_out) { + if (close(ibc->fd_out) < 0) + i_error("close(%s) failed: %m", ibc->name); + } + pool_unref(&ibc->ret_pool); + i_free(ibc->temp_path_prefix); + i_free(ibc->name); + i_free(ibc); +} + +static int dsync_ibc_stream_next_line(struct dsync_ibc_stream *ibc, + const char **line_r) +{ + const char *line; + + line = i_stream_next_line(ibc->input); + if (line != NULL) { + *line_r = line; + return 1; + } + + /* try reading some */ + switch (i_stream_read(ibc->input)) { + case -1: + if (ibc->input->stream_errno != 0) { + errno = ibc->input->stream_errno; + i_error("read(%s) failed: %m", ibc->name); + } else { + i_assert(ibc->input->eof); + i_error("read(%s) failed: EOF", ibc->name); + } + dsync_ibc_stream_stop(ibc); + return -1; + case 0: + return 0; + } + *line_r = i_stream_next_line(ibc->input); + if (*line_r == NULL) { + ibc->has_pending_data = FALSE; + return 0; + } + ibc->has_pending_data = TRUE; + return 1; +} + +static void ATTR_FORMAT(3, 4) ATTR_NULL(2) +dsync_ibc_input_error(struct dsync_ibc_stream *ibc, + struct dsync_deserializer_decoder *decoder, + const char *fmt, ...) +{ + va_list args; + const char *error; + + va_start(args, fmt); + error = t_strdup_vprintf(fmt, args); + if (decoder == NULL) + i_error("dsync(%s): %s", ibc->name, error); + else { + i_error("dsync(%s): %s: %s", ibc->name, + dsync_deserializer_decoder_get_name(decoder), error); + } + va_end(args); + + dsync_ibc_stream_stop(ibc); +} + +static void +dsync_ibc_stream_send_string(struct dsync_ibc_stream *ibc, + const string_t *str) +{ + i_assert(ibc->mail_output == NULL); + o_stream_nsend(ibc->output, str_data(str), str_len(str)); +} + +static int +dsync_ibc_check_missing_deserializers(struct dsync_ibc_stream *ibc) +{ + unsigned int i; + int ret = 0; + + for (i = 1; i < ITEM_END_OF_LIST; i++) { + if (ibc->deserializers[i] == NULL && + (items[i].required_keys != NULL || + items[i].optional_keys != NULL)) { + dsync_ibc_input_error(ibc, NULL, + "Remote didn't handshake deserializer for %s", + items[i].name); + ret = -1; + } + } + return ret; +} + +static bool +dsync_ibc_stream_handshake(struct dsync_ibc_stream *ibc, const char *line) +{ + enum item_type item = ITEM_NONE; + const char *const *required_keys, *error; + unsigned int i; + + if (ibc->handshake_received) + return TRUE; + + if (!ibc->version_received) { + if (!version_string_verify(line, "dsync", + DSYNC_PROTOCOL_VERSION_MAJOR)) { + dsync_ibc_input_error(ibc, NULL, + "Remote dsync doesn't use compatible protocol"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + ibc->version_received = TRUE; + return FALSE; + } + + if (strcmp(line, END_OF_LIST_LINE) == 0) { + /* finished handshaking */ + if (dsync_ibc_check_missing_deserializers(ibc) < 0) + return FALSE; + ibc->handshake_received = TRUE; + return FALSE; + } + + for (i = 1; i < ITEM_END_OF_LIST; i++) { + if (items[i].chr == line[0]) { + item = i; + break; + } + } + if (item == ITEM_NONE) { + /* unknown deserializer, ignore */ + return FALSE; + } + + required_keys = items[item].required_keys == NULL ? NULL : + t_strsplit(items[item].required_keys, " "); + if (dsync_deserializer_init(items[item].name, + required_keys, line + 1, + &ibc->deserializers[item], &error) < 0) { + dsync_ibc_input_error(ibc, NULL, + "Remote sent invalid handshake for %s: %s", + items[item].name, error); + } + return FALSE; +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_input_next(struct dsync_ibc_stream *ibc, enum item_type item, + struct dsync_deserializer_decoder **decoder_r) +{ + enum item_type line_item = ITEM_NONE; + const char *line, *error; + unsigned int i; + + i_assert(ibc->mail_input == NULL); + + timeout_reset(ibc->to); + + do { + if (dsync_ibc_stream_next_line(ibc, &line) <= 0) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } while (!dsync_ibc_stream_handshake(ibc, line)); + + if (strcmp(line, END_OF_LIST_LINE) == 0) { + /* end of this list */ + return DSYNC_IBC_RECV_RET_FINISHED; + } + for (i = 1; i < ITEM_END_OF_LIST; i++) { + if (*line == items[i].chr) { + line_item = i; + break; + } + } + if (line_item != item) { + dsync_ibc_input_error(ibc, NULL, + "Received unexpected input %c != %c", + *line, items[item].chr); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + + if (ibc->cur_decoder != NULL) + dsync_deserializer_decode_finish(&ibc->cur_decoder); + if (dsync_deserializer_decode_begin(ibc->deserializers[item], + line+1, &ibc->cur_decoder, + &error) < 0) { + dsync_ibc_input_error(ibc, NULL, "Invalid input to %s: %s", + items[item].name, error); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + *decoder_r = ibc->cur_decoder; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_stream_send_handshake(struct dsync_ibc *_ibc, + const struct dsync_ibc_settings *set) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_serializer_encoder *encoder; + string_t *str = t_str_new(128); + char sync_type[2]; + + str_append_c(str, items[ITEM_HANDSHAKE].chr); + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_HANDSHAKE]); + if (set->sync_ns_prefix != NULL) { + dsync_serializer_encode_add(encoder, "sync_ns_prefix", + set->sync_ns_prefix); + } + + sync_type[0] = sync_type[1] = '\0'; + switch (set->sync_type) { + case DSYNC_BRAIN_SYNC_TYPE_UNKNOWN: + break; + case DSYNC_BRAIN_SYNC_TYPE_FULL: + sync_type[0] = 'f'; + break; + case DSYNC_BRAIN_SYNC_TYPE_CHANGED: + sync_type[0] = 'c'; + break; + case DSYNC_BRAIN_SYNC_TYPE_STATE: + sync_type[0] = 's'; + break; + } + i_assert(sync_type[0] != '\0'); + dsync_serializer_encode_add(encoder, "sync_type", sync_type); + if (set->guid_requests) + dsync_serializer_encode_add(encoder, "guid_requests", ""); + if (set->mails_have_guids) + dsync_serializer_encode_add(encoder, "mails_have_guids", ""); + + dsync_serializer_encode_finish(&encoder, str); + dsync_ibc_stream_send_string(ibc, str); +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_recv_handshake(struct dsync_ibc *_ibc, + const struct dsync_ibc_settings **set_r) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_deserializer_decoder *decoder; + struct dsync_ibc_settings *set; + const char *value; + pool_t pool = ibc->ret_pool; + enum dsync_ibc_recv_ret ret; + + ret = dsync_ibc_stream_input_next(ibc, ITEM_HANDSHAKE, &decoder); + if (ret != DSYNC_IBC_RECV_RET_OK) { + if (ret != DSYNC_IBC_RECV_RET_TRYAGAIN) { + i_error("dsync(%s): Unexpected input in handshake", + ibc->name); + dsync_ibc_stream_stop(ibc); + } + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + + p_clear(pool); + set = p_new(pool, struct dsync_ibc_settings, 1); + + if (dsync_deserializer_decode_try(decoder, "sync_ns_prefix", &value)) + set->sync_ns_prefix = p_strdup(pool, value); + if (dsync_deserializer_decode_try(decoder, "sync_type", &value)) { + switch (value[0]) { + case 'f': + set->sync_type = DSYNC_BRAIN_SYNC_TYPE_FULL; + break; + case 'c': + set->sync_type = DSYNC_BRAIN_SYNC_TYPE_CHANGED; + break; + case 's': + set->sync_type = DSYNC_BRAIN_SYNC_TYPE_STATE; + break; + default: + dsync_ibc_input_error(ibc, decoder, + "Unknown sync_type: %s", value); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + } + if (dsync_deserializer_decode_try(decoder, "guid_requests", &value)) + set->guid_requests = TRUE; + if (dsync_deserializer_decode_try(decoder, "mails_have_guids", &value)) + set->mails_have_guids = TRUE; + + *set_r = set; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_stream_send_end_of_list(struct dsync_ibc *_ibc) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + + i_assert(ibc->mail_output == NULL); + + o_stream_nsend_str(ibc->output, END_OF_LIST_LINE"\n"); +} + +static void +dsync_ibc_stream_send_mailbox_state(struct dsync_ibc *_ibc, + const struct dsync_mailbox_state *state) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_serializer_encoder *encoder; + string_t *str = t_str_new(128); + + str_append_c(str, items[ITEM_MAILBOX_STATE].chr); + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAILBOX_STATE]); + dsync_serializer_encode_add(encoder, "mailbox_guid", + guid_128_to_string(state->mailbox_guid)); + dsync_serializer_encode_add(encoder, "last_uidvalidity", + dec2str(state->last_uidvalidity)); + dsync_serializer_encode_add(encoder, "last_common_uid", + dec2str(state->last_common_uid)); + dsync_serializer_encode_add(encoder, "last_common_modseq", + dec2str(state->last_common_modseq)); + + dsync_serializer_encode_finish(&encoder, str); + dsync_ibc_stream_send_string(ibc, str); +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_recv_mailbox_state(struct dsync_ibc *_ibc, + struct dsync_mailbox_state *state_r) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_deserializer_decoder *decoder; + const char *value; + enum dsync_ibc_recv_ret ret; + + memset(state_r, 0, sizeof(*state_r)); + + ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_STATE, &decoder); + if (ret != DSYNC_IBC_RECV_RET_OK) + return ret; + + value = dsync_deserializer_decode_get(decoder, "mailbox_guid"); + if (guid_128_from_string(value, state_r->mailbox_guid) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + value = dsync_deserializer_decode_get(decoder, "last_uidvalidity"); + if (str_to_uint32(value, &state_r->last_uidvalidity) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid last_uidvalidity"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + value = dsync_deserializer_decode_get(decoder, "last_common_uid"); + if (str_to_uint32(value, &state_r->last_uidvalidity) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid last_common_uid"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + value = dsync_deserializer_decode_get(decoder, "last_common_modseq"); + if (str_to_uint32(value, &state_r->last_uidvalidity) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid last_common_modseq"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_stream_send_mailbox_tree_node(struct dsync_ibc *_ibc, + const char *const *name, + const struct dsync_mailbox_node *node) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_serializer_encoder *encoder; + string_t *str, *namestr; + + i_assert(*name != NULL); + + str = t_str_new(128); + str_append_c(str, items[ITEM_MAILBOX_TREE_NODE].chr); + + /* convert all hierarchy separators to tabs. mailbox names really + aren't supposed to have any tabs, but escape them anyway if there + are. */ + namestr = t_str_new(128); + for (; *name != NULL; name++) { + str_tabescape_write(namestr, *name); + str_append_c(namestr, '\t'); + } + str_truncate(namestr, str_len(namestr)-1); + + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAILBOX_TREE_NODE]); + dsync_serializer_encode_add(encoder, "name", str_c(namestr)); + switch (node->existence) { + case DSYNC_MAILBOX_NODE_NONEXISTENT: + dsync_serializer_encode_add(encoder, "existence", "n"); + break; + case DSYNC_MAILBOX_NODE_EXISTS: + dsync_serializer_encode_add(encoder, "existence", "y"); + break; + case DSYNC_MAILBOX_NODE_DELETED: + dsync_serializer_encode_add(encoder, "existence", "d"); + break; + } + + if (!guid_128_is_empty(node->mailbox_guid)) { + dsync_serializer_encode_add(encoder, "mailbox_guid", + guid_128_to_string(node->mailbox_guid)); + } + if (node->uid_validity != 0) { + dsync_serializer_encode_add(encoder, "uid_validity", + dec2str(node->uid_validity)); + } + if (node->last_renamed_or_created != 0) { + dsync_serializer_encode_add(encoder, "last_renamed_or_created", + dec2str(node->last_renamed_or_created)); + } + if (node->last_subscription_change != 0) { + dsync_serializer_encode_add(encoder, "last_subscription_change", + dec2str(node->last_subscription_change)); + } + if (node->subscribed) + dsync_serializer_encode_add(encoder, "subscribed", ""); + dsync_serializer_encode_finish(&encoder, str); + dsync_ibc_stream_send_string(ibc, str); +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_recv_mailbox_tree_node(struct dsync_ibc *_ibc, + const char *const **name_r, + const struct dsync_mailbox_node **node_r) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_deserializer_decoder *decoder; + struct dsync_mailbox_node *node; + const char *value; + enum dsync_ibc_recv_ret ret; + + ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_TREE_NODE, &decoder); + if (ret != DSYNC_IBC_RECV_RET_OK) + return ret; + + p_clear(ibc->ret_pool); + node = p_new(ibc->ret_pool, struct dsync_mailbox_node, 1); + + value = dsync_deserializer_decode_get(decoder, "name"); + if (*value == '\0') { + dsync_ibc_input_error(ibc, decoder, "Empty name"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + *name_r = (void *)p_strsplit_tabescaped(ibc->ret_pool, value); + + value = dsync_deserializer_decode_get(decoder, "existence"); + switch (*value) { + case 'n': + node->existence = DSYNC_MAILBOX_NODE_NONEXISTENT; + break; + case 'y': + node->existence = DSYNC_MAILBOX_NODE_EXISTS; + break; + case 'd': + node->existence = DSYNC_MAILBOX_NODE_DELETED; + break; + } + + if (dsync_deserializer_decode_try(decoder, "mailbox_guid", &value) && + guid_128_from_string(value, node->mailbox_guid) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "uid_validity", &value) && + str_to_uint32(value, &node->uid_validity) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid uid_validity"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "last_renamed_or_created", &value) && + str_to_time(value, &node->last_renamed_or_created) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid last_renamed_or_created"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "last_subscription_change", &value) && + str_to_time(value, &node->last_subscription_change) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid last_subscription_change"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "subscribed", &value)) + node->subscribed = TRUE; + + *node_r = node; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_stream_send_mailbox_deletes(struct dsync_ibc *_ibc, + const struct dsync_mailbox_delete *deletes, + unsigned int count, char hierarchy_sep) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_serializer_encoder *encoder; + string_t *str, *substr; + char sep[2]; + unsigned int i; + + str = t_str_new(128); + str_append_c(str, items[ITEM_MAILBOX_DELETE].chr); + + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAILBOX_DELETE]); + sep[0] = hierarchy_sep; sep[1] = '\0'; + dsync_serializer_encode_add(encoder, "hierarchy_sep", sep); + + substr = t_str_new(128); + for (i = 0; i < count; i++) { + if (deletes[i].delete_mailbox) { + str_append(substr, guid_128_to_string(deletes[i].guid)); + str_printfa(substr, " %ld ", (long)deletes[i].timestamp); + } + } + if (str_len(substr) > 0) { + str_truncate(substr, str_len(substr)-1); + dsync_serializer_encode_add(encoder, "mailboxes", + str_c(substr)); + } + + str_truncate(substr, 0); + for (i = 0; i < count; i++) { + if (!deletes[i].delete_mailbox) { + str_append(substr, guid_128_to_string(deletes[i].guid)); + str_printfa(substr, " %ld ", (long)deletes[i].timestamp); + } + } + if (str_len(substr) > 0) { + str_truncate(substr, str_len(substr)-1); + dsync_serializer_encode_add(encoder, "dirs", str_c(substr)); + } + dsync_serializer_encode_finish(&encoder, str); + dsync_ibc_stream_send_string(ibc, str); +} + +ARRAY_DEFINE_TYPE(dsync_mailbox_delete, struct dsync_mailbox_delete); +static int +decode_mailbox_deletes(ARRAY_TYPE(dsync_mailbox_delete) *deletes, + const char *value, bool delete_mailbox) +{ + struct dsync_mailbox_delete *del; + const char *const *tmp; + unsigned int i; + + tmp = t_strsplit(value, " "); + for (i = 0; tmp[i] != NULL; i += 2) { + del = array_append_space(deletes); + del->delete_mailbox = delete_mailbox; + if (guid_128_from_string(tmp[i], del->guid) < 0) + return -1; + if (tmp[i+1] == NULL || + str_to_time(tmp[i+1], &del->timestamp) < 0) + return -1; + } + return 0; +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_recv_mailbox_deletes(struct dsync_ibc *_ibc, + const struct dsync_mailbox_delete **deletes_r, + unsigned int *count_r, char *hierarchy_sep_r) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_deserializer_decoder *decoder; + ARRAY_TYPE(dsync_mailbox_delete) deletes; + const char *value; + enum dsync_ibc_recv_ret ret; + + ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX_DELETE, &decoder); + if (ret != DSYNC_IBC_RECV_RET_OK) + return ret; + + p_clear(ibc->ret_pool); + p_array_init(&deletes, ibc->ret_pool, 16); + + value = dsync_deserializer_decode_get(decoder, "hierarchy_sep"); + if (strlen(value) != 1) { + dsync_ibc_input_error(ibc, decoder, "Invalid hierarchy_sep"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + *hierarchy_sep_r = value[0]; + + if (dsync_deserializer_decode_try(decoder, "mailboxes", &value) && + decode_mailbox_deletes(&deletes, value, TRUE) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid mailboxes"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "dirs", &value) && + decode_mailbox_deletes(&deletes, value, FALSE) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid dirs"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + *deletes_r = array_get(&deletes, count_r); + return DSYNC_IBC_RECV_RET_OK; +} + +static const char * +get_cache_fields(struct dsync_ibc_stream *ibc, + const struct dsync_mailbox *dsync_box) +{ + struct dsync_serializer_encoder *encoder; + string_t *str; + const struct mailbox_cache_field *cache_fields; + unsigned int i, count; + char decision[3]; + + cache_fields = array_get(&dsync_box->cache_fields, &count); + if (count == 0) + return ""; + + str = t_str_new(128); + for (i = 0; i < count; i++) { + const struct mailbox_cache_field *field = &cache_fields[i]; + + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAILBOX_CACHE_FIELD]); + dsync_serializer_encode_add(encoder, "name", field->name); + + memset(decision, 0, sizeof(decision)); + switch (field->decision & ~MAIL_CACHE_DECISION_FORCED) { + case MAIL_CACHE_DECISION_NO: + decision[0] = 'n'; + break; + case MAIL_CACHE_DECISION_TEMP: + decision[0] = 't'; + break; + case MAIL_CACHE_DECISION_YES: + decision[0] = 'y'; + break; + } + i_assert(decision[0] != '\0'); + if ((field->decision & MAIL_CACHE_DECISION_FORCED) != 0) + decision[1] = 'F'; + dsync_serializer_encode_add(encoder, "decision", decision); + if (field->last_used != 0) { + dsync_serializer_encode_add(encoder, "last_used", + dec2str(field->last_used)); + } + dsync_serializer_encode_finish(&encoder, str); + } + if (i > 0) { + /* remove the trailing LF */ + str_truncate(str, str_len(str)-1); + } + return str_c(str); +} + +static void +dsync_ibc_stream_send_mailbox(struct dsync_ibc *_ibc, + const struct dsync_mailbox *dsync_box) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_serializer_encoder *encoder; + string_t *str = t_str_new(128); + const char *value; + + str_append_c(str, items[ITEM_MAILBOX].chr); + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAILBOX]); + dsync_serializer_encode_add(encoder, "mailbox_guid", + guid_128_to_string(dsync_box->mailbox_guid)); + + if (dsync_box->mailbox_lost) + dsync_serializer_encode_add(encoder, "mailbox_lost", ""); + dsync_serializer_encode_add(encoder, "uid_validity", + dec2str(dsync_box->uid_validity)); + dsync_serializer_encode_add(encoder, "uid_next", + dec2str(dsync_box->uid_next)); + dsync_serializer_encode_add(encoder, "messages_count", + dec2str(dsync_box->messages_count)); + dsync_serializer_encode_add(encoder, "first_recent_uid", + dec2str(dsync_box->first_recent_uid)); + dsync_serializer_encode_add(encoder, "highest_modseq", + dec2str(dsync_box->highest_modseq)); + + value = get_cache_fields(ibc, dsync_box); + if (value != NULL) + dsync_serializer_encode_add(encoder, "cache_fields", value); + + dsync_serializer_encode_finish(&encoder, str); + dsync_ibc_stream_send_string(ibc, str); +} + +static int +parse_cache_field(struct dsync_ibc_stream *ibc, struct dsync_mailbox *box, + const char *value) +{ + struct dsync_deserializer_decoder *decoder; + struct mailbox_cache_field field; + const char *error; + int ret = 0; + + if (dsync_deserializer_decode_begin(ibc->deserializers[ITEM_MAILBOX_CACHE_FIELD], + value, &decoder, &error) < 0) { + dsync_ibc_input_error(ibc, NULL, + "cache_field: Invalid input: %s", error); + return -1; + } + + memset(&field, 0, sizeof(field)); + value = dsync_deserializer_decode_get(decoder, "name"); + field.name = p_strdup(ibc->ret_pool, value); + + value = dsync_deserializer_decode_get(decoder, "decision"); + switch (*value) { + case 'n': + field.decision = MAIL_CACHE_DECISION_NO; + break; + case 't': + field.decision = MAIL_CACHE_DECISION_TEMP; + break; + case 'y': + field.decision = MAIL_CACHE_DECISION_YES; + break; + default: + dsync_ibc_input_error(ibc, decoder, "Invalid decision: %s", + value); + ret = -1; + break; + } + if (value[1] == 'F') + field.decision |= MAIL_CACHE_DECISION_FORCED; + + if (dsync_deserializer_decode_try(decoder, "last_used", &value) && + str_to_time(value, &field.last_used) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid last_used"); + ret = -1; + } + array_append(&box->cache_fields, &field, 1); + + dsync_deserializer_decode_finish(&decoder); + return ret; +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_recv_mailbox(struct dsync_ibc *_ibc, + const struct dsync_mailbox **dsync_box_r) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + pool_t pool = ibc->ret_pool; + struct dsync_deserializer_decoder *decoder; + struct dsync_mailbox *box; + const char *value; + enum dsync_ibc_recv_ret ret; + + p_clear(pool); + box = p_new(pool, struct dsync_mailbox, 1); + + ret = dsync_ibc_stream_input_next(ibc, ITEM_MAILBOX, &decoder); + if (ret != DSYNC_IBC_RECV_RET_OK) + return ret; + + value = dsync_deserializer_decode_get(decoder, "mailbox_guid"); + if (guid_128_from_string(value, box->mailbox_guid) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid mailbox_guid"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + + if (dsync_deserializer_decode_try(decoder, "mailbox_lost", &value)) + box->mailbox_lost = TRUE; + value = dsync_deserializer_decode_get(decoder, "uid_validity"); + if (str_to_uint32(value, &box->uid_validity) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid uid_validity"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + value = dsync_deserializer_decode_get(decoder, "uid_next"); + if (str_to_uint32(value, &box->uid_next) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid uid_next"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + value = dsync_deserializer_decode_get(decoder, "messages_count"); + if (str_to_uint32(value, &box->messages_count) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid messages_count"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + value = dsync_deserializer_decode_get(decoder, "first_recent_uid"); + if (str_to_uint32(value, &box->first_recent_uid) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid first_recent_uid"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + value = dsync_deserializer_decode_get(decoder, "highest_modseq"); + if (str_to_uint64(value, &box->highest_modseq) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid highest_modseq"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + + p_array_init(&box->cache_fields, pool, 32); + if (dsync_deserializer_decode_try(decoder, "cache_fields", &value)) { + const char *const *fields = t_strsplit(value, "\n"); + for (; *fields != NULL; fields++) { + if (parse_cache_field(ibc, box, *fields) < 0) + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + } + + *dsync_box_r = box; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_stream_send_change(struct dsync_ibc *_ibc, + const struct dsync_mail_change *change) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_serializer_encoder *encoder; + string_t *str = t_str_new(128); + char type[2]; + + str_append_c(str, items[ITEM_MAIL_CHANGE].chr); + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAIL_CHANGE]); + + type[0] = type[1] = '\0'; + switch (change->type) { + case DSYNC_MAIL_CHANGE_TYPE_SAVE: + type[0] = 's'; + break; + case DSYNC_MAIL_CHANGE_TYPE_EXPUNGE: + type[0] = 'e'; + break; + case DSYNC_MAIL_CHANGE_TYPE_FLAG_CHANGE: + type[0] = 'f'; + break; + } + i_assert(type[0] != '\0'); + dsync_serializer_encode_add(encoder, "type", type); + dsync_serializer_encode_add(encoder, "uid", dec2str(change->uid)); + if (change->guid != NULL) + dsync_serializer_encode_add(encoder, "guid", change->guid); + if (change->hdr_hash != NULL) { + dsync_serializer_encode_add(encoder, "hdr_hash", + change->hdr_hash); + } + if (change->modseq != 0) { + dsync_serializer_encode_add(encoder, "modseq", + dec2str(change->modseq)); + } + if (change->save_timestamp != 0) { + dsync_serializer_encode_add(encoder, "save_timestamp", + dec2str(change->save_timestamp)); + } + if (change->add_flags != 0) { + dsync_serializer_encode_add(encoder, "add_flags", + t_strdup_printf("%x", change->add_flags)); + } + if (change->remove_flags != 0) { + dsync_serializer_encode_add(encoder, "remove_flags", + t_strdup_printf("%x", change->remove_flags)); + } + if (change->final_flags != 0) { + dsync_serializer_encode_add(encoder, "final_flags", + t_strdup_printf("%x", change->final_flags)); + } + if (change->keywords_reset) + dsync_serializer_encode_add(encoder, "keywords_reset", ""); + + if (array_is_created(&change->keyword_changes) && + array_count(&change->keyword_changes) > 0) { + string_t *kw_str = t_str_new(128); + const char *const *changes; + unsigned int i, count; + + changes = array_get(&change->keyword_changes, &count); + str_tabescape_write(kw_str, changes[0]); + for (i = 1; i < count; i++) { + str_append_c(kw_str, '\t'); + str_tabescape_write(kw_str, changes[i]); + } + dsync_serializer_encode_add(encoder, "keyword_changes", + str_c(kw_str)); + } + + dsync_serializer_encode_finish(&encoder, str); + dsync_ibc_stream_send_string(ibc, str); +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_recv_change(struct dsync_ibc *_ibc, + const struct dsync_mail_change **change_r) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + pool_t pool = ibc->ret_pool; + struct dsync_deserializer_decoder *decoder; + struct dsync_mail_change *change; + const char *value; + enum dsync_ibc_recv_ret ret; + + p_clear(pool); + change = p_new(pool, struct dsync_mail_change, 1); + + ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL_CHANGE, &decoder); + if (ret != DSYNC_IBC_RECV_RET_OK) + return ret; + + value = dsync_deserializer_decode_get(decoder, "type"); + switch (*value) { + case 's': + change->type = DSYNC_MAIL_CHANGE_TYPE_SAVE; + break; + case 'e': + change->type = DSYNC_MAIL_CHANGE_TYPE_EXPUNGE; + break; + case 'f': + change->type = DSYNC_MAIL_CHANGE_TYPE_FLAG_CHANGE; + break; + default: + dsync_ibc_input_error(ibc, decoder, "Invalid type: %s", value); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + + value = dsync_deserializer_decode_get(decoder, "uid"); + if (str_to_uint32(value, &change->uid) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid uid"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + + if (dsync_deserializer_decode_try(decoder, "guid", &value)) + change->guid = p_strdup(pool, value); + if (dsync_deserializer_decode_try(decoder, "hdr_hash", &value)) + change->hdr_hash = p_strdup(pool, value); + if (dsync_deserializer_decode_try(decoder, "modseq", &value) && + str_to_uint64(value, &change->modseq) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid modseq"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "save_timestamp", &value) && + str_to_time(value, &change->save_timestamp) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid save_timestamp"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + + if (dsync_deserializer_decode_try(decoder, "add_flags", &value)) + change->add_flags = strtoul(value, NULL, 16); + if (dsync_deserializer_decode_try(decoder, "remove_flags", &value)) + change->remove_flags = strtoul(value, NULL, 16); + if (dsync_deserializer_decode_try(decoder, "final_flags", &value)) + change->final_flags = strtoul(value, NULL, 16); + if (dsync_deserializer_decode_try(decoder, "keywords_reset", &value)) + change->keywords_reset = TRUE; + + if (dsync_deserializer_decode_try(decoder, "keyword_changes", &value) && + *value != '\0') { + const char *const *changes = t_strsplit_tab(value); + unsigned int i, count = str_array_length(changes); + + p_array_init(&change->keyword_changes, pool, count); + for (i = 0; i < count; i++) { + value = p_strdup(pool, changes[i]); + array_append(&change->keyword_changes, &value, 1); + } + } + + *change_r = change; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_stream_send_mail_request(struct dsync_ibc *_ibc, + const struct dsync_mail_request *request) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_serializer_encoder *encoder; + string_t *str = t_str_new(128); + + str_append_c(str, items[ITEM_MAIL_REQUEST].chr); + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAIL_REQUEST]); + if (request->guid != NULL) + dsync_serializer_encode_add(encoder, "guid", request->guid); + if (request->uid != 0) { + dsync_serializer_encode_add(encoder, "uid", + dec2str(request->uid)); + } + dsync_serializer_encode_finish(&encoder, str); + dsync_ibc_stream_send_string(ibc, str); +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_recv_mail_request(struct dsync_ibc *_ibc, + const struct dsync_mail_request **request_r) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_deserializer_decoder *decoder; + struct dsync_mail_request *request; + const char *value; + enum dsync_ibc_recv_ret ret; + + p_clear(ibc->ret_pool); + request = p_new(ibc->ret_pool, struct dsync_mail_request, 1); + + ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL_REQUEST, &decoder); + if (ret != DSYNC_IBC_RECV_RET_OK) + return ret; + + if (dsync_deserializer_decode_try(decoder, "guid", &value)) + request->guid = p_strdup(ibc->ret_pool, value); + if (dsync_deserializer_decode_try(decoder, "uid", &value) && + str_to_uint32(value, &request->uid) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid uid"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + + *request_r = request; + return DSYNC_IBC_RECV_RET_OK; +} + +static void +dsync_ibc_stream_send_mail(struct dsync_ibc *_ibc, + const struct dsync_mail *mail) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + struct dsync_serializer_encoder *encoder; + string_t *str = t_str_new(128); + + i_assert(ibc->mail_output == NULL); + + str_append_c(str, items[ITEM_MAIL].chr); + encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAIL]); + if (mail->guid != NULL) + dsync_serializer_encode_add(encoder, "guid", mail->guid); + if (mail->uid != 0) + dsync_serializer_encode_add(encoder, "uid", dec2str(mail->uid)); + if (mail->pop3_uidl != NULL) { + dsync_serializer_encode_add(encoder, "pop3_uidl", + mail->pop3_uidl); + } + if (mail->pop3_order > 0) { + dsync_serializer_encode_add(encoder, "pop3_order", + dec2str(mail->pop3_order)); + } + if (mail->received_date > 0) { + dsync_serializer_encode_add(encoder, "received_date", + dec2str(mail->received_date)); + } + if (mail->input != NULL) + dsync_serializer_encode_add(encoder, "stream", ""); + + dsync_serializer_encode_finish(&encoder, str); + dsync_ibc_stream_send_string(ibc, str); + + if (mail->input != NULL) { + ibc->mail_output_last = '\0'; + ibc->mail_output = mail->input; + i_stream_ref(ibc->mail_output); + (void)dsync_ibc_stream_send_mail_stream(ibc); + } +} + +static int seekable_fd_callback(const char **path_r, void *context) +{ + struct dsync_ibc_stream *ibc = context; + string_t *path; + int fd; + + path = t_str_new(128); + str_append(path, ibc->temp_path_prefix); + fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1); + if (fd == -1) { + i_error("safe_mkstemp(%s) failed: %m", str_c(path)); + return -1; + } + + /* we just want the fd, unlink it */ + if (unlink(str_c(path)) < 0) { + /* shouldn't happen.. */ + i_error("unlink(%s) failed: %m", str_c(path)); + i_close_fd(&fd); + return -1; + } + + *path_r = str_c(path); + return fd; +} + +static enum dsync_ibc_recv_ret +dsync_ibc_stream_recv_mail(struct dsync_ibc *_ibc, struct dsync_mail **mail_r) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + pool_t pool = ibc->ret_pool; + struct dsync_deserializer_decoder *decoder; + struct dsync_mail *mail; + struct istream *inputs[2]; + const char *value; + enum dsync_ibc_recv_ret ret; + + if (ibc->mail_input != NULL) { + /* wait until the mail's stream has been read */ + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (ibc->cur_mail != NULL) { + /* finished reading the stream, return the mail now */ + *mail_r = ibc->cur_mail; + ibc->cur_mail = NULL; + return DSYNC_IBC_RECV_RET_OK; + } + + p_clear(pool); + mail = p_new(pool, struct dsync_mail, 1); + + ret = dsync_ibc_stream_input_next(ibc, ITEM_MAIL, &decoder); + if (ret != DSYNC_IBC_RECV_RET_OK) + return ret; + + if (dsync_deserializer_decode_try(decoder, "guid", &value)) + mail->guid = p_strdup(pool, value); + if (dsync_deserializer_decode_try(decoder, "uid", &value) && + str_to_uint32(value, &mail->uid) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid uid"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "pop3_uidl", &value)) + mail->pop3_uidl = p_strdup(pool, value); + if (dsync_deserializer_decode_try(decoder, "pop3_order", &value) && + str_to_uint(value, &mail->pop3_order) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid pop3_order"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "received_date", &value) && + str_to_time(value, &mail->received_date) < 0) { + dsync_ibc_input_error(ibc, decoder, "Invalid received_date"); + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + if (dsync_deserializer_decode_try(decoder, "stream", &value)) { + inputs[0] = i_stream_create_dot(ibc->input, FALSE); + inputs[1] = NULL; + mail->input = i_stream_create_seekable(inputs, + MAIL_READ_FULL_BLOCK_SIZE, seekable_fd_callback, ibc); + i_stream_unref(&inputs[0]); + + ibc->mail_input = mail->input; + if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) { + ibc->cur_mail = mail; + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + /* already finished reading the stream */ + i_assert(ibc->mail_input == NULL); + } + + *mail_r = mail; + return DSYNC_IBC_RECV_RET_OK; +} + +static void dsync_ibc_stream_flush(struct dsync_ibc *_ibc) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + + o_stream_uncork(ibc->output); + o_stream_cork(ibc->output); +} + +static bool dsync_ibc_stream_is_send_queue_full(struct dsync_ibc *_ibc) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + size_t bytes; + + if (ibc->mail_output != NULL) + return TRUE; + + bytes = o_stream_get_buffer_used_size(ibc->output); + if (bytes < DSYNC_IBC_STREAM_OUTBUF_THROTTLE_SIZE) + return FALSE; + + o_stream_set_flush_pending(ibc->output, TRUE); + return TRUE; +} + +static bool dsync_ibc_stream_has_pending_data(struct dsync_ibc *_ibc) +{ + struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; + + return ibc->has_pending_data; +} + +static const struct dsync_ibc_vfuncs dsync_ibc_stream_vfuncs = { + dsync_ibc_stream_deinit, + dsync_ibc_stream_send_handshake, + dsync_ibc_stream_recv_handshake, + dsync_ibc_stream_send_end_of_list, + dsync_ibc_stream_send_mailbox_state, + dsync_ibc_stream_recv_mailbox_state, + dsync_ibc_stream_send_mailbox_tree_node, + dsync_ibc_stream_recv_mailbox_tree_node, + dsync_ibc_stream_send_mailbox_deletes, + dsync_ibc_stream_recv_mailbox_deletes, + dsync_ibc_stream_send_mailbox, + dsync_ibc_stream_recv_mailbox, + dsync_ibc_stream_send_change, + dsync_ibc_stream_recv_change, + dsync_ibc_stream_send_mail_request, + dsync_ibc_stream_recv_mail_request, + dsync_ibc_stream_send_mail, + dsync_ibc_stream_recv_mail, + dsync_ibc_stream_flush, + dsync_ibc_stream_is_send_queue_full, + dsync_ibc_stream_has_pending_data +}; + +struct dsync_ibc * +dsync_ibc_init_stream(int fd_in, int fd_out, const char *name, + const char *temp_path_prefix) +{ + struct dsync_ibc_stream *ibc; + + ibc = i_new(struct dsync_ibc_stream, 1); + ibc->ibc.v = dsync_ibc_stream_vfuncs; + ibc->fd_in = fd_in; + ibc->fd_out = fd_out; + ibc->name = i_strdup(name); + ibc->temp_path_prefix = i_strdup(temp_path_prefix); + ibc->ret_pool = pool_alloconly_create("ibc stream data", 2048); + dsync_ibc_stream_init(ibc); + return &ibc->ibc; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/doveadm/dsync/dsync-ibc.c Fri Sep 07 16:43:44 2012 +0300 @@ -0,0 +1,200 @@ +/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "dsync-mail.h" +#include "dsync-ibc-private.h" + +void dsync_ibc_deinit(struct dsync_ibc **_ibc) +{ + struct dsync_ibc *ibc = *_ibc; + + *_ibc = NULL; + ibc->v.deinit(ibc); +} + +void dsync_ibc_set_io_callback(struct dsync_ibc *ibc, + io_callback_t *callback, void *context) +{ + ibc->io_callback = callback; + ibc->io_context = context; +} + +void dsync_ibc_send_handshake(struct dsync_ibc *ibc, + const struct dsync_ibc_settings *set) +{ + ibc->v.send_handshake(ibc, set); +} + +enum dsync_ibc_recv_ret +dsync_ibc_recv_handshake(struct dsync_ibc *ibc, + const struct dsync_ibc_settings **set_r) +{ + return ibc->v.recv_handshake(ibc, set_r); +} + +static enum dsync_ibc_send_ret +dsync_ibc_send_ret(struct dsync_ibc *ibc) +{ + return ibc->v.is_send_queue_full(ibc) ? + DSYNC_IBC_SEND_RET_FULL : + DSYNC_IBC_SEND_RET_OK; +} + +enum dsync_ibc_send_ret +dsync_ibc_send_end_of_list(struct dsync_ibc *ibc) +{ + ibc->v.send_end_of_list(ibc); + return dsync_ibc_send_ret(ibc); +} + +enum dsync_ibc_send_ret +dsync_ibc_send_mailbox_state(struct dsync_ibc *ibc, + const struct dsync_mailbox_state *state) +{ + T_BEGIN { + ibc->v.send_mailbox_state(ibc, state); + } T_END; + return dsync_ibc_send_ret(ibc); +} + +enum dsync_ibc_recv_ret +dsync_ibc_recv_mailbox_state(struct dsync_ibc *ibc, + struct dsync_mailbox_state *state_r) +{ + return ibc->v.recv_mailbox_state(ibc, state_r); +} + +enum dsync_ibc_send_ret +dsync_ibc_send_mailbox_tree_node(struct dsync_ibc *ibc, + const char *const *name, + const struct dsync_mailbox_node *node) +{ + i_assert(*name != NULL); + + T_BEGIN { + ibc->v.send_mailbox_tree_node(ibc, name, node); + } T_END; + return dsync_ibc_send_ret(ibc); +} + +enum dsync_ibc_recv_ret +dsync_ibc_recv_mailbox_tree_node(struct dsync_ibc *ibc, + const char *const **name_r, + const struct dsync_mailbox_node **node_r) +{ + return ibc->v.recv_mailbox_tree_node(ibc, name_r, node_r); +} + +enum dsync_ibc_send_ret +dsync_ibc_send_mailbox_deletes(struct dsync_ibc *ibc, + const struct dsync_mailbox_delete *deletes, + unsigned int count, char hierarchy_sep) +{ + T_BEGIN { + ibc->v.send_mailbox_deletes(ibc, deletes, count, + hierarchy_sep); + } T_END; + return dsync_ibc_send_ret(ibc); +} + +enum dsync_ibc_recv_ret +dsync_ibc_recv_mailbox_deletes(struct dsync_ibc *ibc, + const struct dsync_mailbox_delete **deletes_r, + unsigned int *count_r, char *hierarchy_sep_r) +{ + return ibc->v.recv_mailbox_deletes(ibc, deletes_r, count_r, + hierarchy_sep_r); +} + +enum dsync_ibc_send_ret +dsync_ibc_send_mailbox(struct dsync_ibc *ibc, + const struct dsync_mailbox *dsync_box) +{ + T_BEGIN { + ibc->v.send_mailbox(ibc, dsync_box); + } T_END; + return dsync_ibc_send_ret(ibc); +} + +enum dsync_ibc_recv_ret +dsync_ibc_recv_mailbox(struct dsync_ibc *ibc, + const struct dsync_mailbox **dsync_box_r) +{ + return ibc->v.recv_mailbox(ibc, dsync_box_r); +} + +enum dsync_ibc_send_ret +dsync_ibc_send_change(struct dsync_ibc *ibc, + const struct dsync_mail_change *change) +{ + i_assert(change->uid > 0); + + T_BEGIN { + ibc->v.send_change(ibc, change); + } T_END; + return dsync_ibc_send_ret(ibc); +} + +enum dsync_ibc_recv_ret +dsync_ibc_recv_change(struct dsync_ibc *ibc, + const struct dsync_mail_change **change_r) +{ + return ibc->v.recv_change(ibc, change_r); +} + +enum dsync_ibc_send_ret +dsync_ibc_send_mail_request(struct dsync_ibc *ibc, + const struct dsync_mail_request *request) +{ + i_assert(*request->guid != '\0' || request->uid != 0); + + T_BEGIN { + ibc->v.send_mail_request(ibc, request); + } T_END; + return dsync_ibc_send_ret(ibc); +} + +enum dsync_ibc_recv_ret +dsync_ibc_recv_mail_request(struct dsync_ibc *ibc, + const struct dsync_mail_request **request_r) +{ + return ibc->v.recv_mail_request(ibc, request_r); +} + +enum dsync_ibc_send_ret +dsync_ibc_send_mail(struct dsync_ibc *ibc, const struct dsync_mail *mail) +{ + i_assert(*mail->guid != '\0' || mail->uid != 0); + + T_BEGIN { + ibc->v.send_mail(ibc, mail); + } T_END; + return dsync_ibc_send_ret(ibc); +} + +enum dsync_ibc_recv_ret +dsync_ibc_recv_mail(struct dsync_ibc *ibc, struct dsync_mail **mail_r) +{ + return ibc->v.recv_mail(ibc, mail_r); +} + +void dsync_ibc_flush(struct dsync_ibc *ibc) +{ + if (ibc->v.flush != NULL) + ibc->v.flush(ibc); +} + +bool dsync_ibc_has_failed(struct dsync_ibc *ibc) +{ + return ibc->failed; +} + +bool dsync_ibc_is_send_queue_full(struct dsync_ibc *ibc) +{ + return ibc->v.is_send_queue_full(ibc); +} + +bool dsync_ibc_has_pending_data(struct dsync_ibc *ibc) +{ + return ibc->v.has_pending_data(ibc); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/doveadm/dsync/dsync-ibc.h Fri Sep 07 16:43:44 2012 +0300 @@ -0,0 +1,118 @@ +#ifndef DSYNC_IBC_H +#define DSYNC_IBC_H + +/* dsync inter-brain communicator */ + +#include "ioloop.h" +#include "guid.h" +#include "dsync-brain.h" + +struct dsync_mailbox; +struct dsync_mailbox_state; +struct dsync_mailbox_node; +struct dsync_mailbox_delete; +struct dsync_mail; +struct dsync_mail_change; +struct dsync_mail_request; +struct dsync_ibc_settings; + +enum dsync_ibc_send_ret { + DSYNC_IBC_SEND_RET_OK = 1, + /* send queue is full, stop sending more */ + DSYNC_IBC_SEND_RET_FULL = 0 +}; + +enum dsync_ibc_recv_ret { + DSYNC_IBC_RECV_RET_FINISHED = -1, + /* try again / error (the error handling delayed until io callback) */ + DSYNC_IBC_RECV_RET_TRYAGAIN = 0, + DSYNC_IBC_RECV_RET_OK = 1 +}; + +struct dsync_ibc_settings { + /* if non-NULL, sync only this namespace */ + const char *sync_ns_prefix; + + enum dsync_brain_sync_type sync_type; + bool guid_requests; + bool mails_have_guids; +}; + +void dsync_ibc_init_pipe(struct dsync_ibc **ibc1_r, + struct dsync_ibc **ibc2_r); +struct dsync_ibc * +dsync_ibc_init_stream(int fd_in, int fd_out, const char *name, + const char *temp_path_prefix); +void dsync_ibc_deinit(struct dsync_ibc **ibc); + +/* I/O callback is called whenever new data is available. It's also called on + errors, so check first the error status. */ +void dsync_ibc_set_io_callback(struct dsync_ibc *ibc, + io_callback_t *callback, void *context); + +void dsync_ibc_send_handshake(struct dsync_ibc *ibc, + const struct dsync_ibc_settings *set); +enum dsync_ibc_recv_ret +dsync_ibc_recv_handshake(struct dsync_ibc *ibc, + const struct dsync_ibc_settings **set_r); + +enum dsync_ibc_send_ret ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_send_end_of_list(struct dsync_ibc *ibc); + +enum dsync_ibc_send_ret ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_send_mailbox_state(struct dsync_ibc *ibc, + const struct dsync_mailbox_state *state); +enum dsync_ibc_recv_ret +dsync_ibc_recv_mailbox_state(struct dsync_ibc *ibc, + struct dsync_mailbox_state *state_r); + +enum dsync_ibc_send_ret ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_send_mailbox_tree_node(struct dsync_ibc *ibc, + const char *const *name, + const struct dsync_mailbox_node *node); +enum dsync_ibc_recv_ret +dsync_ibc_recv_mailbox_tree_node(struct dsync_ibc *ibc, + const char *const **name_r, + const struct dsync_mailbox_node **node_r); + +enum dsync_ibc_send_ret ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_send_mailbox_deletes(struct dsync_ibc *ibc, + const struct dsync_mailbox_delete *deletes, + unsigned int count, char hierarchy_sep); +enum dsync_ibc_recv_ret +dsync_ibc_recv_mailbox_deletes(struct dsync_ibc *ibc, + const struct dsync_mailbox_delete **deletes_r, + unsigned int *count_r, char *hierarchy_sep_r); + +enum dsync_ibc_send_ret ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_send_mailbox(struct dsync_ibc *ibc, + const struct dsync_mailbox *dsync_box); +enum dsync_ibc_recv_ret +dsync_ibc_recv_mailbox(struct dsync_ibc *ibc, + const struct dsync_mailbox **dsync_box_r); + +enum dsync_ibc_send_ret ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_send_change(struct dsync_ibc *ibc, + const struct dsync_mail_change *change); +enum dsync_ibc_recv_ret +dsync_ibc_recv_change(struct dsync_ibc *ibc, + const struct dsync_mail_change **change_r); + +enum dsync_ibc_send_ret ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_send_mail_request(struct dsync_ibc *ibc, + const struct dsync_mail_request *request); +enum dsync_ibc_recv_ret +dsync_ibc_recv_mail_request(struct dsync_ibc *ibc, + const struct dsync_mail_request **request_r); + +enum dsync_ibc_send_ret ATTR_NOWARN_UNUSED_RESULT +dsync_ibc_send_mail(struct dsync_ibc *ibc, const struct dsync_mail *mail); +enum dsync_ibc_recv_ret +dsync_ibc_recv_mail(struct dsync_ibc *ibc, struct dsync_mail **mail_r); + +void dsync_ibc_flush(struct dsync_ibc *ibc); +bool dsync_ibc_has_failed(struct dsync_ibc *ibc); +bool dsync_ibc_is_send_queue_full(struct dsync_ibc *ibc); +bool dsync_ibc_has_pending_data(struct dsync_ibc *ibc); + +#endif
--- a/src/doveadm/dsync/dsync-slave-pipe.c Fri Sep 07 16:19:35 2012 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,485 +0,0 @@ -/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "array.h" -#include "istream.h" -#include "dsync-mail.h" -#include "dsync-mailbox.h" -#include "dsync-mailbox-state.h" -#include "dsync-mailbox-tree.h" -#include "dsync-slave-private.h" - -enum item_type { - ITEM_END_OF_LIST, - ITEM_HANDSHAKE, - ITEM_MAILBOX_STATE, - ITEM_MAILBOX_TREE_NODE, - ITEM_MAILBOX_DELETE, - ITEM_MAILBOX, - ITEM_MAIL_CHANGE, - ITEM_MAIL_REQUEST, - ITEM_MAIL -}; - -struct item { - enum item_type type; - pool_t pool; - - union { - struct dsync_slave_settings set; - struct dsync_mailbox_state state; - struct dsync_mailbox_node node; - guid_128_t mailbox_guid; - struct dsync_mailbox dsync_box; - struct dsync_mail_change change; - struct dsync_mail_request request; - struct dsync_mail mail; - struct { - const struct dsync_mailbox_delete *deletes; - unsigned int count; - char hierarchy_sep; - } mailbox_delete; - } u; -}; - -struct dsync_slave_pipe { - struct dsync_slave slave; - - ARRAY(pool_t) pools; - ARRAY(struct item) item_queue; - struct dsync_slave_pipe *remote; - - pool_t pop_pool; - struct item pop_item; -}; - -static pool_t dsync_slave_pipe_get_pool(struct dsync_slave_pipe *pipe) -{ - pool_t *pools, ret; - unsigned int count; - - pools = array_get_modifiable(&pipe->pools, &count); - if (count == 0) - return pool_alloconly_create("pipe item pool", 128); - - ret = pools[count-1]; - array_delete(&pipe->pools, count-1, 1); - p_clear(ret); - return ret; -} - -static struct item * ATTR_NOWARN_UNUSED_RESULT -dsync_slave_pipe_push_item(struct dsync_slave_pipe *pipe, enum item_type type) -{ - struct item *item; - - item = array_append_space(&pipe->item_queue); - item->type = type; - - switch (type) { - case ITEM_END_OF_LIST: - case ITEM_MAILBOX_STATE: - case ITEM_MAILBOX_DELETE: - break; - case ITEM_HANDSHAKE: - case ITEM_MAILBOX: - case ITEM_MAILBOX_TREE_NODE: - case ITEM_MAIL_CHANGE: - case ITEM_MAIL_REQUEST: - case ITEM_MAIL: - item->pool = dsync_slave_pipe_get_pool(pipe); - break; - } - return item; -} - -static struct item * -dsync_slave_pipe_pop_item(struct dsync_slave_pipe *pipe, enum item_type type) -{ - struct item *item; - - if (array_count(&pipe->item_queue) == 0) - return NULL; - - item = array_idx_modifiable(&pipe->item_queue, 0); - i_assert(item->type == type); - pipe->pop_item = *item; - array_delete(&pipe->item_queue, 0, 1); - item = NULL; - - if (pipe->pop_pool != NULL) - pool_unref(&pipe->pop_pool); - pipe->pop_pool = pipe->pop_item.pool; - return &pipe->pop_item; -} - -static bool dsync_slave_pipe_try_pop_eol(struct dsync_slave_pipe *pipe) -{ - const struct item *item; - - if (array_count(&pipe->item_queue) == 0) - return FALSE; - - item = array_idx(&pipe->item_queue, 0); - if (item->type != ITEM_END_OF_LIST) - return FALSE; - - array_delete(&pipe->item_queue, 0, 1); - return TRUE; -} - -static void dsync_slave_pipe_deinit(struct dsync_slave *slave) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - pool_t *poolp; - - if (pipe->remote != NULL) { - i_assert(pipe->remote->remote == pipe); - pipe->remote->remote = NULL; - } - - if (pipe->pop_pool != NULL) - pool_unref(&pipe->pop_pool); - array_foreach_modifiable(&pipe->pools, poolp) - pool_unref(poolp); - array_free(&pipe->pools); - array_free(&pipe->item_queue); - i_free(pipe); -} - -static void -dsync_slave_pipe_send_handshake(struct dsync_slave *slave, - const struct dsync_slave_settings *set) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - item = dsync_slave_pipe_push_item(pipe->remote, ITEM_HANDSHAKE); - item->u.set = *set; - item->u.set.sync_ns_prefix = p_strdup(item->pool, set->sync_ns_prefix); -} - -static enum dsync_slave_recv_ret -dsync_slave_pipe_recv_handshake(struct dsync_slave *slave, - const struct dsync_slave_settings **set_r) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - item = dsync_slave_pipe_pop_item(pipe, ITEM_HANDSHAKE); - if (item == NULL) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - - *set_r = &item->u.set; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static bool dsync_slave_pipe_is_send_queue_full(struct dsync_slave *slave) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - - return array_count(&pipe->remote->item_queue) > 0; -} - -static bool dsync_slave_pipe_has_pending_data(struct dsync_slave *slave) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - - return array_count(&pipe->item_queue) > 0; -} - -static void -dsync_slave_pipe_send_end_of_list(struct dsync_slave *slave) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - - dsync_slave_pipe_push_item(pipe->remote, ITEM_END_OF_LIST); -} - -static void -dsync_slave_pipe_send_mailbox_state(struct dsync_slave *slave, - const struct dsync_mailbox_state *state) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - item = dsync_slave_pipe_push_item(pipe->remote, ITEM_MAILBOX_STATE); - item->u.state = *state; -} - -static enum dsync_slave_recv_ret -dsync_slave_pipe_recv_mailbox_state(struct dsync_slave *slave, - struct dsync_mailbox_state *state_r) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - if (dsync_slave_pipe_try_pop_eol(pipe)) - return DSYNC_SLAVE_RECV_RET_FINISHED; - - item = dsync_slave_pipe_pop_item(pipe, ITEM_MAILBOX_STATE); - if (item == NULL) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - - *state_r = item->u.state; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_pipe_send_mailbox_tree_node(struct dsync_slave *slave, - const char *const *name, - const struct dsync_mailbox_node *node) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - item = dsync_slave_pipe_push_item(pipe->remote, ITEM_MAILBOX_TREE_NODE); - - /* a little bit kludgy way to send it */ - item->u.node.name = (void *)p_strarray_dup(item->pool, name); - dsync_mailbox_node_copy_data(&item->u.node, node); -} - -static enum dsync_slave_recv_ret -dsync_slave_pipe_recv_mailbox_tree_node(struct dsync_slave *slave, - const char *const **name_r, - const struct dsync_mailbox_node **node_r) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - if (dsync_slave_pipe_try_pop_eol(pipe)) - return DSYNC_SLAVE_RECV_RET_FINISHED; - - item = dsync_slave_pipe_pop_item(pipe, ITEM_MAILBOX_TREE_NODE); - if (item == NULL) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - - *name_r = (void *)item->u.node.name; - item->u.node.name = NULL; - - *node_r = &item->u.node; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_pipe_send_mailbox_deletes(struct dsync_slave *slave, - const struct dsync_mailbox_delete *deletes, - unsigned int count, char hierarchy_sep) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - item = dsync_slave_pipe_push_item(pipe->remote, ITEM_MAILBOX_DELETE); - - /* we'll assume that the deletes are permanent. this works for now.. */ - /* a little bit kludgy way to send it */ - item->u.mailbox_delete.deletes = deletes; - item->u.mailbox_delete.count = count; - item->u.mailbox_delete.hierarchy_sep = hierarchy_sep; -} - -static enum dsync_slave_recv_ret -dsync_slave_pipe_recv_mailbox_deletes(struct dsync_slave *slave, - const struct dsync_mailbox_delete **deletes_r, - unsigned int *count_r, - char *hierarchy_sep_r) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - if (dsync_slave_pipe_try_pop_eol(pipe)) - return DSYNC_SLAVE_RECV_RET_FINISHED; - - item = dsync_slave_pipe_pop_item(pipe, ITEM_MAILBOX_DELETE); - if (item == NULL) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - - *deletes_r = item->u.mailbox_delete.deletes; - *count_r = item->u.mailbox_delete.count; - *hierarchy_sep_r = item->u.mailbox_delete.hierarchy_sep; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_pipe_send_mailbox(struct dsync_slave *slave, - const struct dsync_mailbox *dsync_box) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - const struct mailbox_cache_field *cf; - struct mailbox_cache_field *ncf; - - item = dsync_slave_pipe_push_item(pipe->remote, ITEM_MAILBOX); - item->u.dsync_box = *dsync_box; - p_array_init(&item->u.dsync_box.cache_fields, item->pool, - array_count(&dsync_box->cache_fields)); - array_foreach(&dsync_box->cache_fields, cf) { - ncf = array_append_space(&item->u.dsync_box.cache_fields); - ncf->name = p_strdup(item->pool, cf->name); - ncf->decision = cf->decision; - ncf->last_used = cf->last_used; - } -} - -static enum dsync_slave_recv_ret -dsync_slave_pipe_recv_mailbox(struct dsync_slave *slave, - const struct dsync_mailbox **dsync_box_r) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - if (dsync_slave_pipe_try_pop_eol(pipe)) - return DSYNC_SLAVE_RECV_RET_FINISHED; - - item = dsync_slave_pipe_pop_item(pipe, ITEM_MAILBOX); - if (item == NULL) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - - *dsync_box_r = &item->u.dsync_box; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_pipe_send_change(struct dsync_slave *slave, - const struct dsync_mail_change *change) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - item = dsync_slave_pipe_push_item(pipe->remote, ITEM_MAIL_CHANGE); - dsync_mail_change_dup(item->pool, change, &item->u.change); -} - -static enum dsync_slave_recv_ret -dsync_slave_pipe_recv_change(struct dsync_slave *slave, - const struct dsync_mail_change **change_r) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - if (dsync_slave_pipe_try_pop_eol(pipe)) - return DSYNC_SLAVE_RECV_RET_FINISHED; - - item = dsync_slave_pipe_pop_item(pipe, ITEM_MAIL_CHANGE); - if (item == NULL) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - - *change_r = &item->u.change; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_pipe_send_mail_request(struct dsync_slave *slave, - const struct dsync_mail_request *request) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - item = dsync_slave_pipe_push_item(pipe->remote, ITEM_MAIL_REQUEST); - item->u.request.guid = p_strdup(item->pool, request->guid); - item->u.request.uid = request->uid; -} - -static enum dsync_slave_recv_ret -dsync_slave_pipe_recv_mail_request(struct dsync_slave *slave, - const struct dsync_mail_request **request_r) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - if (dsync_slave_pipe_try_pop_eol(pipe)) - return DSYNC_SLAVE_RECV_RET_FINISHED; - - item = dsync_slave_pipe_pop_item(pipe, ITEM_MAIL_REQUEST); - if (item == NULL) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - - *request_r = &item->u.request; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_pipe_send_mail(struct dsync_slave *slave, - const struct dsync_mail *mail) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - item = dsync_slave_pipe_push_item(pipe->remote, ITEM_MAIL); - item->u.mail.guid = p_strdup(item->pool, mail->guid); - item->u.mail.pop3_uidl = p_strdup(item->pool, mail->pop3_uidl); - item->u.mail.pop3_order = mail->pop3_order; - item->u.mail.received_date = mail->received_date; - if (mail->input != NULL) { - item->u.mail.input = mail->input; - i_stream_ref(mail->input); - } -} - -static enum dsync_slave_recv_ret -dsync_slave_pipe_recv_mail(struct dsync_slave *slave, - struct dsync_mail **mail_r) -{ - struct dsync_slave_pipe *pipe = (struct dsync_slave_pipe *)slave; - struct item *item; - - if (dsync_slave_pipe_try_pop_eol(pipe)) - return DSYNC_SLAVE_RECV_RET_FINISHED; - - item = dsync_slave_pipe_pop_item(pipe, ITEM_MAIL); - if (item == NULL) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - - *mail_r = &item->u.mail; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static const struct dsync_slave_vfuncs dsync_slave_pipe_vfuncs = { - dsync_slave_pipe_deinit, - dsync_slave_pipe_send_handshake, - dsync_slave_pipe_recv_handshake, - dsync_slave_pipe_send_end_of_list, - dsync_slave_pipe_send_mailbox_state, - dsync_slave_pipe_recv_mailbox_state, - dsync_slave_pipe_send_mailbox_tree_node, - dsync_slave_pipe_recv_mailbox_tree_node, - dsync_slave_pipe_send_mailbox_deletes, - dsync_slave_pipe_recv_mailbox_deletes, - dsync_slave_pipe_send_mailbox, - dsync_slave_pipe_recv_mailbox, - dsync_slave_pipe_send_change, - dsync_slave_pipe_recv_change, - dsync_slave_pipe_send_mail_request, - dsync_slave_pipe_recv_mail_request, - dsync_slave_pipe_send_mail, - dsync_slave_pipe_recv_mail, - NULL, - dsync_slave_pipe_is_send_queue_full, - dsync_slave_pipe_has_pending_data -}; - -static struct dsync_slave_pipe * -dsync_slave_pipe_alloc(void) -{ - struct dsync_slave_pipe *pipe; - - pipe = i_new(struct dsync_slave_pipe, 1); - pipe->slave.v = dsync_slave_pipe_vfuncs; - i_array_init(&pipe->pools, 4); - i_array_init(&pipe->item_queue, 4); - return pipe; -} - -void dsync_slave_init_pipe(struct dsync_slave **slave1_r, - struct dsync_slave **slave2_r) -{ - struct dsync_slave_pipe *pipe1, *pipe2; - - pipe1 = dsync_slave_pipe_alloc(); - pipe2 = dsync_slave_pipe_alloc(); - pipe1->remote = pipe2; - pipe2->remote = pipe1; - *slave1_r = &pipe1->slave; - *slave2_r = &pipe2->slave; -}
--- a/src/doveadm/dsync/dsync-slave-private.h Fri Sep 07 16:19:35 2012 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,78 +0,0 @@ -#ifndef DSYNC_SLAVE_PRIVATE_H -#define DSYNC_SLAVE_PRIVATE_H - -#include "dsync-slave.h" - -struct dsync_slave_vfuncs { - void (*deinit)(struct dsync_slave *slave); - - void (*send_handshake)(struct dsync_slave *slave, - const struct dsync_slave_settings *set); - enum dsync_slave_recv_ret - (*recv_handshake)(struct dsync_slave *slave, - const struct dsync_slave_settings **set_r); - - void (*send_end_of_list)(struct dsync_slave *slave); - - void (*send_mailbox_state)(struct dsync_slave *slave, - const struct dsync_mailbox_state *state); - enum dsync_slave_recv_ret - (*recv_mailbox_state)(struct dsync_slave *slave, - struct dsync_mailbox_state *state_r); - - void (*send_mailbox_tree_node)(struct dsync_slave *slave, - const char *const *name, - const struct dsync_mailbox_node *node); - enum dsync_slave_recv_ret - (*recv_mailbox_tree_node)(struct dsync_slave *slave, - const char *const **name_r, - const struct dsync_mailbox_node **node_r); - - void (*send_mailbox_deletes)(struct dsync_slave *slave, - const struct dsync_mailbox_delete *deletes, - unsigned int count, char hierarchy_sep); - enum dsync_slave_recv_ret - (*recv_mailbox_deletes)(struct dsync_slave *slave, - const struct dsync_mailbox_delete **deletes_r, - unsigned int *count_r, - char *hierarchy_sep_r); - - void (*send_mailbox)(struct dsync_slave *slave, - const struct dsync_mailbox *dsync_box); - enum dsync_slave_recv_ret - (*recv_mailbox)(struct dsync_slave *slave, - const struct dsync_mailbox **dsync_box_r); - - void (*send_change)(struct dsync_slave *slave, - const struct dsync_mail_change *change); - enum dsync_slave_recv_ret - (*recv_change)(struct dsync_slave *slave, - const struct dsync_mail_change **change_r); - - void (*send_mail_request)(struct dsync_slave *slave, - const struct dsync_mail_request *request); - enum dsync_slave_recv_ret - (*recv_mail_request)(struct dsync_slave *slave, - const struct dsync_mail_request **request_r); - - void (*send_mail)(struct dsync_slave *slave, - const struct dsync_mail *mail); - enum dsync_slave_recv_ret - (*recv_mail)(struct dsync_slave *slave, - struct dsync_mail **mail_r); - - void (*flush)(struct dsync_slave *slave); - bool (*is_send_queue_full)(struct dsync_slave *slave); - bool (*has_pending_data)(struct dsync_slave *slave); -}; - -struct dsync_slave { - struct dsync_slave_vfuncs v; - - io_callback_t *io_callback; - void *io_context; - - unsigned int failed:1; -}; - -#endif
--- a/src/doveadm/dsync/dsync-slave-stream.c Fri Sep 07 16:19:35 2012 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1522 +0,0 @@ -/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "array.h" -#include "fd-set-nonblock.h" -#include "safe-mkstemp.h" -#include "ioloop.h" -#include "istream.h" -#include "istream-seekable.h" -#include "istream-dot.h" -#include "ostream.h" -#include "str.h" -#include "strescape.h" -#include "master-service.h" -#include "mail-cache.h" -#include "mail-storage-private.h" -#include "dsync-serializer.h" -#include "dsync-deserializer.h" -#include "dsync-mail.h" -#include "dsync-mailbox.h" -#include "dsync-mailbox-state.h" -#include "dsync-mailbox-tree.h" -#include "dsync-slave-private.h" - -#include <stdlib.h> - -#define DSYNC_SLAVE_STREAM_TIMEOUT_MSECS (60*10*1000) -#define DSYNC_SLAVE_STREAM_OUTBUF_THROTTLE_SIZE (1024*128) - -#define DSYNC_PROTOCOL_VERSION_MAJOR 3 -#define DSYNC_HANDSHAKE_VERSION "VERSION\tdsync\t3\t0\n" - -enum item_type { - ITEM_NONE, - - ITEM_HANDSHAKE, - ITEM_MAILBOX_STATE, - ITEM_MAILBOX_TREE_NODE, - ITEM_MAILBOX_DELETE, - ITEM_MAILBOX, - - ITEM_MAIL_CHANGE, - ITEM_MAIL_REQUEST, - ITEM_MAIL, - - ITEM_MAILBOX_CACHE_FIELD, - - ITEM_END_OF_LIST -}; - -#define END_OF_LIST_LINE "." -static const struct { - /* full human readable name of the item */ - const char *name; - /* unique character identifying the item */ - char chr; - const char *required_keys; - const char *optional_keys; -} items[ITEM_END_OF_LIST+1] = { - { NULL, '\0', NULL, NULL }, - { .name = "handshake", - .chr = 'H', - .optional_keys = "sync_ns_prefix sync_type " - "guid_requests mails_have_guids" - }, - { .name = "mailbox_state", - .chr = 'S', - .required_keys = "mailbox_guid last_uidvalidity last_common_uid " - "last_common_modseq" - }, - { .name = "mailbox_tree_node", - .chr = 'N', - .required_keys = "name existence", - .optional_keys = "mailbox_guid uid_validity " - "last_renamed_or_created subscribed last_subscription_change" - }, - { .name = "mailbox_delete", - .chr = 'D', - .required_keys = "hierarchy_sep", - .optional_keys = "mailboxes dirs" - }, - { .name = "mailbox", - .chr = 'B', - .required_keys = "mailbox_guid uid_validity uid_next " - "messages_count first_recent_uid highest_modseq", - .optional_keys = "cache_fields" - }, - { .name = "mail_change", - .chr = 'C', - .required_keys = "type uid", - .optional_keys = "guid hdr_hash modseq save_timestamp " - "add_flags remove_flags final_flags " - "keywords_reset keyword_changes" - }, - { .name = "mail_request", - .chr = 'R', - .optional_keys = "guid uid" - }, - { .name = "mail", - .chr = 'M', - .optional_keys = "guid uid pop3_uidl pop3_order received_date stream" - }, - { .name = "mailbox_cache_field", - .chr = 'c', - .required_keys = "name decision", - .optional_keys = "last_used" - }, - - { "end_of_list", '\0', NULL, NULL } -}; - -struct dsync_slave_stream { - struct dsync_slave slave; - - char *name, *temp_path_prefix; - int fd_in, fd_out; - struct istream *input; - struct ostream *output; - struct io *io; - struct timeout *to; - - struct dsync_serializer *serializers[ITEM_END_OF_LIST]; - struct dsync_deserializer *deserializers[ITEM_END_OF_LIST]; - - pool_t ret_pool; - struct dsync_deserializer_decoder *cur_decoder; - - struct istream *mail_output, *mail_input; - struct dsync_mail *cur_mail; - char mail_output_last; - - unsigned int version_received:1; - unsigned int handshake_received:1; - unsigned int has_pending_data:1; -}; - -static void dsync_slave_stream_stop(struct dsync_slave_stream *slave) -{ - i_stream_close(slave->input); - o_stream_close(slave->output); - io_loop_stop(current_ioloop); -} - -static int dsync_slave_stream_read_mail_stream(struct dsync_slave_stream *slave) -{ - if (i_stream_read(slave->mail_input) < 0) { - if (slave->mail_input->stream_errno != 0) { - errno = slave->mail_input->stream_errno; - i_error("dsync(%s): read() failed: %m", slave->name); - dsync_slave_stream_stop(slave); - return -1; - } - /* finished reading the mail stream */ - i_assert(slave->mail_input->eof); - i_stream_seek(slave->mail_input, 0); - slave->mail_input = NULL; - return 1; - } - i_stream_skip(slave->mail_input, - i_stream_get_data_size(slave->mail_input)); - return 0; -} - -static void dsync_slave_stream_input(struct dsync_slave_stream *slave) -{ - if (slave->mail_input != NULL) { - if (dsync_slave_stream_read_mail_stream(slave) == 0) - return; - } - slave->slave.io_callback(slave->slave.io_context); -} - -static int dsync_slave_stream_send_mail_stream(struct dsync_slave_stream *slave) -{ - const unsigned char *data; - unsigned char add; - size_t i, size; - int ret; - - while ((ret = i_stream_read_data(slave->mail_output, - &data, &size, 0)) > 0) { - add = '\0'; - for (i = 0; i < size; i++) { - if (data[i] == '\n') { - if ((i == 0 && slave->mail_output_last != '\r') || - (i > 0 && data[i-1] != '\r')) { - /* missing CR */ - add = '\r'; - break; - } - } else if (data[i] == '.' && - ((i == 0 && slave->mail_output_last == '\n') || - (i > 0 && data[i-1] == '\n'))) { - /* escape the dot */ - add = '.'; - break; - } - } - - if (i > 0) { - o_stream_nsend(slave->output, data, i); - slave->mail_output_last = data[i-1]; - i_stream_skip(slave->mail_output, i); - } - - if (o_stream_get_buffer_used_size(slave->output) >= 4096) { - if ((ret = o_stream_flush(slave->output)) < 0) { - dsync_slave_stream_stop(slave); - return -1; - } - if (ret == 0) { - /* continue later */ - o_stream_set_flush_pending(slave->output, TRUE); - return 0; - } - } - - if (add != '\0') { - o_stream_nsend(slave->output, &add, 1); - slave->mail_output_last = add; - } - } - i_assert(ret == -1); - - if (slave->mail_output->stream_errno != 0) { - i_error("dsync(%s): read(%s) failed: %m", - slave->name, i_stream_get_name(slave->mail_output)); - dsync_slave_stream_stop(slave); - return -1; - } - - /* finished sending the stream */ - o_stream_nsend_str(slave->output, "\r\n.\r\n"); - i_stream_unref(&slave->mail_output); - return 1; -} - -static int dsync_slave_stream_output(struct dsync_slave_stream *slave) -{ - struct ostream *output = slave->output; - int ret; - - if ((ret = o_stream_flush(output)) < 0) - ret = 1; - else if (slave->mail_output != NULL) { - if (dsync_slave_stream_send_mail_stream(slave) < 0) - ret = 1; - } - timeout_reset(slave->to); - - if (!dsync_slave_is_send_queue_full(&slave->slave)) - slave->slave.io_callback(slave->slave.io_context); - return ret; -} - -static void dsync_slave_stream_timeout(struct dsync_slave_stream *slave) -{ - i_error("dsync(%s): I/O has stalled, no activity for %u seconds", - slave->name, DSYNC_SLAVE_STREAM_TIMEOUT_MSECS/1000); - dsync_slave_stream_stop(slave); -} - -static void dsync_slave_stream_init(struct dsync_slave_stream *slave) -{ - unsigned int i; - - fd_set_nonblock(slave->fd_in, TRUE); - fd_set_nonblock(slave->fd_out, TRUE); - - slave->input = i_stream_create_fd(slave->fd_in, (size_t)-1, FALSE); - slave->output = o_stream_create_fd(slave->fd_out, (size_t)-1, FALSE); - slave->io = io_add(slave->fd_in, IO_READ, - dsync_slave_stream_input, slave); - o_stream_set_no_error_handling(slave->output, TRUE); - o_stream_set_flush_callback(slave->output, dsync_slave_stream_output, - slave); - slave->to = timeout_add(DSYNC_SLAVE_STREAM_TIMEOUT_MSECS, - dsync_slave_stream_timeout, slave); - o_stream_cork(slave->output); - o_stream_nsend_str(slave->output, DSYNC_HANDSHAKE_VERSION); - - /* initialize serializers and send their headers to remote */ - for (i = 1; i < ITEM_END_OF_LIST; i++) T_BEGIN { - const char *keys; - - keys = items[i].required_keys == NULL ? items[i].optional_keys : - t_strconcat(items[i].required_keys, " ", - items[i].optional_keys, NULL); - if (keys != NULL) { - i_assert(items[i].chr != '\0'); - - slave->serializers[i] = - dsync_serializer_init(t_strsplit_spaces(keys, " ")); - o_stream_nsend(slave->output, &items[i].chr, 1); - o_stream_nsend_str(slave->output, - dsync_serializer_encode_header_line(slave->serializers[i])); - } - } T_END; - o_stream_nsend_str(slave->output, ".\n"); - - dsync_slave_flush(&slave->slave); -} - -static void dsync_slave_stream_deinit(struct dsync_slave *_slave) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - - if (slave->cur_decoder != NULL) - dsync_deserializer_decode_finish(&slave->cur_decoder); - if (slave->mail_output != NULL) - i_stream_unref(&slave->mail_output); - - timeout_remove(&slave->to); - if (slave->io != NULL) - io_remove(&slave->io); - i_stream_destroy(&slave->input); - o_stream_destroy(&slave->output); - if (close(slave->fd_in) < 0) - i_error("close(%s) failed: %m", slave->name); - if (slave->fd_in != slave->fd_out) { - if (close(slave->fd_out) < 0) - i_error("close(%s) failed: %m", slave->name); - } - pool_unref(&slave->ret_pool); - i_free(slave->temp_path_prefix); - i_free(slave->name); - i_free(slave); -} - -static int dsync_slave_stream_next_line(struct dsync_slave_stream *slave, - const char **line_r) -{ - const char *line; - - line = i_stream_next_line(slave->input); - if (line != NULL) { - *line_r = line; - return 1; - } - - /* try reading some */ - switch (i_stream_read(slave->input)) { - case -1: - if (slave->input->stream_errno != 0) { - errno = slave->input->stream_errno; - i_error("read(%s) failed: %m", slave->name); - } else { - i_assert(slave->input->eof); - i_error("read(%s) failed: EOF", slave->name); - } - dsync_slave_stream_stop(slave); - return -1; - case 0: - return 0; - } - *line_r = i_stream_next_line(slave->input); - if (*line_r == NULL) { - slave->has_pending_data = FALSE; - return 0; - } - slave->has_pending_data = TRUE; - return 1; -} - -static void ATTR_FORMAT(3, 4) ATTR_NULL(2) -dsync_slave_input_error(struct dsync_slave_stream *slave, - struct dsync_deserializer_decoder *decoder, - const char *fmt, ...) -{ - va_list args; - const char *error; - - va_start(args, fmt); - error = t_strdup_vprintf(fmt, args); - if (decoder == NULL) - i_error("dsync(%s): %s", slave->name, error); - else { - i_error("dsync(%s): %s: %s", slave->name, - dsync_deserializer_decoder_get_name(decoder), error); - } - va_end(args); - - dsync_slave_stream_stop(slave); -} - -static void -dsync_slave_stream_send_string(struct dsync_slave_stream *slave, - const string_t *str) -{ - i_assert(slave->mail_output == NULL); - o_stream_nsend(slave->output, str_data(str), str_len(str)); -} - -static int -dsync_slave_check_missing_deserializers(struct dsync_slave_stream *slave) -{ - unsigned int i; - int ret = 0; - - for (i = 1; i < ITEM_END_OF_LIST; i++) { - if (slave->deserializers[i] == NULL && - (items[i].required_keys != NULL || - items[i].optional_keys != NULL)) { - dsync_slave_input_error(slave, NULL, - "Remote didn't handshake deserializer for %s", - items[i].name); - ret = -1; - } - } - return ret; -} - -static bool -dsync_slave_stream_handshake(struct dsync_slave_stream *slave, const char *line) -{ - enum item_type item = ITEM_NONE; - const char *const *required_keys, *error; - unsigned int i; - - if (slave->handshake_received) - return TRUE; - - if (!slave->version_received) { - if (!version_string_verify(line, "dsync", - DSYNC_PROTOCOL_VERSION_MAJOR)) { - dsync_slave_input_error(slave, NULL, - "Remote dsync doesn't use compatible protocol"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - slave->version_received = TRUE; - return FALSE; - } - - if (strcmp(line, END_OF_LIST_LINE) == 0) { - /* finished handshaking */ - if (dsync_slave_check_missing_deserializers(slave) < 0) - return FALSE; - slave->handshake_received = TRUE; - return FALSE; - } - - for (i = 1; i < ITEM_END_OF_LIST; i++) { - if (items[i].chr == line[0]) { - item = i; - break; - } - } - if (item == ITEM_NONE) { - /* unknown deserializer, ignore */ - return FALSE; - } - - required_keys = items[item].required_keys == NULL ? NULL : - t_strsplit(items[item].required_keys, " "); - if (dsync_deserializer_init(items[item].name, - required_keys, line + 1, - &slave->deserializers[item], &error) < 0) { - dsync_slave_input_error(slave, NULL, - "Remote sent invalid handshake for %s: %s", - items[item].name, error); - } - return FALSE; -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_input_next(struct dsync_slave_stream *slave, enum item_type item, - struct dsync_deserializer_decoder **decoder_r) -{ - enum item_type line_item = ITEM_NONE; - const char *line, *error; - unsigned int i; - - i_assert(slave->mail_input == NULL); - - timeout_reset(slave->to); - - do { - if (dsync_slave_stream_next_line(slave, &line) <= 0) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } while (!dsync_slave_stream_handshake(slave, line)); - - if (strcmp(line, END_OF_LIST_LINE) == 0) { - /* end of this list */ - return DSYNC_SLAVE_RECV_RET_FINISHED; - } - for (i = 1; i < ITEM_END_OF_LIST; i++) { - if (*line == items[i].chr) { - line_item = i; - break; - } - } - if (line_item != item) { - dsync_slave_input_error(slave, NULL, - "Received unexpected input %c != %c", - *line, items[item].chr); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - - if (slave->cur_decoder != NULL) - dsync_deserializer_decode_finish(&slave->cur_decoder); - if (dsync_deserializer_decode_begin(slave->deserializers[item], - line+1, &slave->cur_decoder, - &error) < 0) { - dsync_slave_input_error(slave, NULL, "Invalid input to %s: %s", - items[item].name, error); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - *decoder_r = slave->cur_decoder; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_stream_send_handshake(struct dsync_slave *_slave, - const struct dsync_slave_settings *set) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_serializer_encoder *encoder; - string_t *str = t_str_new(128); - char sync_type[2]; - - str_append_c(str, items[ITEM_HANDSHAKE].chr); - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_HANDSHAKE]); - if (set->sync_ns_prefix != NULL) { - dsync_serializer_encode_add(encoder, "sync_ns_prefix", - set->sync_ns_prefix); - } - - sync_type[0] = sync_type[1] = '\0'; - switch (set->sync_type) { - case DSYNC_BRAIN_SYNC_TYPE_UNKNOWN: - break; - case DSYNC_BRAIN_SYNC_TYPE_FULL: - sync_type[0] = 'f'; - break; - case DSYNC_BRAIN_SYNC_TYPE_CHANGED: - sync_type[0] = 'c'; - break; - case DSYNC_BRAIN_SYNC_TYPE_STATE: - sync_type[0] = 's'; - break; - } - i_assert(sync_type[0] != '\0'); - dsync_serializer_encode_add(encoder, "sync_type", sync_type); - if (set->guid_requests) - dsync_serializer_encode_add(encoder, "guid_requests", ""); - if (set->mails_have_guids) - dsync_serializer_encode_add(encoder, "mails_have_guids", ""); - - dsync_serializer_encode_finish(&encoder, str); - dsync_slave_stream_send_string(slave, str); -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_recv_handshake(struct dsync_slave *_slave, - const struct dsync_slave_settings **set_r) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_deserializer_decoder *decoder; - struct dsync_slave_settings *set; - const char *value; - pool_t pool = slave->ret_pool; - enum dsync_slave_recv_ret ret; - - ret = dsync_slave_stream_input_next(slave, ITEM_HANDSHAKE, &decoder); - if (ret != DSYNC_SLAVE_RECV_RET_OK) { - if (ret != DSYNC_SLAVE_RECV_RET_TRYAGAIN) { - i_error("dsync(%s): Unexpected input in handshake", - slave->name); - dsync_slave_stream_stop(slave); - } - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - - p_clear(pool); - set = p_new(pool, struct dsync_slave_settings, 1); - - if (dsync_deserializer_decode_try(decoder, "sync_ns_prefix", &value)) - set->sync_ns_prefix = p_strdup(pool, value); - if (dsync_deserializer_decode_try(decoder, "sync_type", &value)) { - switch (value[0]) { - case 'f': - set->sync_type = DSYNC_BRAIN_SYNC_TYPE_FULL; - break; - case 'c': - set->sync_type = DSYNC_BRAIN_SYNC_TYPE_CHANGED; - break; - case 's': - set->sync_type = DSYNC_BRAIN_SYNC_TYPE_STATE; - break; - default: - dsync_slave_input_error(slave, decoder, - "Unknown sync_type: %s", value); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - } - if (dsync_deserializer_decode_try(decoder, "guid_requests", &value)) - set->guid_requests = TRUE; - if (dsync_deserializer_decode_try(decoder, "mails_have_guids", &value)) - set->mails_have_guids = TRUE; - - *set_r = set; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_stream_send_end_of_list(struct dsync_slave *_slave) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - - i_assert(slave->mail_output == NULL); - - o_stream_nsend_str(slave->output, END_OF_LIST_LINE"\n"); -} - -static void -dsync_slave_stream_send_mailbox_state(struct dsync_slave *_slave, - const struct dsync_mailbox_state *state) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_serializer_encoder *encoder; - string_t *str = t_str_new(128); - - str_append_c(str, items[ITEM_MAILBOX_STATE].chr); - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_MAILBOX_STATE]); - dsync_serializer_encode_add(encoder, "mailbox_guid", - guid_128_to_string(state->mailbox_guid)); - dsync_serializer_encode_add(encoder, "last_uidvalidity", - dec2str(state->last_uidvalidity)); - dsync_serializer_encode_add(encoder, "last_common_uid", - dec2str(state->last_common_uid)); - dsync_serializer_encode_add(encoder, "last_common_modseq", - dec2str(state->last_common_modseq)); - - dsync_serializer_encode_finish(&encoder, str); - dsync_slave_stream_send_string(slave, str); -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_recv_mailbox_state(struct dsync_slave *_slave, - struct dsync_mailbox_state *state_r) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_deserializer_decoder *decoder; - const char *value; - enum dsync_slave_recv_ret ret; - - memset(state_r, 0, sizeof(*state_r)); - - ret = dsync_slave_stream_input_next(slave, ITEM_MAILBOX_STATE, &decoder); - if (ret != DSYNC_SLAVE_RECV_RET_OK) - return ret; - - value = dsync_deserializer_decode_get(decoder, "mailbox_guid"); - if (guid_128_from_string(value, state_r->mailbox_guid) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid mailbox_guid"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - value = dsync_deserializer_decode_get(decoder, "last_uidvalidity"); - if (str_to_uint32(value, &state_r->last_uidvalidity) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid last_uidvalidity"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - value = dsync_deserializer_decode_get(decoder, "last_common_uid"); - if (str_to_uint32(value, &state_r->last_uidvalidity) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid last_common_uid"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - value = dsync_deserializer_decode_get(decoder, "last_common_modseq"); - if (str_to_uint32(value, &state_r->last_uidvalidity) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid last_common_modseq"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_stream_send_mailbox_tree_node(struct dsync_slave *_slave, - const char *const *name, - const struct dsync_mailbox_node *node) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_serializer_encoder *encoder; - string_t *str, *namestr; - - i_assert(*name != NULL); - - str = t_str_new(128); - str_append_c(str, items[ITEM_MAILBOX_TREE_NODE].chr); - - /* convert all hierarchy separators to tabs. mailbox names really - aren't supposed to have any tabs, but escape them anyway if there - are. */ - namestr = t_str_new(128); - for (; *name != NULL; name++) { - str_tabescape_write(namestr, *name); - str_append_c(namestr, '\t'); - } - str_truncate(namestr, str_len(namestr)-1); - - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_MAILBOX_TREE_NODE]); - dsync_serializer_encode_add(encoder, "name", str_c(namestr)); - switch (node->existence) { - case DSYNC_MAILBOX_NODE_NONEXISTENT: - dsync_serializer_encode_add(encoder, "existence", "n"); - break; - case DSYNC_MAILBOX_NODE_EXISTS: - dsync_serializer_encode_add(encoder, "existence", "y"); - break; - case DSYNC_MAILBOX_NODE_DELETED: - dsync_serializer_encode_add(encoder, "existence", "d"); - break; - } - - if (!guid_128_is_empty(node->mailbox_guid)) { - dsync_serializer_encode_add(encoder, "mailbox_guid", - guid_128_to_string(node->mailbox_guid)); - } - if (node->uid_validity != 0) { - dsync_serializer_encode_add(encoder, "uid_validity", - dec2str(node->uid_validity)); - } - if (node->last_renamed_or_created != 0) { - dsync_serializer_encode_add(encoder, "last_renamed_or_created", - dec2str(node->last_renamed_or_created)); - } - if (node->last_subscription_change != 0) { - dsync_serializer_encode_add(encoder, "last_subscription_change", - dec2str(node->last_subscription_change)); - } - if (node->subscribed) - dsync_serializer_encode_add(encoder, "subscribed", ""); - dsync_serializer_encode_finish(&encoder, str); - dsync_slave_stream_send_string(slave, str); -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_recv_mailbox_tree_node(struct dsync_slave *_slave, - const char *const **name_r, - const struct dsync_mailbox_node **node_r) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_deserializer_decoder *decoder; - struct dsync_mailbox_node *node; - const char *value; - enum dsync_slave_recv_ret ret; - - ret = dsync_slave_stream_input_next(slave, ITEM_MAILBOX_TREE_NODE, &decoder); - if (ret != DSYNC_SLAVE_RECV_RET_OK) - return ret; - - p_clear(slave->ret_pool); - node = p_new(slave->ret_pool, struct dsync_mailbox_node, 1); - - value = dsync_deserializer_decode_get(decoder, "name"); - if (*value == '\0') { - dsync_slave_input_error(slave, decoder, "Empty name"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - *name_r = (void *)p_strsplit_tabescaped(slave->ret_pool, value); - - value = dsync_deserializer_decode_get(decoder, "existence"); - switch (*value) { - case 'n': - node->existence = DSYNC_MAILBOX_NODE_NONEXISTENT; - break; - case 'y': - node->existence = DSYNC_MAILBOX_NODE_EXISTS; - break; - case 'd': - node->existence = DSYNC_MAILBOX_NODE_DELETED; - break; - } - - if (dsync_deserializer_decode_try(decoder, "mailbox_guid", &value) && - guid_128_from_string(value, node->mailbox_guid) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid mailbox_guid"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "uid_validity", &value) && - str_to_uint32(value, &node->uid_validity) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid uid_validity"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "last_renamed_or_created", &value) && - str_to_time(value, &node->last_renamed_or_created) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid last_renamed_or_created"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "last_subscription_change", &value) && - str_to_time(value, &node->last_subscription_change) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid last_subscription_change"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "subscribed", &value)) - node->subscribed = TRUE; - - *node_r = node; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_stream_send_mailbox_deletes(struct dsync_slave *_slave, - const struct dsync_mailbox_delete *deletes, - unsigned int count, char hierarchy_sep) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_serializer_encoder *encoder; - string_t *str, *substr; - char sep[2]; - unsigned int i; - - str = t_str_new(128); - str_append_c(str, items[ITEM_MAILBOX_DELETE].chr); - - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_MAILBOX_DELETE]); - sep[0] = hierarchy_sep; sep[1] = '\0'; - dsync_serializer_encode_add(encoder, "hierarchy_sep", sep); - - substr = t_str_new(128); - for (i = 0; i < count; i++) { - if (deletes[i].delete_mailbox) { - str_append(substr, guid_128_to_string(deletes[i].guid)); - str_printfa(substr, " %ld ", (long)deletes[i].timestamp); - } - } - if (str_len(substr) > 0) { - str_truncate(substr, str_len(substr)-1); - dsync_serializer_encode_add(encoder, "mailboxes", - str_c(substr)); - } - - str_truncate(substr, 0); - for (i = 0; i < count; i++) { - if (!deletes[i].delete_mailbox) { - str_append(substr, guid_128_to_string(deletes[i].guid)); - str_printfa(substr, " %ld ", (long)deletes[i].timestamp); - } - } - if (str_len(substr) > 0) { - str_truncate(substr, str_len(substr)-1); - dsync_serializer_encode_add(encoder, "dirs", str_c(substr)); - } - dsync_serializer_encode_finish(&encoder, str); - dsync_slave_stream_send_string(slave, str); -} - -ARRAY_DEFINE_TYPE(dsync_mailbox_delete, struct dsync_mailbox_delete); -static int -decode_mailbox_deletes(ARRAY_TYPE(dsync_mailbox_delete) *deletes, - const char *value, bool delete_mailbox) -{ - struct dsync_mailbox_delete *del; - const char *const *tmp; - unsigned int i; - - tmp = t_strsplit(value, " "); - for (i = 0; tmp[i] != NULL; i += 2) { - del = array_append_space(deletes); - del->delete_mailbox = delete_mailbox; - if (guid_128_from_string(tmp[i], del->guid) < 0) - return -1; - if (tmp[i+1] == NULL || - str_to_time(tmp[i+1], &del->timestamp) < 0) - return -1; - } - return 0; -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_recv_mailbox_deletes(struct dsync_slave *_slave, - const struct dsync_mailbox_delete **deletes_r, - unsigned int *count_r, char *hierarchy_sep_r) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_deserializer_decoder *decoder; - ARRAY_TYPE(dsync_mailbox_delete) deletes; - const char *value; - enum dsync_slave_recv_ret ret; - - ret = dsync_slave_stream_input_next(slave, ITEM_MAILBOX_DELETE, &decoder); - if (ret != DSYNC_SLAVE_RECV_RET_OK) - return ret; - - p_clear(slave->ret_pool); - p_array_init(&deletes, slave->ret_pool, 16); - - value = dsync_deserializer_decode_get(decoder, "hierarchy_sep"); - if (strlen(value) != 1) { - dsync_slave_input_error(slave, decoder, "Invalid hierarchy_sep"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - *hierarchy_sep_r = value[0]; - - if (dsync_deserializer_decode_try(decoder, "mailboxes", &value) && - decode_mailbox_deletes(&deletes, value, TRUE) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid mailboxes"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "dirs", &value) && - decode_mailbox_deletes(&deletes, value, FALSE) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid dirs"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - *deletes_r = array_get(&deletes, count_r); - return DSYNC_SLAVE_RECV_RET_OK; -} - -static const char * -get_cache_fields(struct dsync_slave_stream *slave, - const struct dsync_mailbox *dsync_box) -{ - struct dsync_serializer_encoder *encoder; - string_t *str; - const struct mailbox_cache_field *cache_fields; - unsigned int i, count; - char decision[3]; - - cache_fields = array_get(&dsync_box->cache_fields, &count); - if (count == 0) - return ""; - - str = t_str_new(128); - for (i = 0; i < count; i++) { - const struct mailbox_cache_field *field = &cache_fields[i]; - - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_MAILBOX_CACHE_FIELD]); - dsync_serializer_encode_add(encoder, "name", field->name); - - memset(decision, 0, sizeof(decision)); - switch (field->decision & ~MAIL_CACHE_DECISION_FORCED) { - case MAIL_CACHE_DECISION_NO: - decision[0] = 'n'; - break; - case MAIL_CACHE_DECISION_TEMP: - decision[0] = 't'; - break; - case MAIL_CACHE_DECISION_YES: - decision[0] = 'y'; - break; - } - i_assert(decision[0] != '\0'); - if ((field->decision & MAIL_CACHE_DECISION_FORCED) != 0) - decision[1] = 'F'; - dsync_serializer_encode_add(encoder, "decision", decision); - if (field->last_used != 0) { - dsync_serializer_encode_add(encoder, "last_used", - dec2str(field->last_used)); - } - dsync_serializer_encode_finish(&encoder, str); - } - if (i > 0) { - /* remove the trailing LF */ - str_truncate(str, str_len(str)-1); - } - return str_c(str); -} - -static void -dsync_slave_stream_send_mailbox(struct dsync_slave *_slave, - const struct dsync_mailbox *dsync_box) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_serializer_encoder *encoder; - string_t *str = t_str_new(128); - const char *value; - - str_append_c(str, items[ITEM_MAILBOX].chr); - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_MAILBOX]); - dsync_serializer_encode_add(encoder, "mailbox_guid", - guid_128_to_string(dsync_box->mailbox_guid)); - - if (dsync_box->mailbox_lost) - dsync_serializer_encode_add(encoder, "mailbox_lost", ""); - dsync_serializer_encode_add(encoder, "uid_validity", - dec2str(dsync_box->uid_validity)); - dsync_serializer_encode_add(encoder, "uid_next", - dec2str(dsync_box->uid_next)); - dsync_serializer_encode_add(encoder, "messages_count", - dec2str(dsync_box->messages_count)); - dsync_serializer_encode_add(encoder, "first_recent_uid", - dec2str(dsync_box->first_recent_uid)); - dsync_serializer_encode_add(encoder, "highest_modseq", - dec2str(dsync_box->highest_modseq)); - - value = get_cache_fields(slave, dsync_box); - if (value != NULL) - dsync_serializer_encode_add(encoder, "cache_fields", value); - - dsync_serializer_encode_finish(&encoder, str); - dsync_slave_stream_send_string(slave, str); -} - -static int -parse_cache_field(struct dsync_slave_stream *slave, struct dsync_mailbox *box, - const char *value) -{ - struct dsync_deserializer_decoder *decoder; - struct mailbox_cache_field field; - const char *error; - int ret = 0; - - if (dsync_deserializer_decode_begin(slave->deserializers[ITEM_MAILBOX_CACHE_FIELD], - value, &decoder, &error) < 0) { - dsync_slave_input_error(slave, NULL, - "cache_field: Invalid input: %s", error); - return -1; - } - - memset(&field, 0, sizeof(field)); - value = dsync_deserializer_decode_get(decoder, "name"); - field.name = p_strdup(slave->ret_pool, value); - - value = dsync_deserializer_decode_get(decoder, "decision"); - switch (*value) { - case 'n': - field.decision = MAIL_CACHE_DECISION_NO; - break; - case 't': - field.decision = MAIL_CACHE_DECISION_TEMP; - break; - case 'y': - field.decision = MAIL_CACHE_DECISION_YES; - break; - default: - dsync_slave_input_error(slave, decoder, "Invalid decision: %s", - value); - ret = -1; - break; - } - if (value[1] == 'F') - field.decision |= MAIL_CACHE_DECISION_FORCED; - - if (dsync_deserializer_decode_try(decoder, "last_used", &value) && - str_to_time(value, &field.last_used) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid last_used"); - ret = -1; - } - array_append(&box->cache_fields, &field, 1); - - dsync_deserializer_decode_finish(&decoder); - return ret; -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_recv_mailbox(struct dsync_slave *_slave, - const struct dsync_mailbox **dsync_box_r) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - pool_t pool = slave->ret_pool; - struct dsync_deserializer_decoder *decoder; - struct dsync_mailbox *box; - const char *value; - enum dsync_slave_recv_ret ret; - - p_clear(pool); - box = p_new(pool, struct dsync_mailbox, 1); - - ret = dsync_slave_stream_input_next(slave, ITEM_MAILBOX, &decoder); - if (ret != DSYNC_SLAVE_RECV_RET_OK) - return ret; - - value = dsync_deserializer_decode_get(decoder, "mailbox_guid"); - if (guid_128_from_string(value, box->mailbox_guid) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid mailbox_guid"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - - if (dsync_deserializer_decode_try(decoder, "mailbox_lost", &value)) - box->mailbox_lost = TRUE; - value = dsync_deserializer_decode_get(decoder, "uid_validity"); - if (str_to_uint32(value, &box->uid_validity) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid uid_validity"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - value = dsync_deserializer_decode_get(decoder, "uid_next"); - if (str_to_uint32(value, &box->uid_next) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid uid_next"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - value = dsync_deserializer_decode_get(decoder, "messages_count"); - if (str_to_uint32(value, &box->messages_count) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid messages_count"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - value = dsync_deserializer_decode_get(decoder, "first_recent_uid"); - if (str_to_uint32(value, &box->first_recent_uid) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid first_recent_uid"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - value = dsync_deserializer_decode_get(decoder, "highest_modseq"); - if (str_to_uint64(value, &box->highest_modseq) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid highest_modseq"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - - p_array_init(&box->cache_fields, pool, 32); - if (dsync_deserializer_decode_try(decoder, "cache_fields", &value)) { - const char *const *fields = t_strsplit(value, "\n"); - for (; *fields != NULL; fields++) { - if (parse_cache_field(slave, box, *fields) < 0) - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - } - - *dsync_box_r = box; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_stream_send_change(struct dsync_slave *_slave, - const struct dsync_mail_change *change) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_serializer_encoder *encoder; - string_t *str = t_str_new(128); - char type[2]; - - str_append_c(str, items[ITEM_MAIL_CHANGE].chr); - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_MAIL_CHANGE]); - - type[0] = type[1] = '\0'; - switch (change->type) { - case DSYNC_MAIL_CHANGE_TYPE_SAVE: - type[0] = 's'; - break; - case DSYNC_MAIL_CHANGE_TYPE_EXPUNGE: - type[0] = 'e'; - break; - case DSYNC_MAIL_CHANGE_TYPE_FLAG_CHANGE: - type[0] = 'f'; - break; - } - i_assert(type[0] != '\0'); - dsync_serializer_encode_add(encoder, "type", type); - dsync_serializer_encode_add(encoder, "uid", dec2str(change->uid)); - if (change->guid != NULL) - dsync_serializer_encode_add(encoder, "guid", change->guid); - if (change->hdr_hash != NULL) { - dsync_serializer_encode_add(encoder, "hdr_hash", - change->hdr_hash); - } - if (change->modseq != 0) { - dsync_serializer_encode_add(encoder, "modseq", - dec2str(change->modseq)); - } - if (change->save_timestamp != 0) { - dsync_serializer_encode_add(encoder, "save_timestamp", - dec2str(change->save_timestamp)); - } - if (change->add_flags != 0) { - dsync_serializer_encode_add(encoder, "add_flags", - t_strdup_printf("%x", change->add_flags)); - } - if (change->remove_flags != 0) { - dsync_serializer_encode_add(encoder, "remove_flags", - t_strdup_printf("%x", change->remove_flags)); - } - if (change->final_flags != 0) { - dsync_serializer_encode_add(encoder, "final_flags", - t_strdup_printf("%x", change->final_flags)); - } - if (change->keywords_reset) - dsync_serializer_encode_add(encoder, "keywords_reset", ""); - - if (array_is_created(&change->keyword_changes) && - array_count(&change->keyword_changes) > 0) { - string_t *kw_str = t_str_new(128); - const char *const *changes; - unsigned int i, count; - - changes = array_get(&change->keyword_changes, &count); - str_tabescape_write(kw_str, changes[0]); - for (i = 1; i < count; i++) { - str_append_c(kw_str, '\t'); - str_tabescape_write(kw_str, changes[i]); - } - dsync_serializer_encode_add(encoder, "keyword_changes", - str_c(kw_str)); - } - - dsync_serializer_encode_finish(&encoder, str); - dsync_slave_stream_send_string(slave, str); -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_recv_change(struct dsync_slave *_slave, - const struct dsync_mail_change **change_r) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - pool_t pool = slave->ret_pool; - struct dsync_deserializer_decoder *decoder; - struct dsync_mail_change *change; - const char *value; - enum dsync_slave_recv_ret ret; - - p_clear(pool); - change = p_new(pool, struct dsync_mail_change, 1); - - ret = dsync_slave_stream_input_next(slave, ITEM_MAIL_CHANGE, &decoder); - if (ret != DSYNC_SLAVE_RECV_RET_OK) - return ret; - - value = dsync_deserializer_decode_get(decoder, "type"); - switch (*value) { - case 's': - change->type = DSYNC_MAIL_CHANGE_TYPE_SAVE; - break; - case 'e': - change->type = DSYNC_MAIL_CHANGE_TYPE_EXPUNGE; - break; - case 'f': - change->type = DSYNC_MAIL_CHANGE_TYPE_FLAG_CHANGE; - break; - default: - dsync_slave_input_error(slave, decoder, - "Invalid type: %s", value); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - - value = dsync_deserializer_decode_get(decoder, "uid"); - if (str_to_uint32(value, &change->uid) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid uid"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - - if (dsync_deserializer_decode_try(decoder, "guid", &value)) - change->guid = p_strdup(pool, value); - if (dsync_deserializer_decode_try(decoder, "hdr_hash", &value)) - change->hdr_hash = p_strdup(pool, value); - if (dsync_deserializer_decode_try(decoder, "modseq", &value) && - str_to_uint64(value, &change->modseq) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid modseq"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "save_timestamp", &value) && - str_to_time(value, &change->save_timestamp) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid save_timestamp"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - - if (dsync_deserializer_decode_try(decoder, "add_flags", &value)) - change->add_flags = strtoul(value, NULL, 16); - if (dsync_deserializer_decode_try(decoder, "remove_flags", &value)) - change->remove_flags = strtoul(value, NULL, 16); - if (dsync_deserializer_decode_try(decoder, "final_flags", &value)) - change->final_flags = strtoul(value, NULL, 16); - if (dsync_deserializer_decode_try(decoder, "keywords_reset", &value)) - change->keywords_reset = TRUE; - - if (dsync_deserializer_decode_try(decoder, "keyword_changes", &value) && - *value != '\0') { - const char *const *changes = t_strsplit_tab(value); - unsigned int i, count = str_array_length(changes); - - p_array_init(&change->keyword_changes, pool, count); - for (i = 0; i < count; i++) { - value = p_strdup(pool, changes[i]); - array_append(&change->keyword_changes, &value, 1); - } - } - - *change_r = change; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_stream_send_mail_request(struct dsync_slave *_slave, - const struct dsync_mail_request *request) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_serializer_encoder *encoder; - string_t *str = t_str_new(128); - - str_append_c(str, items[ITEM_MAIL_REQUEST].chr); - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_MAIL_REQUEST]); - if (request->guid != NULL) - dsync_serializer_encode_add(encoder, "guid", request->guid); - if (request->uid != 0) { - dsync_serializer_encode_add(encoder, "uid", - dec2str(request->uid)); - } - dsync_serializer_encode_finish(&encoder, str); - dsync_slave_stream_send_string(slave, str); -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_recv_mail_request(struct dsync_slave *_slave, - const struct dsync_mail_request **request_r) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_deserializer_decoder *decoder; - struct dsync_mail_request *request; - const char *value; - enum dsync_slave_recv_ret ret; - - p_clear(slave->ret_pool); - request = p_new(slave->ret_pool, struct dsync_mail_request, 1); - - ret = dsync_slave_stream_input_next(slave, ITEM_MAIL_REQUEST, &decoder); - if (ret != DSYNC_SLAVE_RECV_RET_OK) - return ret; - - if (dsync_deserializer_decode_try(decoder, "guid", &value)) - request->guid = p_strdup(slave->ret_pool, value); - if (dsync_deserializer_decode_try(decoder, "uid", &value) && - str_to_uint32(value, &request->uid) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid uid"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - - *request_r = request; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void -dsync_slave_stream_send_mail(struct dsync_slave *_slave, - const struct dsync_mail *mail) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - struct dsync_serializer_encoder *encoder; - string_t *str = t_str_new(128); - - i_assert(slave->mail_output == NULL); - - str_append_c(str, items[ITEM_MAIL].chr); - encoder = dsync_serializer_encode_begin(slave->serializers[ITEM_MAIL]); - if (mail->guid != NULL) - dsync_serializer_encode_add(encoder, "guid", mail->guid); - if (mail->uid != 0) - dsync_serializer_encode_add(encoder, "uid", dec2str(mail->uid)); - if (mail->pop3_uidl != NULL) { - dsync_serializer_encode_add(encoder, "pop3_uidl", - mail->pop3_uidl); - } - if (mail->pop3_order > 0) { - dsync_serializer_encode_add(encoder, "pop3_order", - dec2str(mail->pop3_order)); - } - if (mail->received_date > 0) { - dsync_serializer_encode_add(encoder, "received_date", - dec2str(mail->received_date)); - } - if (mail->input != NULL) - dsync_serializer_encode_add(encoder, "stream", ""); - - dsync_serializer_encode_finish(&encoder, str); - dsync_slave_stream_send_string(slave, str); - - if (mail->input != NULL) { - slave->mail_output_last = '\0'; - slave->mail_output = mail->input; - i_stream_ref(slave->mail_output); - (void)dsync_slave_stream_send_mail_stream(slave); - } -} - -static int seekable_fd_callback(const char **path_r, void *context) -{ - struct dsync_slave_stream *slave = context; - string_t *path; - int fd; - - path = t_str_new(128); - str_append(path, slave->temp_path_prefix); - fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1); - if (fd == -1) { - i_error("safe_mkstemp(%s) failed: %m", str_c(path)); - return -1; - } - - /* we just want the fd, unlink it */ - if (unlink(str_c(path)) < 0) { - /* shouldn't happen.. */ - i_error("unlink(%s) failed: %m", str_c(path)); - i_close_fd(&fd); - return -1; - } - - *path_r = str_c(path); - return fd; -} - -static enum dsync_slave_recv_ret -dsync_slave_stream_recv_mail(struct dsync_slave *_slave, - struct dsync_mail **mail_r) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - pool_t pool = slave->ret_pool; - struct dsync_deserializer_decoder *decoder; - struct dsync_mail *mail; - struct istream *inputs[2]; - const char *value; - enum dsync_slave_recv_ret ret; - - if (slave->mail_input != NULL) { - /* wait until the mail's stream has been read */ - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (slave->cur_mail != NULL) { - /* finished reading the stream, return the mail now */ - *mail_r = slave->cur_mail; - slave->cur_mail = NULL; - return DSYNC_SLAVE_RECV_RET_OK; - } - - p_clear(pool); - mail = p_new(pool, struct dsync_mail, 1); - - ret = dsync_slave_stream_input_next(slave, ITEM_MAIL, &decoder); - if (ret != DSYNC_SLAVE_RECV_RET_OK) - return ret; - - if (dsync_deserializer_decode_try(decoder, "guid", &value)) - mail->guid = p_strdup(pool, value); - if (dsync_deserializer_decode_try(decoder, "uid", &value) && - str_to_uint32(value, &mail->uid) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid uid"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "pop3_uidl", &value)) - mail->pop3_uidl = p_strdup(pool, value); - if (dsync_deserializer_decode_try(decoder, "pop3_order", &value) && - str_to_uint(value, &mail->pop3_order) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid pop3_order"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "received_date", &value) && - str_to_time(value, &mail->received_date) < 0) { - dsync_slave_input_error(slave, decoder, "Invalid received_date"); - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - if (dsync_deserializer_decode_try(decoder, "stream", &value)) { - inputs[0] = i_stream_create_dot(slave->input, FALSE); - inputs[1] = NULL; - mail->input = i_stream_create_seekable(inputs, - MAIL_READ_FULL_BLOCK_SIZE, seekable_fd_callback, slave); - i_stream_unref(&inputs[0]); - - slave->mail_input = mail->input; - if (dsync_slave_stream_read_mail_stream(slave) <= 0) { - slave->cur_mail = mail; - return DSYNC_SLAVE_RECV_RET_TRYAGAIN; - } - /* already finished reading the stream */ - i_assert(slave->mail_input == NULL); - } - - *mail_r = mail; - return DSYNC_SLAVE_RECV_RET_OK; -} - -static void dsync_slave_stream_flush(struct dsync_slave *_slave) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - - o_stream_uncork(slave->output); - o_stream_cork(slave->output); -} - -static bool dsync_slave_stream_is_send_queue_full(struct dsync_slave *_slave) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - size_t bytes; - - if (slave->mail_output != NULL) - return TRUE; - - bytes = o_stream_get_buffer_used_size(slave->output); - if (bytes < DSYNC_SLAVE_STREAM_OUTBUF_THROTTLE_SIZE) - return FALSE; - - o_stream_set_flush_pending(slave->output, TRUE); - return TRUE; -} - -static bool dsync_slave_stream_has_pending_data(struct dsync_slave *_slave) -{ - struct dsync_slave_stream *slave = (struct dsync_slave_stream *)_slave; - - return slave->has_pending_data; -} - -static const struct dsync_slave_vfuncs dsync_slave_stream_vfuncs = { - dsync_slave_stream_deinit, - dsync_slave_stream_send_handshake, - dsync_slave_stream_recv_handshake, - dsync_slave_stream_send_end_of_list, - dsync_slave_stream_send_mailbox_state, - dsync_slave_stream_recv_mailbox_state, - dsync_slave_stream_send_mailbox_tree_node, - dsync_slave_stream_recv_mailbox_tree_node, - dsync_slave_stream_send_mailbox_deletes, - dsync_slave_stream_recv_mailbox_deletes, - dsync_slave_stream_send_mailbox, - dsync_slave_stream_recv_mailbox, - dsync_slave_stream_send_change, - dsync_slave_stream_recv_change, - dsync_slave_stream_send_mail_request, - dsync_slave_stream_recv_mail_request, - dsync_slave_stream_send_mail, - dsync_slave_stream_recv_mail, - dsync_slave_stream_flush, - dsync_slave_stream_is_send_queue_full, - dsync_slave_stream_has_pending_data -}; - -struct dsync_slave * -dsync_slave_init_stream(int fd_in, int fd_out, const char *name, - const char *temp_path_prefix) -{ - struct dsync_slave_stream *slave; - - slave = i_new(struct dsync_slave_stream, 1); - slave->slave.v = dsync_slave_stream_vfuncs; - slave->fd_in = fd_in; - slave->fd_out = fd_out; - slave->name = i_strdup(name); - slave->temp_path_prefix = i_strdup(temp_path_prefix); - slave->ret_pool = pool_alloconly_create("slave stream data", 2048); - dsync_slave_stream_init(slave); - return &slave->slave; -}
--- a/src/doveadm/dsync/dsync-slave.c Fri Sep 07 16:19:35 2012 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,202 +0,0 @@ -/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "dsync-mail.h" -#include "dsync-slave-private.h" - -void dsync_slave_deinit(struct dsync_slave **_slave) -{ - struct dsync_slave *slave = *_slave; - - *_slave = NULL; - slave->v.deinit(slave); -} - -void dsync_slave_set_io_callback(struct dsync_slave *slave, - io_callback_t *callback, void *context) -{ - slave->io_callback = callback; - slave->io_context = context; -} - -void dsync_slave_send_handshake(struct dsync_slave *slave, - const struct dsync_slave_settings *set) -{ - slave->v.send_handshake(slave, set); -} - -enum dsync_slave_recv_ret -dsync_slave_recv_handshake(struct dsync_slave *slave, - const struct dsync_slave_settings **set_r) -{ - return slave->v.recv_handshake(slave, set_r); -} - -static enum dsync_slave_send_ret -dsync_slave_send_ret(struct dsync_slave *slave) -{ - return slave->v.is_send_queue_full(slave) ? - DSYNC_SLAVE_SEND_RET_FULL : - DSYNC_SLAVE_SEND_RET_OK; -} - -enum dsync_slave_send_ret -dsync_slave_send_end_of_list(struct dsync_slave *slave) -{ - slave->v.send_end_of_list(slave); - return dsync_slave_send_ret(slave); -} - -enum dsync_slave_send_ret -dsync_slave_send_mailbox_state(struct dsync_slave *slave, - const struct dsync_mailbox_state *state) -{ - T_BEGIN { - slave->v.send_mailbox_state(slave, state); - } T_END; - return dsync_slave_send_ret(slave); -} - -enum dsync_slave_recv_ret -dsync_slave_recv_mailbox_state(struct dsync_slave *slave, - struct dsync_mailbox_state *state_r) -{ - return slave->v.recv_mailbox_state(slave, state_r); -} - -enum dsync_slave_send_ret -dsync_slave_send_mailbox_tree_node(struct dsync_slave *slave, - const char *const *name, - const struct dsync_mailbox_node *node) -{ - i_assert(*name != NULL); - - T_BEGIN { - slave->v.send_mailbox_tree_node(slave, name, node); - } T_END; - return dsync_slave_send_ret(slave); -} - -enum dsync_slave_recv_ret -dsync_slave_recv_mailbox_tree_node(struct dsync_slave *slave, - const char *const **name_r, - const struct dsync_mailbox_node **node_r) -{ - return slave->v.recv_mailbox_tree_node(slave, name_r, node_r); -} - -enum dsync_slave_send_ret -dsync_slave_send_mailbox_deletes(struct dsync_slave *slave, - const struct dsync_mailbox_delete *deletes, - unsigned int count, char hierarchy_sep) -{ - T_BEGIN { - slave->v.send_mailbox_deletes(slave, deletes, count, - hierarchy_sep); - } T_END; - return dsync_slave_send_ret(slave); -} - -enum dsync_slave_recv_ret -dsync_slave_recv_mailbox_deletes(struct dsync_slave *slave, - const struct dsync_mailbox_delete **deletes_r, - unsigned int *count_r, char *hierarchy_sep_r) -{ - return slave->v.recv_mailbox_deletes(slave, deletes_r, count_r, - hierarchy_sep_r); -} - -enum dsync_slave_send_ret -dsync_slave_send_mailbox(struct dsync_slave *slave, - const struct dsync_mailbox *dsync_box) -{ - T_BEGIN { - slave->v.send_mailbox(slave, dsync_box); - } T_END; - return dsync_slave_send_ret(slave); -} - -enum dsync_slave_recv_ret -dsync_slave_recv_mailbox(struct dsync_slave *slave, - const struct dsync_mailbox **dsync_box_r) -{ - return slave->v.recv_mailbox(slave, dsync_box_r); -} - -enum dsync_slave_send_ret -dsync_slave_send_change(struct dsync_slave *slave, - const struct dsync_mail_change *change) -{ - i_assert(change->uid > 0); - - T_BEGIN { - slave->v.send_change(slave, change); - } T_END; - return dsync_slave_send_ret(slave); -} - -enum dsync_slave_recv_ret -dsync_slave_recv_change(struct dsync_slave *slave, - const struct dsync_mail_change **change_r) -{ - return slave->v.recv_change(slave, change_r); -} - -enum dsync_slave_send_ret -dsync_slave_send_mail_request(struct dsync_slave *slave, - const struct dsync_mail_request *request) -{ - i_assert(*request->guid != '\0' || request->uid != 0); - - T_BEGIN { - slave->v.send_mail_request(slave, request); - } T_END; - return dsync_slave_send_ret(slave); -} - -enum dsync_slave_recv_ret -dsync_slave_recv_mail_request(struct dsync_slave *slave, - const struct dsync_mail_request **request_r) -{ - return slave->v.recv_mail_request(slave, request_r); -} - -enum dsync_slave_send_ret -dsync_slave_send_mail(struct dsync_slave *slave, - const struct dsync_mail *mail) -{ - i_assert(*mail->guid != '\0' || mail->uid != 0); - - T_BEGIN { - slave->v.send_mail(slave, mail); - } T_END; - return dsync_slave_send_ret(slave); -} - -enum dsync_slave_recv_ret -dsync_slave_recv_mail(struct dsync_slave *slave, - struct dsync_mail **mail_r) -{ - return slave->v.recv_mail(slave, mail_r); -} - -void dsync_slave_flush(struct dsync_slave *slave) -{ - if (slave->v.flush != NULL) - slave->v.flush(slave); -} - -bool dsync_slave_has_failed(struct dsync_slave *slave) -{ - return slave->failed; -} - -bool dsync_slave_is_send_queue_full(struct dsync_slave *slave) -{ - return slave->v.is_send_queue_full(slave); -} - -bool dsync_slave_has_pending_data(struct dsync_slave *slave) -{ - return slave->v.has_pending_data(slave); -}
--- a/src/doveadm/dsync/dsync-slave.h Fri Sep 07 16:19:35 2012 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,118 +0,0 @@ -#ifndef DSYNC_SLAVE_H -#define DSYNC_SLAVE_H - -#include "ioloop.h" -#include "guid.h" -#include "dsync-brain.h" - -struct dsync_mailbox; -struct dsync_mailbox_state; -struct dsync_mailbox_node; -struct dsync_mailbox_delete; -struct dsync_mail; -struct dsync_mail_change; -struct dsync_mail_request; -struct dsync_slave_settings; - -enum dsync_slave_send_ret { - DSYNC_SLAVE_SEND_RET_OK = 1, - /* send queue is full, stop sending more */ - DSYNC_SLAVE_SEND_RET_FULL = 0 -}; - -enum dsync_slave_recv_ret { - DSYNC_SLAVE_RECV_RET_FINISHED = -1, - /* try again / error (the error handling delayed until io callback) */ - DSYNC_SLAVE_RECV_RET_TRYAGAIN = 0, - DSYNC_SLAVE_RECV_RET_OK = 1 -}; - -struct dsync_slave_settings { - /* if non-NULL, sync only this namespace */ - const char *sync_ns_prefix; - - enum dsync_brain_sync_type sync_type; - bool guid_requests; - bool mails_have_guids; -}; - -void dsync_slave_init_pipe(struct dsync_slave **slave1_r, - struct dsync_slave **slave2_r); -struct dsync_slave * -dsync_slave_init_stream(int fd_in, int fd_out, const char *name, - const char *temp_path_prefix); -void dsync_slave_deinit(struct dsync_slave **slave); - -/* I/O callback is called whenever new data is available. It's also called on - errors, so check first the error status. */ -void dsync_slave_set_io_callback(struct dsync_slave *slave, - io_callback_t *callback, void *context); - -void dsync_slave_send_handshake(struct dsync_slave *slave, - const struct dsync_slave_settings *set); -enum dsync_slave_recv_ret -dsync_slave_recv_handshake(struct dsync_slave *slave, - const struct dsync_slave_settings **set_r); - -enum dsync_slave_send_ret ATTR_NOWARN_UNUSED_RESULT -dsync_slave_send_end_of_list(struct dsync_slave *slave); - -enum dsync_slave_send_ret ATTR_NOWARN_UNUSED_RESULT -dsync_slave_send_mailbox_state(struct dsync_slave *slave, - const struct dsync_mailbox_state *state); -enum dsync_slave_recv_ret -dsync_slave_recv_mailbox_state(struct dsync_slave *slave, - struct dsync_mailbox_state *state_r); - -enum dsync_slave_send_ret ATTR_NOWARN_UNUSED_RESULT -dsync_slave_send_mailbox_tree_node(struct dsync_slave *slave, - const char *const *name, - const struct dsync_mailbox_node *node); -enum dsync_slave_recv_ret -dsync_slave_recv_mailbox_tree_node(struct dsync_slave *slave, - const char *const **name_r, - const struct dsync_mailbox_node **node_r); - -enum dsync_slave_send_ret ATTR_NOWARN_UNUSED_RESULT -dsync_slave_send_mailbox_deletes(struct dsync_slave *slave, - const struct dsync_mailbox_delete *deletes, - unsigned int count, char hierarchy_sep); -enum dsync_slave_recv_ret -dsync_slave_recv_mailbox_deletes(struct dsync_slave *slave, - const struct dsync_mailbox_delete **deletes_r, - unsigned int *count_r, char *hierarchy_sep_r); - -enum dsync_slave_send_ret ATTR_NOWARN_UNUSED_RESULT -dsync_slave_send_mailbox(struct dsync_slave *slave, - const struct dsync_mailbox *dsync_box); -enum dsync_slave_recv_ret -dsync_slave_recv_mailbox(struct dsync_slave *slave, - const struct dsync_mailbox **dsync_box_r); - -enum dsync_slave_send_ret ATTR_NOWARN_UNUSED_RESULT -dsync_slave_send_change(struct dsync_slave *slave, - const struct dsync_mail_change *change); -enum dsync_slave_recv_ret -dsync_slave_recv_change(struct dsync_slave *slave, - const struct dsync_mail_change **change_r); - -enum dsync_slave_send_ret ATTR_NOWARN_UNUSED_RESULT -dsync_slave_send_mail_request(struct dsync_slave *slave, - const struct dsync_mail_request *request); -enum dsync_slave_recv_ret -dsync_slave_recv_mail_request(struct dsync_slave *slave, - const struct dsync_mail_request **request_r); - -enum dsync_slave_send_ret ATTR_NOWARN_UNUSED_RESULT -dsync_slave_send_mail(struct dsync_slave *slave, - const struct dsync_mail *mail); -enum dsync_slave_recv_ret -dsync_slave_recv_mail(struct dsync_slave *slave, - struct dsync_mail **mail_r); - -void dsync_slave_flush(struct dsync_slave *slave); -bool dsync_slave_has_failed(struct dsync_slave *slave); -bool dsync_slave_is_send_queue_full(struct dsync_slave *slave); -bool dsync_slave_has_pending_data(struct dsync_slave *slave); - -#endif