Mercurial > dovecot > core-2.2
changeset 12604:0e5c36a54ce8
imapc: Added support for saving and copying messages.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Fri, 28 Jan 2011 17:57:21 +0200 |
parents | 0c8fb305df30 |
children | d675c0264190 |
files | src/lib-storage/index/imapc/imapc-connection.c src/lib-storage/index/imapc/imapc-mailbox.c src/lib-storage/index/imapc/imapc-save.c src/lib-storage/index/imapc/imapc-search.c src/lib-storage/index/imapc/imapc-storage.c src/lib-storage/index/imapc/imapc-storage.h |
diffstat | 6 files changed, 422 insertions(+), 58 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib-storage/index/imapc/imapc-connection.c Wed Jan 26 21:47:52 2011 +0200 +++ b/src/lib-storage/index/imapc/imapc-connection.c Fri Jan 28 17:57:21 2011 +0200 @@ -28,12 +28,19 @@ IMAPC_INPUT_STATE_SKIPLINE }; +struct imapc_command_stream { + unsigned int pos; + struct istream *input; +}; + struct imapc_command { pool_t pool; buffer_t *data; unsigned int send_pos; unsigned int tag; + ARRAY_DEFINE(streams, struct imapc_command_stream); + imapc_command_callback_t *callback; void *context; }; @@ -73,6 +80,7 @@ static void imapc_connection_disconnect(struct imapc_connection *conn); +static void imapc_command_free(struct imapc_command *cmd); static void imapc_command_send_more(struct imapc_connection *conn, struct imapc_command *cmd); @@ -133,7 +141,7 @@ array_delete(&conn->cmd_wait_list, 0, 1); cmd->callback(&reply, cmd->context); - pool_unref(&cmd->pool); + imapc_command_free(cmd); } while (array_count(&conn->cmd_send_queue) > 0) { cmdp = array_idx(&conn->cmd_send_queue, 0); @@ -141,7 +149,7 @@ array_delete(&conn->cmd_send_queue, 0, 1); cmd->callback(&reply, cmd->context); - pool_unref(&cmd->pool); + imapc_command_free(cmd); } } if (state == IMAPC_CONNECTION_STATE_DONE) { @@ -613,7 +621,7 @@ imapc_connection_input_reset(conn); cmd->callback(&reply, cmd->context); - pool_unref(&cmd->pool); + imapc_command_free(cmd); return 1; } @@ -811,6 +819,17 @@ return cmd; } +static void imapc_command_free(struct imapc_command *cmd) +{ + struct imapc_command_stream *stream; + + if (array_is_created(&cmd->streams)) { + array_foreach_modifiable(&cmd->streams, stream) + i_stream_unref(&stream->input); + } + pool_unref(&cmd->pool); +} + static bool parse_sync_literal(const unsigned char *data, unsigned int pos, unsigned int *value_r) @@ -836,16 +855,62 @@ return TRUE; } +static void imapc_command_send_done(struct imapc_connection *conn, + struct imapc_command *cmd) +{ + /* everything sent. move command to wait list. */ + i_assert(*array_idx(&conn->cmd_send_queue, 0) == cmd); + array_delete(&conn->cmd_send_queue, 0, 1); + array_append(&conn->cmd_wait_list, &cmd, 1); + + if (array_count(&conn->cmd_send_queue) > 0 && + conn->state == IMAPC_CONNECTION_STATE_DONE) { + /* send the next command in queue */ + struct imapc_command *const *cmd2_p = + array_idx(&conn->cmd_send_queue, 0); + imapc_command_send_more(conn, *cmd2_p); + } +} + +static int imapc_command_try_send_stream(struct imapc_connection *conn, + struct imapc_command *cmd) +{ + struct imapc_command_stream *stream; + + if (!array_is_created(&cmd->streams) || array_count(&cmd->streams) == 0) + return -1; + + stream = array_idx_modifiable(&cmd->streams, 0); + if (stream->pos != cmd->send_pos) + return -1; + + /* we're sending the stream now */ + (void)o_stream_send_istream(conn->output, stream->input); + if (!i_stream_is_eof(stream->input)) + return 0; + + /* finished with the stream */ + i_stream_unref(&stream->input); + array_delete(&cmd->streams, 0, 1); + + i_assert(cmd->send_pos != cmd->data->used); + return 1; +} + static void imapc_command_send_more(struct imapc_connection *conn, struct imapc_command *cmd) { const unsigned char *p; unsigned int seek_pos, start_pos, end_pos, size; + int ret; i_assert(cmd->send_pos < cmd->data->used); + if ((ret = imapc_command_try_send_stream(conn, cmd)) == 0) + return; + seek_pos = cmd->send_pos; - if (seek_pos != 0) { + if (seek_pos != 0 && ret < 0) { /* skip over the literal. we can also get here from AUTHENTICATE command, which doesn't use a literal */ if (parse_sync_literal(cmd->data->data, seek_pos, &size)) { @@ -872,18 +937,9 @@ cmd->send_pos = end_pos; if (cmd->send_pos == cmd->data->used) { - /* everything sent. move command to wait list. */ - i_assert(*array_idx(&conn->cmd_send_queue, 0) == cmd); - array_delete(&conn->cmd_send_queue, 0, 1); - array_append(&conn->cmd_wait_list, &cmd, 1); - - if (array_count(&conn->cmd_send_queue) > 0 && - conn->state == IMAPC_CONNECTION_STATE_DONE) { - /* send the next command in queue */ - struct imapc_command *const *cmd2_p = - array_idx(&conn->cmd_send_queue, 0); - imapc_command_send_more(conn, *cmd2_p); - } + i_assert(!array_is_created(&cmd->streams) || + array_count(&cmd->streams) == 0); + imapc_command_send_done(conn, cmd); } } @@ -959,6 +1015,22 @@ str_printfa(cmd->data, "%u", arg); break; } + case 'p': { + struct istream *input = va_arg(args, struct istream *); + struct imapc_command_stream *s; + uoff_t size; + + if (!array_is_created(&cmd->streams)) + p_array_init(&cmd->streams, cmd->pool, 2); + if (i_stream_get_size(input, TRUE, &size) < 0) + size = 0; + str_printfa(cmd->data, "{%"PRIuSIZE_T"}\r\n", size); + s = array_append_space(&cmd->streams); + s->pos = str_len(cmd->data); + s->input = input; + i_stream_ref(input); + break; + } case 's': { const char *arg = va_arg(args, const char *);
--- a/src/lib-storage/index/imapc/imapc-mailbox.c Wed Jan 26 21:47:52 2011 +0200 +++ b/src/lib-storage/index/imapc/imapc-mailbox.c Fri Jan 28 17:57:21 2011 +0200 @@ -22,6 +22,25 @@ mail_index_transaction_open_updated_view(mbox->delayed_sync_trans); } +static void +imapc_newmsgs_callback(const struct imapc_command_reply *reply, + void *context) +{ + struct imapc_mailbox *mbox = context; + + if (reply->state == IMAPC_COMMAND_STATE_OK) + ; + else if (reply->state == IMAPC_COMMAND_STATE_NO) { + imapc_copy_error_from_reply(mbox->storage, MAIL_ERROR_PARAMS, + reply); + } else { + mail_storage_set_critical(&mbox->storage->storage, + "imapc: Command failed: %s", reply->text_full); + } + if (mbox->opening) + imapc_client_stop(mbox->storage->client); +} + static void imapc_untagged_exists(const struct imapc_untagged_reply *reply, struct imapc_mailbox *mbox) { @@ -42,9 +61,8 @@ hdr = mail_index_get_header(mbox->box.view); mbox->new_msgs = TRUE; - imapc_client_mailbox_cmdf(mbox->client_box, imapc_async_stop_callback, - mbox->storage, "UID FETCH %u:* FLAGS", - hdr->next_uid); + imapc_client_mailbox_cmdf(mbox->client_box, imapc_newmsgs_callback, + mbox, "UID FETCH %u:* FLAGS", hdr->next_uid); } static void imapc_mailbox_idle_timeout(struct imapc_mailbox *mbox)
--- a/src/lib-storage/index/imapc/imapc-save.c Wed Jan 26 21:47:52 2011 +0200 +++ b/src/lib-storage/index/imapc/imapc-save.c Fri Jan 28 17:57:21 2011 +0200 @@ -1,7 +1,15 @@ -/* Copyright (c) 2007-2010 Dovecot authors, see the included COPYING file */ +/* Copyright (c) 2011 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "str.h" +#include "istream.h" +#include "istream-crlf.h" +#include "ostream.h" +#include "imap-date.h" +#include "imap-util.h" #include "index-mail.h" +#include "mail-copy.h" +#include "imapc-client.h" #include "imapc-storage.h" #include "imapc-sync.h" @@ -11,9 +19,24 @@ struct imapc_mailbox *mbox; struct mail_index_transaction *trans; + int fd; + char *temp_path; + struct istream *input; + + uint32_t dest_uid_validity; + ARRAY_TYPE(seq_range) dest_saved_uids; + unsigned int failed:1; + unsigned int finished:1; }; +struct imapc_save_cmd_context { + struct imapc_save_context *ctx; + int ret; +}; + +void imapc_transaction_save_rollback(struct mail_save_context *_ctx); + struct mail_save_context * imapc_save_alloc(struct mailbox_transaction_context *t) { @@ -27,6 +50,7 @@ ctx->ctx.transaction = t; ctx->mbox = mbox; ctx->trans = t->itrans; + ctx->fd = -1; t->save_ctx = &ctx->ctx; } return t->save_ctx; @@ -34,20 +58,174 @@ int imapc_save_begin(struct mail_save_context *_ctx, struct istream *input) { - return -1; + struct imapc_save_context *ctx = (struct imapc_save_context *)_ctx; + struct mail_storage *storage = _ctx->transaction->box->storage; + const char *path; + + i_assert(ctx->fd == -1); + + ctx->fd = imapc_create_temp_fd(storage->user, &path); + if (ctx->fd == -1) { + mail_storage_set_critical(storage, + "Couldn't create temp file %s", path); + ctx->failed = TRUE; + return -1; + } + /* we may not know the size of the input, or be sure that it contains + only CRLFs. so we'll always first write the mail to a temp file and + upload it from there to remote server. */ + ctx->finished = FALSE; + ctx->temp_path = i_strdup(path); + ctx->input = i_stream_create_crlf(input); + _ctx->output = o_stream_create_fd_file(ctx->fd, 0, FALSE); + o_stream_cork(_ctx->output); + return 0; } int imapc_save_continue(struct mail_save_context *_ctx) { - return -1; + struct imapc_save_context *ctx = (struct imapc_save_context *)_ctx; + struct mail_storage *storage = _ctx->transaction->box->storage; + + if (ctx->failed) + return -1; + + if (o_stream_send_istream(_ctx->output, ctx->input) < 0) { + if (!mail_storage_set_error_from_errno(storage)) { + mail_storage_set_critical(storage, + "o_stream_send_istream(%s) failed: %m", + ctx->temp_path); + } + ctx->failed = TRUE; + return -1; + } + return 0; +} + +static void imapc_save_appenduid(struct imapc_save_context *ctx, + const struct imapc_command_reply *reply) +{ + const char *const *args; + uint32_t uid_validity, dest_uid; + + /* <uidvalidity> <dest uid-set> */ + args = t_strsplit(reply->resp_text_value, " "); + if (str_array_length(args) != 2) + return; + + if (str_to_uint32(args[0], &uid_validity) < 0) + return; + if (ctx->dest_uid_validity == 0) + ctx->dest_uid_validity = uid_validity; + else if (ctx->dest_uid_validity != uid_validity) + return; + + if (str_to_uint32(args[1], &dest_uid) == 0) + seq_range_array_add(&ctx->dest_saved_uids, 0, dest_uid); +} + +static void imapc_save_callback(const struct imapc_command_reply *reply, + void *context) +{ + struct imapc_save_cmd_context *ctx = context; + + if (reply->state == IMAPC_COMMAND_STATE_OK) { + if (strcasecmp(reply->resp_text_key, "APPENDUID") == 0) + imapc_save_appenduid(ctx->ctx, reply); + ctx->ret = 0; + } else if (reply->state == IMAPC_COMMAND_STATE_NO) { + imapc_copy_error_from_reply(ctx->ctx->mbox->storage, + MAIL_ERROR_PARAMS, reply); + ctx->ret = -1; + } else { + mail_storage_set_critical(&ctx->ctx->mbox->storage->storage, + "imapc: COPY failed: %s", reply->text_full); + ctx->ret = -1; + } + imapc_client_stop(ctx->ctx->mbox->storage->client); +} + +static void +imapc_append_keywords(string_t *str, struct mail_keywords *kw) +{ + const ARRAY_TYPE(keywords) *kw_arr; + const char *const *kw_p; + unsigned int i; + + kw_arr = mail_index_get_keywords(kw->index); + for (i = 0; i < kw->count; i++) { + kw_p = array_idx(kw_arr, kw->idx[i]); + if (str_len(str) > 1) + str_append_c(str, ' '); + str_append(str, *kw_p); + } +} + +static int imapc_save_append(struct imapc_save_context *ctx) +{ + struct mail_save_context *_ctx = &ctx->ctx; + struct imapc_save_cmd_context sctx; + struct istream *input; + const char *flags = "", *internaldate = ""; + + if (_ctx->flags != 0 || _ctx->keywords != NULL) { + string_t *str = t_str_new(64); + + str_append(str, " ("); + imap_write_flags(str, _ctx->flags, NULL); + if (_ctx->keywords != NULL) + imapc_append_keywords(str, _ctx->keywords); + str_append_c(str, ')'); + flags = str_c(str); + } + if (_ctx->received_date != (time_t)-1) { + internaldate = t_strdup_printf(" \"%s\"", + imap_to_datetime(_ctx->received_date)); + } + + input = i_stream_create_fd(ctx->fd, IO_BLOCK_SIZE, FALSE); + sctx.ctx = ctx; + imapc_client_cmdf(ctx->mbox->storage->client, + imapc_save_callback, &sctx, "APPEND %s%1s%1s %p", + ctx->mbox->box.name, flags, internaldate, input); + i_stream_unref(&input); + imapc_client_run(ctx->mbox->storage->client); + return sctx.ret; } int imapc_save_finish(struct mail_save_context *_ctx) { struct imapc_save_context *ctx = (struct imapc_save_context *)_ctx; + struct mail_storage *storage = _ctx->transaction->box->storage; - ctx->failed = TRUE; + ctx->finished = TRUE; + + if (!ctx->failed) { + if (o_stream_flush(_ctx->output) < 0) { + if (!mail_storage_set_error_from_errno(storage)) { + mail_storage_set_critical(storage, + "o_stream_flush(%s) failed: %m", + ctx->temp_path); + } + ctx->failed = TRUE; + } + } + if (!ctx->failed) { + if (imapc_save_append(ctx) < 0) + ctx->failed = TRUE; + } + + if (_ctx->output != NULL) + o_stream_unref(&_ctx->output); + if (ctx->input != NULL) + i_stream_unref(&ctx->input); + if (ctx->fd != -1) { + if (close(ctx->fd) < 0) + i_error("close(%s) failed: %m", ctx->temp_path); + ctx->fd = -1; + } + i_free(ctx->temp_path); index_save_context_free(_ctx); return ctx->failed ? -1 : 0; } @@ -62,14 +240,104 @@ int imapc_transaction_save_commit_pre(struct mail_save_context *_ctx) { - return -1; + struct imapc_save_context *ctx = (struct imapc_save_context *)_ctx; + struct mail_transaction_commit_changes *changes = + _ctx->transaction->changes; + + i_assert(ctx->finished); + + if (array_is_created(&ctx->dest_saved_uids)) { + changes->uid_validity = ctx->dest_uid_validity; + array_append_array(&changes->saved_uids, &ctx->dest_saved_uids); + } + return 0; } void imapc_transaction_save_commit_post(struct mail_save_context *_ctx, - struct mail_index_transaction_commit_result *result) + struct mail_index_transaction_commit_result *result ATTR_UNUSED) { + imapc_transaction_save_rollback(_ctx); } void imapc_transaction_save_rollback(struct mail_save_context *_ctx) { + struct imapc_save_context *ctx = (struct imapc_save_context *)_ctx; + + /* FIXME: if we really want to rollback, we should expunge messages + we already saved */ + + if (!ctx->finished) + imapc_save_cancel(_ctx); + + if (array_is_created(&ctx->dest_saved_uids)) + array_free(&ctx->dest_saved_uids); + i_free(ctx); } + +static void imapc_save_copyuid(struct imapc_save_context *ctx, + const struct imapc_command_reply *reply) +{ + const char *const *args; + uint32_t uid_validity, dest_uid; + + /* <uidvalidity> <source uid-set> <dest uid-set> */ + args = t_strsplit(reply->resp_text_value, " "); + if (str_array_length(args) != 3) + return; + + if (str_to_uint32(args[0], &uid_validity) < 0) + return; + if (ctx->dest_uid_validity == 0) + ctx->dest_uid_validity = uid_validity; + else if (ctx->dest_uid_validity != uid_validity) + return; + + if (str_to_uint32(args[2], &dest_uid) == 0) + seq_range_array_add(&ctx->dest_saved_uids, 0, dest_uid); +} + +static void imapc_copy_callback(const struct imapc_command_reply *reply, + void *context) +{ + struct imapc_save_cmd_context *ctx = context; + + if (reply->state == IMAPC_COMMAND_STATE_OK) { + if (strcasecmp(reply->resp_text_key, "COPYUID") == 0) + imapc_save_copyuid(ctx->ctx, reply); + ctx->ret = 0; + } else if (reply->state == IMAPC_COMMAND_STATE_NO) { + imapc_copy_error_from_reply(ctx->ctx->mbox->storage, + MAIL_ERROR_PARAMS, reply); + ctx->ret = -1; + } else { + mail_storage_set_critical(&ctx->ctx->mbox->storage->storage, + "imapc: COPY failed: %s", reply->text_full); + ctx->ret = -1; + } + imapc_client_stop(ctx->ctx->mbox->storage->client); +} + +int imapc_copy(struct mail_save_context *_ctx, struct mail *mail) +{ + struct imapc_save_context *ctx = (struct imapc_save_context *)_ctx; + struct mailbox_transaction_context *_t = _ctx->transaction; + struct imapc_mailbox *src_mbox = (struct imapc_mailbox *)mail->box; + struct imapc_save_cmd_context sctx; + + i_assert((_t->flags & MAILBOX_TRANSACTION_FLAG_EXTERNAL) != 0); + + if (_t->box->storage == mail->box->storage) { + /* same server, we can use COPY for the mail */ + sctx.ret = -2; + sctx.ctx = ctx; + imapc_client_mailbox_cmdf(src_mbox->client_box, + imapc_copy_callback, &sctx, + "UID COPY %u %s", + mail->uid, _t->box->name); + imapc_client_run(src_mbox->storage->client); + i_assert(sctx.ret != -2); + ctx->finished = TRUE; + return sctx.ret; + } + return mail_storage_copy(_ctx, mail); +}
--- a/src/lib-storage/index/imapc/imapc-search.c Wed Jan 26 21:47:52 2011 +0200 +++ b/src/lib-storage/index/imapc/imapc-search.c Fri Jan 28 17:57:21 2011 +0200 @@ -2,7 +2,6 @@ #include "lib.h" #include "istream.h" -#include "safe-mkstemp.h" #include "write-full.h" #include "str.h" #include "imap-arg.h" @@ -74,30 +73,6 @@ return TRUE; } -static int create_temp_fd(struct mail_user *user, const char **path_r) -{ - string_t *path; - int fd; - - path = t_str_new(128); - mail_user_set_get_temp_prefix(path, user->set); - fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1); - if (fd == -1) { - i_error("safe_mkstemp(%s) failed: %m", str_c(path)); - return -1; - } - - /* we just want the fd, unlink it */ - if (unlink(str_c(path)) < 0) { - /* shouldn't happen.. */ - i_error("unlink(%s) failed: %m", str_c(path)); - (void)close(fd); - return -1; - } - *path_r = str_c(path); - return fd; -} - static void imapc_fetch_stream(struct index_mail *imail, const char *value, bool body) { @@ -111,7 +86,7 @@ if (imail->data.stream != NULL) return; - fd = create_temp_fd(_mail->box->storage->user, &path); + fd = imapc_create_temp_fd(_mail->box->storage->user, &path); if (fd == -1) return; if (write_full(fd, value, value_len) < 0) {
--- a/src/lib-storage/index/imapc/imapc-storage.c Wed Jan 26 21:47:52 2011 +0200 +++ b/src/lib-storage/index/imapc/imapc-storage.c Fri Jan 28 17:57:21 2011 +0200 @@ -3,9 +3,9 @@ #include "lib.h" #include "ioloop.h" #include "str.h" +#include "safe-mkstemp.h" #include "imap-arg.h" #include "imap-resp-code.h" -#include "mail-copy.h" #include "index-mail.h" #include "imapc-client.h" #include "imapc-list.h" @@ -79,10 +79,9 @@ return &storage->storage; } -static void -imapc_copy_error_from_reply(struct imapc_storage *storage, - enum mail_error default_error, - const struct imapc_command_reply *reply) +void imapc_copy_error_from_reply(struct imapc_storage *storage, + enum mail_error default_error, + const struct imapc_command_reply *reply) { enum mail_error error; @@ -294,11 +293,12 @@ if (index_storage_mailbox_open(box, FALSE) < 0) return -1; - if (box->deleting) { + if (box->deleting || (box->flags & MAILBOX_FLAG_SAVEONLY) != 0) { /* We don't actually want to SELECT the mailbox. */ return 0; } + mbox->opening = TRUE; ctx.mbox = mbox; ctx.ret = -1; mbox->client_box = @@ -306,6 +306,7 @@ imapc_mailbox_open_callback, &ctx, mbox); imapc_client_run(mbox->storage->client); + mbox->opening = FALSE; if (ctx.ret < 0) { mailbox_close(box); return -1; @@ -476,6 +477,30 @@ /* we're doing IDLE all the time anyway - nothing to do here */ } +int imapc_create_temp_fd(struct mail_user *user, const char **path_r) +{ + string_t *path; + int fd; + + path = t_str_new(128); + mail_user_set_get_temp_prefix(path, user->set); + fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1); + if (fd == -1) { + i_error("safe_mkstemp(%s) failed: %m", str_c(path)); + return -1; + } + + /* we just want the fd, unlink it */ + if (unlink(str_c(path)) < 0) { + /* shouldn't happen.. */ + i_error("unlink(%s) failed: %m", str_c(path)); + (void)close(fd); + return -1; + } + *path_r = str_c(path); + return fd; +} + struct mail_storage imapc_storage = { .name = IMAPC_STORAGE_NAME, .class_flags = 0, @@ -529,7 +554,7 @@ imapc_save_continue, imapc_save_finish, imapc_save_cancel, - mail_storage_copy, + imapc_copy, index_storage_is_inconsistent } };
--- a/src/lib-storage/index/imapc/imapc-storage.h Wed Jan 26 21:47:52 2011 +0200 +++ b/src/lib-storage/index/imapc/imapc-storage.h Fri Jan 28 17:57:21 2011 +0200 @@ -51,6 +51,7 @@ ARRAY_DEFINE(untagged_callbacks, struct imapc_mailbox_event_callback); ARRAY_DEFINE(resp_text_callbacks, struct imapc_mailbox_event_callback); + unsigned int opening:1; unsigned int new_msgs:1; }; @@ -67,6 +68,7 @@ int imapc_save_continue(struct mail_save_context *ctx); int imapc_save_finish(struct mail_save_context *ctx); void imapc_save_cancel(struct mail_save_context *ctx); +int imapc_copy(struct mail_save_context *ctx, struct mail *mail); int imapc_transaction_save_commit_pre(struct mail_save_context *ctx); void imapc_transaction_save_commit_post(struct mail_save_context *ctx, @@ -82,6 +84,9 @@ struct mail *mail, bool *tryagain_r); void imapc_fetch_mail_update(struct mail *mail, const struct imap_arg *args); +void imapc_copy_error_from_reply(struct imapc_storage *storage, + enum mail_error default_error, + const struct imapc_command_reply *reply); void imapc_simple_callback(const struct imapc_command_reply *reply, void *context); void imapc_async_stop_callback(const struct imapc_command_reply *reply, @@ -98,5 +103,6 @@ imapc_mailbox_callback_t *callback); void imapc_mailbox_register_callbacks(struct imapc_mailbox *mbox); +int imapc_create_temp_fd(struct mail_user *user, const char **path_r); #endif