Mercurial > dovecot > original-hg > dovecot-1.2
changeset 6561:026f67ecd858 HEAD
Added concatenation input stream.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Sat, 20 Oct 2007 19:16:25 +0300 |
parents | 9036aa956d14 |
children | 8917cf7fa8ba |
files | src/lib/Makefile.am src/lib/istream-concat.c src/lib/istream-concat.h |
diffstat | 3 files changed, 258 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib/Makefile.am Sat Oct 20 19:15:59 2007 +0300 +++ b/src/lib/Makefile.am Sat Oct 20 19:16:25 2007 +0300 @@ -36,6 +36,7 @@ imem.c \ iostream.c \ istream.c \ + istream-concat.c \ istream-crlf.c \ istream-data.c \ istream-file.c \ @@ -131,6 +132,7 @@ imem.h \ iostream-internal.h \ istream.h \ + istream-concat.h \ istream-crlf.h \ istream-internal.h \ istream-seekable.h \
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/istream-concat.c Sat Oct 20 19:16:25 2007 +0300 @@ -0,0 +1,249 @@ +/* Copyright (c) 2007 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "buffer.h" +#include "istream-internal.h" +#include "istream-concat.h" + +struct concat_istream { + struct istream_private istream; + + struct istream **input, *cur_input; + uoff_t *input_size; + + unsigned int cur_idx, unknown_size_idx; + size_t prev_size; +}; + +static void i_stream_concat_close(struct iostream_private *stream) +{ + struct concat_istream *cstream = (struct concat_istream *)stream; + unsigned int i; + + for (i = 0; cstream->input[i] != NULL; i++) + i_stream_close(cstream->input[i]); +} + +static void i_stream_concat_destroy(struct iostream_private *stream) +{ + struct concat_istream *cstream = (struct concat_istream *)stream; + unsigned int i; + + for (i = 0; cstream->input[i] != NULL; i++) + i_stream_unref(&cstream->input[i]); +} + +static void +i_stream_concat_set_max_buffer_size(struct iostream_private *stream, + size_t max_size) +{ + struct concat_istream *cstream = (struct concat_istream *)stream; + unsigned int i; + + cstream->istream.max_buffer_size = max_size; + for (i = 0; cstream->input[i] != NULL; i++) + i_stream_set_max_buffer_size(cstream->input[i], max_size); +} + +static void i_stream_concat_read_next(struct concat_istream *cstream) +{ + const unsigned char *data; + size_t data_size, size; + + i_assert(cstream->cur_input->eof); + + cstream->cur_idx++; + cstream->cur_input = cstream->input[cstream->cur_idx]; + + if (cstream->istream.pos == cstream->istream.skip) + return; + + /* we need to keep the current data */ + data = cstream->istream.buffer + cstream->istream.skip; + data_size = cstream->istream.pos - cstream->istream.skip; + + /* we already verified that the data size is less than the + maximum buffer size */ + if (!i_stream_get_buffer_space(&cstream->istream, data_size, &size)) + i_unreached(); + i_assert(size >= data_size); + + cstream->prev_size = data_size; + memcpy(cstream->istream.w_buffer, data, data_size); +} + +static ssize_t i_stream_concat_read(struct istream_private *stream) +{ + struct concat_istream *cstream = (struct concat_istream *)stream; + const unsigned char *data; + size_t size, pos, skip; + ssize_t ret; + bool last_stream; + + if (cstream->cur_input == NULL) + return -1; + + skip = stream->skip; + if (cstream->prev_size > 0) { + if (stream->skip < cstream->prev_size) { + cstream->prev_size -= stream->skip; + skip = 0; + } else { + /* we don't need the buffer anymore */ + skip -= cstream->prev_size; + stream->skip -= cstream->prev_size; + cstream->prev_size = 0; + + i_free_and_null(stream->w_buffer); + stream->buffer = NULL; + stream->buffer_size = 0; + } + } + i_stream_skip(cstream->cur_input, skip); + + data = i_stream_get_data(cstream->cur_input, &pos); + if (pos > stream->pos) + ret = 0; + else { + /* need to read more */ + ret = i_stream_read(cstream->cur_input); + if (ret == -2 || ret == 0) + return ret; + + if (ret == -1 && stream->istream.stream_errno != 0) { + stream->istream.stream_errno = + cstream->cur_input->stream_errno; + return -1; + } + + /* we either read something or we're at EOF */ + last_stream = cstream->input[cstream->cur_idx+1] == NULL; + if (ret == -1 && !last_stream) { + if (stream->pos - stream->skip >= + stream->max_buffer_size) + return -2; + + i_stream_concat_read_next(cstream); + last_stream = + cstream->input[cstream->cur_idx+1] == NULL; + } + + stream->istream.eof = cstream->cur_input->eof && last_stream; + data = i_stream_get_data(cstream->cur_input, &pos); + } + + if (stream->w_buffer == NULL) { + stream->buffer = data; + stream->pos -= stream->skip; + stream->skip = 0; + } else { + if (!i_stream_get_buffer_space(stream, pos, &size)) + return -2; + memcpy(stream->w_buffer + stream->pos, data, I_MIN(size, pos)); + } + + ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) : + (ret == 0 ? 0 : -1); + stream->pos = pos; + return ret; +} + +static unsigned int +find_v_offset(struct concat_istream *cstream, uoff_t *v_offset) +{ + const struct stat *st; + unsigned int i; + + for (i = 0; cstream->input[i] != NULL; i++) { + if (*v_offset == 0) { + /* seek to beginning of this stream */ + break; + } + if (i == cstream->unknown_size_idx) { + /* we'll need to figure out this stream's size */ + st = i_stream_stat(cstream->input[i], TRUE); + if (st == NULL) { + cstream->istream.istream.stream_errno = + cstream->input[i]->stream_errno; + return (unsigned int)-1; + } + + /* @UNSAFE */ + cstream->input_size[i] = st->st_size; + cstream->unknown_size_idx = i + 1; + } + if (*v_offset < cstream->input_size[i]) + break; + *v_offset -= cstream->input_size[i]; + } + + return i; +} + +static void i_stream_concat_seek(struct istream_private *stream, + uoff_t v_offset, bool mark ATTR_UNUSED) +{ + struct concat_istream *cstream = (struct concat_istream *)stream; + + stream->istream.stream_errno = 0; + stream->istream.v_offset = v_offset; + stream->skip = stream->pos = 0; + + cstream->cur_idx = find_v_offset(cstream, &v_offset); + if (cstream->cur_idx == (unsigned int)-1) { + cstream->cur_input = NULL; + return; + } + cstream->cur_input = cstream->input[cstream->cur_idx]; + i_stream_seek(cstream->cur_input, v_offset); +} + +static const struct stat * +i_stream_concat_stat(struct istream_private *stream, bool exact ATTR_UNUSED) +{ + struct concat_istream *cstream = (struct concat_istream *)stream; + uoff_t v_offset = (uoff_t)-1; + unsigned int i; + + /* make sure we have all sizes */ + (void)find_v_offset(cstream, &v_offset); + + stream->statbuf.st_size = 0; + for (i = 0; i < cstream->unknown_size_idx; i++) + stream->statbuf.st_size += cstream->input_size[i]; + return &stream->statbuf; +} + +struct istream *i_stream_create_concat(struct istream *input[]) +{ + struct concat_istream *cstream; + unsigned int count; + size_t max_buffer_size = I_STREAM_MIN_SIZE; + + for (count = 0; input[count] != NULL; count++) { + size_t cur_max = input[count]->real_stream->max_buffer_size; + if (cur_max > max_buffer_size) + max_buffer_size = cur_max; + i_stream_ref(input[count]); + } + i_assert(count != 0); + + cstream = i_new(struct concat_istream, 1); + cstream->input = i_new(struct istream *, count + 1); + cstream->input_size = i_new(uoff_t, count + 1); + + memcpy(cstream->input, input, sizeof(*input) * count); + cstream->cur_input = cstream->input[0]; + + cstream->istream.iostream.close = i_stream_concat_close; + cstream->istream.iostream.destroy = i_stream_concat_destroy; + cstream->istream.iostream.set_max_buffer_size = + i_stream_concat_set_max_buffer_size; + + cstream->istream.max_buffer_size = max_buffer_size; + cstream->istream.read = i_stream_concat_read; + cstream->istream.seek = i_stream_concat_seek; + cstream->istream.stat = i_stream_concat_stat; + + return i_stream_create(&cstream->istream, -1, 0); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/istream-concat.h Sat Oct 20 19:16:25 2007 +0300 @@ -0,0 +1,7 @@ +#ifndef ISTREAM_CONCAT_H +#define ISTREAM_CONCAT_H + +/* Concatenate input streams into a single stream. */ +struct istream *i_stream_create_concat(struct istream *input[]); + +#endif