Mercurial > dovecot > core-2.2
diff src/doveadm/dsync/dsync-ibc-stream.c @ 16052:0e5a359b7b7f
lib-storage: Mailbox attributes can now be accessed via istreams.
The idea is to use istreams for larger values.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Tue, 19 Mar 2013 19:05:27 +0200 |
parents | 46b223892373 |
children | acb88f199704 |
line wrap: on
line diff
--- a/src/doveadm/dsync/dsync-ibc-stream.c Tue Mar 19 18:52:39 2013 +0200 +++ b/src/doveadm/dsync/dsync-ibc-stream.c Tue Mar 19 19:05:27 2013 +0200 @@ -99,7 +99,7 @@ { .name = "mailbox_attribute", .chr = 'A', .required_keys = "type key", - .optional_keys = "value deleted last_change modseq" + .optional_keys = "value stream deleted last_change modseq" }, { .name = "mail_change", .chr = 'C', @@ -142,9 +142,10 @@ pool_t ret_pool; struct dsync_deserializer_decoder *cur_decoder; - struct istream *mail_output, *mail_input; + struct istream *value_output, *value_input; struct dsync_mail *cur_mail; - char mail_output_last; + struct dsync_mailbox_attribute *cur_attr; + char value_output_last; unsigned int version_received:1; unsigned int handshake_received:1; @@ -163,21 +164,21 @@ static int dsync_ibc_stream_read_mail_stream(struct dsync_ibc_stream *ibc) { do { - i_stream_skip(ibc->mail_input, - i_stream_get_data_size(ibc->mail_input)); - } while (i_stream_read(ibc->mail_input) > 0); - if (ibc->mail_input->eof) { - if (ibc->mail_input->stream_errno != 0) { - errno = ibc->mail_input->stream_errno; + i_stream_skip(ibc->value_input, + i_stream_get_data_size(ibc->value_input)); + } while (i_stream_read(ibc->value_input) > 0); + if (ibc->value_input->eof) { + if (ibc->value_input->stream_errno != 0) { + errno = ibc->value_input->stream_errno; i_error("dsync(%s): read() failed: %m", ibc->name); dsync_ibc_stream_stop(ibc); return -1; } /* finished reading the mail stream */ - i_assert(ibc->mail_input->eof); - i_stream_seek(ibc->mail_input, 0); + i_assert(ibc->value_input->eof); + i_stream_seek(ibc->value_input, 0); ibc->has_pending_data = TRUE; - ibc->mail_input = NULL; + ibc->value_input = NULL; return 1; } return 0; @@ -185,7 +186,7 @@ static void dsync_ibc_stream_input(struct dsync_ibc_stream *ibc) { - if (ibc->mail_input != NULL) { + if (ibc->value_input != NULL) { if (dsync_ibc_stream_read_mail_stream(ibc) == 0) return; } @@ -194,19 +195,19 @@ o_stream_uncork(ibc->output); } -static int dsync_ibc_stream_send_mail_stream(struct dsync_ibc_stream *ibc) +static int dsync_ibc_stream_send_value_stream(struct dsync_ibc_stream *ibc) { const unsigned char *data; unsigned char add; size_t i, size; int ret; - while ((ret = i_stream_read_data(ibc->mail_output, + while ((ret = i_stream_read_data(ibc->value_output, &data, &size, 0)) > 0) { add = '\0'; for (i = 0; i < size; i++) { if (data[i] == '.' && - ((i == 0 && ibc->mail_output_last == '\n') || + ((i == 0 && ibc->value_output_last == '\n') || (i > 0 && data[i-1] == '\n'))) { /* escape the dot */ add = '.'; @@ -216,8 +217,8 @@ if (i > 0) { o_stream_nsend(ibc->output, data, i); - ibc->mail_output_last = data[i-1]; - i_stream_skip(ibc->mail_output, i); + ibc->value_output_last = data[i-1]; + i_stream_skip(ibc->value_output, i); } if (o_stream_get_buffer_used_size(ibc->output) >= 4096) { @@ -234,14 +235,14 @@ if (add != '\0') { o_stream_nsend(ibc->output, &add, 1); - ibc->mail_output_last = add; + ibc->value_output_last = add; } } i_assert(ret == -1); - if (ibc->mail_output->stream_errno != 0) { + if (ibc->value_output->stream_errno != 0) { i_error("dsync(%s): read(%s) failed: %m", - ibc->name, i_stream_get_name(ibc->mail_output)); + ibc->name, i_stream_get_name(ibc->value_output)); dsync_ibc_stream_stop(ibc); return -1; } @@ -249,7 +250,7 @@ /* finished sending the stream. use "CRLF." instead of "LF." just in case we're sending binary data that ends with CR. */ o_stream_nsend_str(ibc->output, "\r\n.\r\n"); - i_stream_unref(&ibc->mail_output); + i_stream_unref(&ibc->value_output); return 1; } @@ -261,8 +262,8 @@ o_stream_cork(ibc->output); if ((ret = o_stream_flush(output)) < 0) ret = 1; - else if (ibc->mail_output != NULL) { - if (dsync_ibc_stream_send_mail_stream(ibc) < 0) + else if (ibc->value_output != NULL) { + if (dsync_ibc_stream_send_value_stream(ibc) < 0) ret = 1; } timeout_reset(ibc->to); @@ -320,8 +321,8 @@ if (ibc->cur_decoder != NULL) dsync_deserializer_decode_finish(&ibc->cur_decoder); - if (ibc->mail_output != NULL) - i_stream_unref(&ibc->mail_output); + if (ibc->value_output != NULL) + i_stream_unref(&ibc->value_output); else { /* notify remote that we're closing. this is mainly to avoid "read() failed: EOF" errors on failing dsyncs */ @@ -399,10 +400,50 @@ dsync_ibc_stream_send_string(struct dsync_ibc_stream *ibc, const string_t *str) { - i_assert(ibc->mail_output == NULL); + i_assert(ibc->value_output == NULL); o_stream_nsend(ibc->output, str_data(str), str_len(str)); } +static int seekable_fd_callback(const char **path_r, void *context) +{ + struct dsync_ibc_stream *ibc = context; + string_t *path; + int fd; + + path = t_str_new(128); + str_append(path, ibc->temp_path_prefix); + fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1); + if (fd == -1) { + i_error("safe_mkstemp(%s) failed: %m", str_c(path)); + return -1; + } + + /* we just want the fd, unlink it */ + if (unlink(str_c(path)) < 0) { + /* shouldn't happen.. */ + i_error("unlink(%s) failed: %m", str_c(path)); + i_close_fd(&fd); + return -1; + } + + *path_r = str_c(path); + return fd; +} + +static struct istream * +dsync_ibc_stream_input_stream(struct dsync_ibc_stream *ibc) +{ + struct istream *inputs[2]; + + inputs[0] = i_stream_create_dot(ibc->input, FALSE); + inputs[1] = NULL; + ibc->value_input = i_stream_create_seekable(inputs, MAIL_READ_FULL_BLOCK_SIZE, + seekable_fd_callback, ibc); + i_stream_unref(&inputs[0]); + + return ibc->value_input; +} + static int dsync_ibc_check_missing_deserializers(struct dsync_ibc_stream *ibc) { @@ -483,7 +524,7 @@ const char *line, *error; unsigned int i; - i_assert(ibc->mail_input == NULL); + i_assert(ibc->value_input == NULL); timeout_reset(ibc->to); @@ -659,7 +700,7 @@ { struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; - i_assert(ibc->mail_output == NULL); + i_assert(ibc->value_output == NULL); o_stream_nsend_str(ibc->output, END_OF_LIST_LINE"\n"); } @@ -1218,6 +1259,8 @@ dsync_serializer_encode_add(encoder, "key", attr->key); if (attr->value != NULL) dsync_serializer_encode_add(encoder, "value", attr->value); + else if (attr->value_stream != NULL) + dsync_serializer_encode_add(encoder, "stream", ""); if (attr->deleted) dsync_serializer_encode_add(encoder, "deleted", ""); @@ -1232,6 +1275,13 @@ dsync_serializer_encode_finish(&encoder, str); dsync_ibc_stream_send_string(ibc, str); + + if (attr->value_stream != NULL) { + ibc->value_output_last = '\0'; + ibc->value_output = attr->value_stream; + i_stream_ref(ibc->value_output); + (void)dsync_ibc_stream_send_value_stream(ibc); + } } static enum dsync_ibc_recv_ret @@ -1248,6 +1298,13 @@ if (ibc->minor_version < DSYNC_PROTOCOL_MINOR_HAVE_ATTRIBUTES) return DSYNC_IBC_RECV_RET_FINISHED; + if (ibc->cur_attr != NULL) { + /* finished reading the stream, return the mail now */ + *attr_r = ibc->cur_attr; + ibc->cur_attr = NULL; + return DSYNC_IBC_RECV_RET_OK; + } + p_clear(pool); attr = p_new(pool, struct dsync_mailbox_attribute, 1); @@ -1271,7 +1328,15 @@ value = dsync_deserializer_decode_get(decoder, "key"); attr->key = p_strdup(pool, value); - if (dsync_deserializer_decode_try(decoder, "value", &value)) + if (dsync_deserializer_decode_try(decoder, "stream", &value)) { + attr->value_stream = dsync_ibc_stream_input_stream(ibc); + if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) { + ibc->cur_attr = attr; + return DSYNC_IBC_RECV_RET_TRYAGAIN; + } + /* already finished reading the stream */ + i_assert(ibc->value_input == NULL); + } else if (dsync_deserializer_decode_try(decoder, "value", &value)) attr->value = p_strdup(pool, value); if (dsync_deserializer_decode_try(decoder, "deleted", &value)) attr->deleted = TRUE; @@ -1512,7 +1577,7 @@ struct dsync_serializer_encoder *encoder; string_t *str = t_str_new(128); - i_assert(ibc->mail_output == NULL); + i_assert(ibc->value_output == NULL); str_append_c(str, items[ITEM_MAIL].chr); encoder = dsync_serializer_encode_begin(ibc->serializers[ITEM_MAIL]); @@ -1539,39 +1604,13 @@ dsync_ibc_stream_send_string(ibc, str); if (mail->input != NULL) { - ibc->mail_output_last = '\0'; - ibc->mail_output = mail->input; - i_stream_ref(ibc->mail_output); - (void)dsync_ibc_stream_send_mail_stream(ibc); + ibc->value_output_last = '\0'; + ibc->value_output = mail->input; + i_stream_ref(ibc->value_output); + (void)dsync_ibc_stream_send_value_stream(ibc); } } -static int seekable_fd_callback(const char **path_r, void *context) -{ - struct dsync_ibc_stream *ibc = context; - string_t *path; - int fd; - - path = t_str_new(128); - str_append(path, ibc->temp_path_prefix); - fd = safe_mkstemp(path, 0600, (uid_t)-1, (gid_t)-1); - if (fd == -1) { - i_error("safe_mkstemp(%s) failed: %m", str_c(path)); - return -1; - } - - /* we just want the fd, unlink it */ - if (unlink(str_c(path)) < 0) { - /* shouldn't happen.. */ - i_error("unlink(%s) failed: %m", str_c(path)); - i_close_fd(&fd); - return -1; - } - - *path_r = str_c(path); - return fd; -} - static enum dsync_ibc_recv_ret dsync_ibc_stream_recv_mail(struct dsync_ibc *_ibc, struct dsync_mail **mail_r) { @@ -1579,11 +1618,10 @@ pool_t pool = ibc->ret_pool; struct dsync_deserializer_decoder *decoder; struct dsync_mail *mail; - struct istream *inputs[2]; const char *value; enum dsync_ibc_recv_ret ret; - if (ibc->mail_input != NULL) { + if (ibc->value_input != NULL) { /* wait until the mail's stream has been read */ return DSYNC_IBC_RECV_RET_TRYAGAIN; } @@ -1621,19 +1659,13 @@ return DSYNC_IBC_RECV_RET_TRYAGAIN; } if (dsync_deserializer_decode_try(decoder, "stream", &value)) { - inputs[0] = i_stream_create_dot(ibc->input, FALSE); - inputs[1] = NULL; - mail->input = i_stream_create_seekable(inputs, - MAIL_READ_FULL_BLOCK_SIZE, seekable_fd_callback, ibc); - i_stream_unref(&inputs[0]); - - ibc->mail_input = mail->input; + mail->input = dsync_ibc_stream_input_stream(ibc); if (dsync_ibc_stream_read_mail_stream(ibc) <= 0) { ibc->cur_mail = mail; return DSYNC_IBC_RECV_RET_TRYAGAIN; } /* already finished reading the stream */ - i_assert(ibc->mail_input == NULL); + i_assert(ibc->value_input == NULL); } *mail_r = mail; @@ -1644,8 +1676,8 @@ { struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; - if (ibc->mail_output != NULL) { - i_stream_unref(&ibc->mail_output); + if (ibc->value_output != NULL) { + i_stream_unref(&ibc->value_output); dsync_ibc_stream_stop(ibc); } } @@ -1655,7 +1687,7 @@ struct dsync_ibc_stream *ibc = (struct dsync_ibc_stream *)_ibc; size_t bytes; - if (ibc->mail_output != NULL) + if (ibc->value_output != NULL) return TRUE; bytes = o_stream_get_buffer_used_size(ibc->output);