Mercurial > dovecot > core-2.2
view src/dsync/dsync-proxy-client.c @ 10582:615eef3139c2 HEAD
Updated copyright notices to include year 2010.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Mon, 25 Jan 2010 01:19:08 +0200 |
parents | 96db209efe22 |
children | ba75ab0c3e10 |
line wrap: on
line source
/* Copyright (c) 2009-2010 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "array.h" #include "aqueue.h" #include "fd-set-nonblock.h" #include "istream.h" #include "istream-dot.h" #include "ostream.h" #include "str.h" #include "strescape.h" #include "master-service.h" #include "imap-util.h" #include "dsync-proxy.h" #include "dsync-worker-private.h" #include <stdlib.h> #include <unistd.h> #define OUTBUF_THROTTLE_SIZE (1024*64) enum proxy_client_request_type { PROXY_CLIENT_REQUEST_TYPE_COPY, PROXY_CLIENT_REQUEST_TYPE_GET, PROXY_CLIENT_REQUEST_TYPE_FINISH }; struct proxy_client_request { enum proxy_client_request_type type; union { dsync_worker_msg_callback_t *get; dsync_worker_copy_callback_t *copy; dsync_worker_finish_callback_t *finish; } callback; void *context; }; struct proxy_client_dsync_worker_mailbox_iter { struct dsync_worker_mailbox_iter iter; pool_t pool; }; struct proxy_client_dsync_worker_subs_iter { struct dsync_worker_subs_iter iter; pool_t pool; }; struct proxy_client_dsync_worker { struct dsync_worker worker; int fd_in, fd_out; struct io *io; struct istream *input; struct ostream *output; struct timeout *to; mailbox_guid_t selected_box_guid; struct istream *save_input; struct io *save_io; bool save_input_last_lf; pool_t msg_get_pool; struct dsync_msg_static_data msg_get_data; ARRAY_DEFINE(request_array, struct proxy_client_request); struct aqueue *request_queue; unsigned int handshake_received:1; unsigned int finished:1; }; extern struct dsync_worker_vfuncs proxy_client_dsync_worker; static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker); static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker); static void proxy_client_fail(struct proxy_client_dsync_worker *worker) { i_stream_close(worker->input); dsync_worker_set_failure(&worker->worker); master_service_stop(master_service); } static int proxy_client_worker_read_line(struct proxy_client_dsync_worker *worker, const char **line_r) { if (worker->worker.failed) return -1; *line_r = i_stream_read_next_line(worker->input); if (*line_r == NULL) { if (worker->input->stream_errno != 0) { errno = worker->input->stream_errno; i_error("read() from worker server failed: %m"); dsync_worker_set_failure(&worker->worker); return -1; } if (worker->input->eof) { if (!worker->finished) i_error("read() from worker server failed: EOF"); dsync_worker_set_failure(&worker->worker); return -1; } } if (*line_r == NULL) return 0; if (!worker->handshake_received) { if (strcmp(*line_r, DSYNC_PROXY_SERVER_GREETING_LINE) != 0) { i_error("Invalid server handshake: %s", *line_r); dsync_worker_set_failure(&worker->worker); return -1; } worker->handshake_received = TRUE; return proxy_client_worker_read_line(worker, line_r); } return 1; } static void proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker) { struct istream *input = worker->msg_get_data.input; const unsigned char *data; size_t size; i_assert(worker->io == NULL); worker->msg_get_data.input = NULL; worker->io = io_add(worker->fd_in, IO_READ, proxy_client_worker_input, worker); /* we'll need to read the input until EOF or we'll start treating the input as commands. make sure saving read everything. */ while ((i_stream_read_data(input, &data, &size, 0)) > 0) i_stream_skip(input, size); } static bool proxy_client_worker_next_copy(const struct proxy_client_request *request, const char *line) { request->callback.copy(*line == '1', request->context); return TRUE; } static bool proxy_client_worker_next_msg_get(struct proxy_client_dsync_worker *worker, const struct proxy_client_request *request, const char *line) { enum dsync_msg_get_result result = DSYNC_MSG_GET_RESULT_FAILED; const char *p, *error; uint32_t uid; i_assert(worker->msg_get_data.input == NULL); p_clear(worker->msg_get_pool); switch (line[0]) { case '1': /* ok */ if (line[1] != '\t') break; line += 2; if ((p = strchr(line, '\t')) == NULL) break; uid = strtoul(t_strcut(line, '\t'), NULL, 10); line = p + 1; if (dsync_proxy_msg_static_import(worker->msg_get_pool, line, &worker->msg_get_data, &error) < 0) { i_error("Invalid msg-get static input: %s", error); proxy_client_fail(worker); return FALSE; } worker->msg_get_data.input = i_stream_create_dot(worker->input, FALSE); i_stream_set_destroy_callback(worker->msg_get_data.input, proxy_client_worker_msg_get_done, worker); io_remove(&worker->io); result = DSYNC_MSG_GET_RESULT_SUCCESS; break; case '0': /* expunged */ result = DSYNC_MSG_GET_RESULT_EXPUNGED; break; default: /* failure */ break; } request->callback.get(result, &worker->msg_get_data, request->context); return worker->io != NULL; } static void proxy_client_worker_next_finish(struct proxy_client_dsync_worker *worker, const struct proxy_client_request *request, const char *line) { bool success = TRUE; if (strcmp(line, "changes") == 0) worker->worker.unexpected_changes = TRUE; else if (strcmp(line, "ok") != 0) success = FALSE; request->callback.finish(success, request->context); } static bool proxy_client_worker_next_reply(struct proxy_client_dsync_worker *worker, const char *line) { const struct proxy_client_request *requests; struct proxy_client_request request; bool ret = TRUE; if (aqueue_count(worker->request_queue) == 0) { i_error("Unexpected reply from server: %s", line); proxy_client_fail(worker); return FALSE; } requests = array_idx(&worker->request_array, 0); request = requests[aqueue_idx(worker->request_queue, 0)]; aqueue_delete_tail(worker->request_queue); switch (request.type) { case PROXY_CLIENT_REQUEST_TYPE_COPY: ret = proxy_client_worker_next_copy(&request, line); break; case PROXY_CLIENT_REQUEST_TYPE_GET: ret = proxy_client_worker_next_msg_get(worker, &request, line); break; case PROXY_CLIENT_REQUEST_TYPE_FINISH: worker->finished = TRUE; proxy_client_worker_next_finish(worker, &request, line); break; } return ret; } static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker) { const char *line; int ret; timeout_reset(worker->to); if (worker->worker.input_callback != NULL) { worker->worker.input_callback(worker->worker.input_context); return; } while ((ret = proxy_client_worker_read_line(worker, &line)) > 0) { if (!proxy_client_worker_next_reply(worker, line)) break; } if (ret < 0) { /* try to continue */ proxy_client_worker_next_reply(worker, ""); } } static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker) { int ret; timeout_reset(worker->to); if ((ret = o_stream_flush(worker->output)) < 0) return 1; if (worker->save_input != NULL) { /* proxy_client_worker_msg_save() hasn't finished yet. */ o_stream_cork(worker->output); proxy_client_send_stream(worker); if (worker->save_input != NULL) return 1; } if (worker->worker.output_callback != NULL) worker->worker.output_callback(worker->worker.output_context); return ret; } static void proxy_client_worker_timeout(void *context ATTR_UNUSED) { i_error("proxy client timed out"); master_service_stop(master_service); } struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out) { struct proxy_client_dsync_worker *worker; worker = i_new(struct proxy_client_dsync_worker, 1); worker->worker.v = proxy_client_dsync_worker; worker->fd_in = fd_in; worker->fd_out = fd_out; worker->to = timeout_add(DSYNC_PROXY_TIMEOUT_MSECS, proxy_client_worker_timeout, NULL); worker->io = io_add(fd_in, IO_READ, proxy_client_worker_input, worker); worker->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE); worker->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE); o_stream_send_str(worker->output, DSYNC_PROXY_CLIENT_GREETING_LINE"\n"); /* we'll keep the output corked until flush is needed */ o_stream_cork(worker->output); o_stream_set_flush_callback(worker->output, proxy_client_worker_output, worker); fd_set_nonblock(fd_in, TRUE); fd_set_nonblock(fd_out, TRUE); worker->msg_get_pool = pool_alloconly_create("dsync proxy msg", 128); i_array_init(&worker->request_array, 64); worker->request_queue = aqueue_init(&worker->request_array.arr); return &worker->worker; } static void proxy_client_worker_deinit(struct dsync_worker *_worker) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; timeout_remove(&worker->to); if (worker->io != NULL) io_remove(&worker->io); i_stream_destroy(&worker->input); o_stream_destroy(&worker->output); if (close(worker->fd_in) < 0) i_error("close(worker input) failed: %m"); if (worker->fd_in != worker->fd_out) { if (close(worker->fd_out) < 0) i_error("close(worker output) failed: %m"); } aqueue_deinit(&worker->request_queue); array_free(&worker->request_array); pool_unref(&worker->msg_get_pool); i_free(worker); } static bool worker_is_output_stream_full(struct proxy_client_dsync_worker *worker) { return o_stream_get_buffer_used_size(worker->output) >= OUTBUF_THROTTLE_SIZE; } static bool proxy_client_worker_is_output_full(struct dsync_worker *_worker) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; if (worker->save_io != NULL) { /* we haven't finished sending a message save, so we're full. */ return TRUE; } return worker_is_output_stream_full(worker); } static int proxy_client_worker_output_flush(struct dsync_worker *_worker) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; if (o_stream_flush(worker->output) < 0) return -1; o_stream_uncork(worker->output); if (o_stream_get_buffer_used_size(worker->output) > 0) return 0; o_stream_cork(worker->output); return 1; } static struct dsync_worker_mailbox_iter * proxy_client_worker_mailbox_iter_init(struct dsync_worker *_worker) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; struct proxy_client_dsync_worker_mailbox_iter *iter; iter = i_new(struct proxy_client_dsync_worker_mailbox_iter, 1); iter->iter.worker = _worker; iter->pool = pool_alloconly_create("proxy mailbox iter", 1024); o_stream_send_str(worker->output, "BOX-LIST\n"); proxy_client_worker_output_flush(_worker); return &iter->iter; } static int proxy_client_worker_mailbox_iter_next(struct dsync_worker_mailbox_iter *_iter, struct dsync_mailbox *dsync_box_r) { struct proxy_client_dsync_worker_mailbox_iter *iter = (struct proxy_client_dsync_worker_mailbox_iter *)_iter; struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_iter->worker; const char *line, *error; int ret; if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) { if (ret < 0) _iter->failed = TRUE; return ret; } if (*line == '\t') { /* end of mailboxes */ if (line[1] != '0') _iter->failed = TRUE; return -1; } p_clear(iter->pool); if (dsync_proxy_mailbox_import(iter->pool, line, dsync_box_r, &error) < 0) { i_error("Invalid mailbox input from worker server: %s", error); _iter->failed = TRUE; return -1; } return 1; } static int proxy_client_worker_mailbox_iter_deinit(struct dsync_worker_mailbox_iter *_iter) { struct proxy_client_dsync_worker_mailbox_iter *iter = (struct proxy_client_dsync_worker_mailbox_iter *)_iter; int ret = _iter->failed ? -1 : 0; pool_unref(&iter->pool); i_free(iter); return ret; } static struct dsync_worker_subs_iter * proxy_client_worker_subs_iter_init(struct dsync_worker *_worker) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; struct proxy_client_dsync_worker_subs_iter *iter; iter = i_new(struct proxy_client_dsync_worker_subs_iter, 1); iter->iter.worker = _worker; iter->pool = pool_alloconly_create("proxy subscription iter", 1024); o_stream_send_str(worker->output, "SUBS-LIST\n"); proxy_client_worker_output_flush(_worker); return &iter->iter; } static int proxy_client_worker_subs_iter_next_line(struct proxy_client_dsync_worker_subs_iter *iter, unsigned int wanted_arg_count, char ***args_r) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)iter->iter.worker; const char *line; char **args; int ret; if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) { if (ret < 0) iter->iter.failed = TRUE; return ret; } if (*line == '\t') { /* end of subscribed subscriptions */ if (line[1] != '0') iter->iter.failed = TRUE; return -1; } p_clear(iter->pool); args = p_strsplit(iter->pool, line, "\t"); if (str_array_length((const char *const *)args) < wanted_arg_count) { i_error("Invalid subscription input from worker server"); iter->iter.failed = TRUE; return -1; } *args_r = args; return 1; } static int proxy_client_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter, struct dsync_worker_subscription *rec_r) { struct proxy_client_dsync_worker_subs_iter *iter = (struct proxy_client_dsync_worker_subs_iter *)_iter; char **args; int ret; ret = proxy_client_worker_subs_iter_next_line(iter, 4, &args); if (ret <= 0) return ret; rec_r->vname = str_tabunescape(args[0]); rec_r->storage_name = str_tabunescape(args[1]); rec_r->ns_prefix = str_tabunescape(args[2]); rec_r->last_change = strtoul(args[3], NULL, 10); return 1; } static int proxy_client_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter, struct dsync_worker_unsubscription *rec_r) { struct proxy_client_dsync_worker_subs_iter *iter = (struct proxy_client_dsync_worker_subs_iter *)_iter; char **args; int ret; ret = proxy_client_worker_subs_iter_next_line(iter, 3, &args); if (ret <= 0) return ret; memset(rec_r, 0, sizeof(*rec_r)); if (dsync_proxy_mailbox_guid_import(args[0], &rec_r->name_sha1) < 0) { i_error("Invalid subscription input from worker server: " "Invalid unsubscription mailbox GUID"); iter->iter.failed = TRUE; return -1; } rec_r->ns_prefix = str_tabunescape(args[1]); rec_r->last_change = strtoul(args[2], NULL, 10); return 1; } static int proxy_client_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter) { struct proxy_client_dsync_worker_subs_iter *iter = (struct proxy_client_dsync_worker_subs_iter *)_iter; int ret = _iter->failed ? -1 : 0; pool_unref(&iter->pool); i_free(iter); return ret; } static void proxy_client_worker_set_subscribed(struct dsync_worker *_worker, const char *name, time_t last_change, bool set) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; T_BEGIN { string_t *str = t_str_new(128); str_append(str, "SUBS-SET\t"); str_tabescape_write(str, name); str_printfa(str, "\t%s\t%d\n", dec2str(last_change), set ? 1 : 0); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; } struct proxy_client_dsync_worker_msg_iter { struct dsync_worker_msg_iter iter; pool_t pool; bool done; }; static struct dsync_worker_msg_iter * proxy_client_worker_msg_iter_init(struct dsync_worker *_worker, const mailbox_guid_t mailboxes[], unsigned int mailbox_count) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; struct proxy_client_dsync_worker_msg_iter *iter; string_t *str; unsigned int i; iter = i_new(struct proxy_client_dsync_worker_msg_iter, 1); iter->iter.worker = _worker; iter->pool = pool_alloconly_create("proxy message iter", 10240); str = str_new(iter->pool, 512); str_append(str, "MSG-LIST"); for (i = 0; i < mailbox_count; i++) T_BEGIN { str_append_c(str, '\t'); dsync_proxy_mailbox_guid_export(str, &mailboxes[i]); } T_END; str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); p_clear(iter->pool); proxy_client_worker_output_flush(_worker); return &iter->iter; } static int proxy_client_worker_msg_iter_next(struct dsync_worker_msg_iter *_iter, unsigned int *mailbox_idx_r, struct dsync_message *msg_r) { struct proxy_client_dsync_worker_msg_iter *iter = (struct proxy_client_dsync_worker_msg_iter *)_iter; struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_iter->worker; const char *line, *error; int ret; if (iter->done) return -1; if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) { if (ret < 0) _iter->failed = TRUE; return ret; } if (*line == '\t') { /* end of messages */ if (line[1] != '0') _iter->failed = TRUE; iter->done = TRUE; return -1; } *mailbox_idx_r = 0; while (*line >= '0' && *line <= '9') { *mailbox_idx_r = *mailbox_idx_r * 10 + (*line - '0'); line++; } if (*line != '\t') { i_error("Invalid mailbox idx from worker server"); _iter->failed = TRUE; return -1; } line++; p_clear(iter->pool); if (dsync_proxy_msg_import(iter->pool, line, msg_r, &error) < 0) { i_error("Invalid message input from worker server: %s", error); _iter->failed = TRUE; return -1; } return 1; } static int proxy_client_worker_msg_iter_deinit(struct dsync_worker_msg_iter *_iter) { struct proxy_client_dsync_worker_msg_iter *iter = (struct proxy_client_dsync_worker_msg_iter *)_iter; int ret = _iter->failed ? -1 : 0; pool_unref(&iter->pool); i_free(iter); return ret; } static void proxy_client_worker_create_mailbox(struct dsync_worker *_worker, const struct dsync_mailbox *dsync_box) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; i_assert(worker->save_input == NULL); T_BEGIN { string_t *str = t_str_new(128); str_append(str, "BOX-CREATE\t"); dsync_proxy_mailbox_export(str, dsync_box); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; } static void proxy_client_worker_delete_mailbox(struct dsync_worker *_worker, const struct dsync_mailbox *dsync_box) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; i_assert(worker->save_input == NULL); T_BEGIN { string_t *str = t_str_new(128); str_append(str, "BOX-DELETE\t"); dsync_proxy_mailbox_guid_export(str, &dsync_box->mailbox_guid); str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change)); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; } static void proxy_client_worker_rename_mailbox(struct dsync_worker *_worker, const mailbox_guid_t *mailbox, const struct dsync_mailbox *dsync_box) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; char sep[2]; i_assert(worker->save_input == NULL); T_BEGIN { string_t *str = t_str_new(128); str_append(str, "BOX-RENAME\t"); dsync_proxy_mailbox_guid_export(str, mailbox); str_append_c(str, '\t'); str_tabescape_write(str, dsync_box->name); str_append_c(str, '\t'); sep[0] = dsync_box->name_sep; sep[1] = '\0'; str_tabescape_write(str, sep); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; } static void proxy_client_worker_update_mailbox(struct dsync_worker *_worker, const struct dsync_mailbox *dsync_box) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; i_assert(worker->save_input == NULL); T_BEGIN { string_t *str = t_str_new(128); str_append(str, "BOX-UPDATE\t"); dsync_proxy_mailbox_export(str, dsync_box); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; } static void proxy_client_worker_select_mailbox(struct dsync_worker *_worker, const mailbox_guid_t *mailbox, const ARRAY_TYPE(const_string) *cache_fields) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; i_assert(worker->save_input == NULL); if (dsync_guid_equals(&worker->selected_box_guid, mailbox)) return; worker->selected_box_guid = *mailbox; T_BEGIN { string_t *str = t_str_new(128); str_append(str, "BOX-SELECT\t"); dsync_proxy_mailbox_guid_export(str, mailbox); if (cache_fields != NULL) dsync_proxy_strings_export(str, cache_fields); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; } static void proxy_client_worker_msg_update_metadata(struct dsync_worker *_worker, const struct dsync_message *msg) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; i_assert(worker->save_input == NULL); T_BEGIN { string_t *str = t_str_new(128); str_printfa(str, "MSG-UPDATE\t%u\t%llu\t", msg->uid, (unsigned long long)msg->modseq); imap_write_flags(str, msg->flags, msg->keywords); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; } static void proxy_client_worker_msg_update_uid(struct dsync_worker *_worker, uint32_t old_uid, uint32_t new_uid) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; i_assert(worker->save_input == NULL); T_BEGIN { o_stream_send_str(worker->output, t_strdup_printf("MSG-UID-CHANGE\t%u\t%u\n", old_uid, new_uid)); } T_END; } static void proxy_client_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; i_assert(worker->save_input == NULL); T_BEGIN { o_stream_send_str(worker->output, t_strdup_printf("MSG-EXPUNGE\t%u\n", uid)); } T_END; } static void proxy_client_worker_msg_copy(struct dsync_worker *_worker, const mailbox_guid_t *src_mailbox, uint32_t src_uid, const struct dsync_message *dest_msg, dsync_worker_copy_callback_t *callback, void *context) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; struct proxy_client_request request; i_assert(worker->save_input == NULL); T_BEGIN { string_t *str = t_str_new(128); str_append(str, "MSG-COPY\t"); dsync_proxy_mailbox_guid_export(str, src_mailbox); str_printfa(str, "\t%u\t", src_uid); dsync_proxy_msg_export(str, dest_msg); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; memset(&request, 0, sizeof(request)); request.type = PROXY_CLIENT_REQUEST_TYPE_COPY; request.callback.copy = callback; request.context = context; aqueue_append(worker->request_queue, &request); } static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker) { const unsigned char *data; size_t size; int ret; while ((ret = i_stream_read_data(worker->save_input, &data, &size, 0)) > 0) { dsync_proxy_send_dot_output(worker->output, &worker->save_input_last_lf, data, size); i_stream_skip(worker->save_input, size); if (worker_is_output_stream_full(worker)) { o_stream_uncork(worker->output); if (worker_is_output_stream_full(worker)) return; o_stream_cork(worker->output); } } if (ret == 0) { /* waiting for more input */ o_stream_uncork(worker->output); if (worker->save_io == NULL) { int fd = i_stream_get_fd(worker->save_input); worker->save_io = io_add(fd, IO_READ, proxy_client_send_stream, worker); } return; } if (worker->save_io != NULL) io_remove(&worker->save_io); if (worker->save_input->stream_errno != 0) { errno = worker->save_input->stream_errno; i_error("proxy: reading message input failed: %m"); o_stream_close(worker->output); } else { i_assert(!i_stream_have_bytes_left(worker->save_input)); o_stream_send(worker->output, "\n.\n", 3); } i_stream_unref(&worker->save_input); } static void proxy_client_worker_msg_save(struct dsync_worker *_worker, const struct dsync_message *msg, const struct dsync_msg_static_data *data) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; i_assert(worker->save_input == NULL); T_BEGIN { string_t *str = t_str_new(128); str_append(str, "MSG-SAVE\t"); dsync_proxy_msg_static_export(str, data); str_append_c(str, '\t'); dsync_proxy_msg_export(str, msg); str_append_c(str, '\n'); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; i_assert(worker->save_io == NULL); worker->save_input = data->input; worker->save_input_last_lf = TRUE; i_stream_ref(worker->save_input); proxy_client_send_stream(worker); } static void proxy_client_worker_msg_save_cancel(struct dsync_worker *_worker) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; if (worker->save_io != NULL) io_remove(&worker->save_io); if (worker->save_input != NULL) i_stream_unref(&worker->save_input); } static void proxy_client_worker_msg_get(struct dsync_worker *_worker, const mailbox_guid_t *mailbox, uint32_t uid, dsync_worker_msg_callback_t *callback, void *context) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; struct proxy_client_request request; i_assert(worker->save_input == NULL); T_BEGIN { string_t *str = t_str_new(128); str_append(str, "MSG-GET\t"); dsync_proxy_mailbox_guid_export(str, mailbox); str_printfa(str, "\t%u\n", uid); o_stream_send(worker->output, str_data(str), str_len(str)); } T_END; memset(&request, 0, sizeof(request)); request.type = PROXY_CLIENT_REQUEST_TYPE_GET; request.callback.get = callback; request.context = context; aqueue_append(worker->request_queue, &request); } static void proxy_client_worker_finish(struct dsync_worker *_worker, dsync_worker_finish_callback_t *callback, void *context) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; struct proxy_client_request request; i_assert(worker->save_input == NULL); o_stream_send_str(worker->output, "FINISH\n"); o_stream_uncork(worker->output); memset(&request, 0, sizeof(request)); request.type = PROXY_CLIENT_REQUEST_TYPE_FINISH; request.callback.finish = callback; request.context = context; aqueue_append(worker->request_queue, &request); } struct dsync_worker_vfuncs proxy_client_dsync_worker = { proxy_client_worker_deinit, proxy_client_worker_is_output_full, proxy_client_worker_output_flush, proxy_client_worker_mailbox_iter_init, proxy_client_worker_mailbox_iter_next, proxy_client_worker_mailbox_iter_deinit, proxy_client_worker_subs_iter_init, proxy_client_worker_subs_iter_next, proxy_client_worker_subs_iter_next_un, proxy_client_worker_subs_iter_deinit, proxy_client_worker_set_subscribed, proxy_client_worker_msg_iter_init, proxy_client_worker_msg_iter_next, proxy_client_worker_msg_iter_deinit, proxy_client_worker_create_mailbox, proxy_client_worker_delete_mailbox, proxy_client_worker_rename_mailbox, proxy_client_worker_update_mailbox, proxy_client_worker_select_mailbox, proxy_client_worker_msg_update_metadata, proxy_client_worker_msg_update_uid, proxy_client_worker_msg_expunge, proxy_client_worker_msg_copy, proxy_client_worker_msg_save, proxy_client_worker_msg_save_cancel, proxy_client_worker_msg_get, proxy_client_worker_finish };