Mercurial > dovecot > core-2.2
changeset 4266:662578b5ae22 HEAD
Added tee-istream, which can be used to create multiple readable input streams from one input stream.
author | Timo Sirainen <timo.sirainen@movial.fi> |
---|---|
date | Tue, 09 May 2006 12:39:00 +0300 |
parents | 75d5843153f1 |
children | bd99e8f5e3ac |
files | src/lib/Makefile.am src/lib/istream-tee.c src/lib/istream-tee.h |
diffstat | 3 files changed, 206 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib/Makefile.am Tue May 09 12:37:40 2006 +0300 +++ b/src/lib/Makefile.am Tue May 09 12:39:00 2006 +0300 @@ -30,6 +30,7 @@ istream-limit.c \ istream-mmap.c \ istream-seekable.c \ + istream-tee.c \ ioloop.c \ ioloop-notify-none.c \ ioloop-notify-dn.c \ @@ -111,6 +112,7 @@ istream.h \ istream-internal.h \ istream-seekable.h \ + istream-tee.h \ ioloop.h \ ioloop-internal.h \ lib.h \
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/istream-tee.c Tue May 09 12:39:00 2006 +0300 @@ -0,0 +1,189 @@ +/* Copyright (C) 2006 Timo Sirainen */ + +#include "lib.h" +#include "istream-internal.h" +#include "istream-tee.h" + +struct tee_istream { + pool_t pool; + struct istream *input; + struct tee_child_istream *children; +}; + +struct tee_child_istream { + struct _istream istream; + + struct tee_istream *tee; + struct tee_child_istream *next; +}; + +static void tee_streams_update_buffer(struct tee_istream *tee) +{ + struct tee_child_istream *tstream = tee->children; + const unsigned char *data; + size_t size, old_used; + + data = i_stream_get_data(tee->input, &size); + for (; tstream != NULL; tstream = tstream->next) { + old_used = tstream->istream.pos - tstream->istream.skip; + + tstream->istream.buffer = data; + tstream->istream.skip = tstream->istream.istream.v_offset - + tee->input->v_offset; + i_assert(tstream->istream.skip + old_used <= size); + tstream->istream.pos = tstream->istream.skip + old_used; + } +} + +static void tee_streams_skip(struct tee_istream *tee) +{ + struct tee_child_istream *tstream = tee->children; + size_t min_skip; + + min_skip = (size_t)-1; + for (; tstream != NULL; tstream = tstream->next) { + if (tstream->istream.skip < min_skip) + min_skip = tstream->istream.skip; + } + + if (min_skip > 0) { + i_stream_skip(tee->input, min_skip); + tee_streams_update_buffer(tee); + } +} + +static void _close(struct _iostream *stream) +{ + struct tee_child_istream *tstream = (struct tee_child_istream *)stream; + + tee_streams_skip(tstream->tee); +} + +static void _destroy(struct _iostream *stream) +{ + struct tee_child_istream *tstream = (struct tee_child_istream *)stream; + struct tee_istream *tee = tstream->tee; + struct tee_child_istream **p; + + for (p = &tee->children; *p != NULL; p = &(*p)->next) { + if (*p == tstream) { + *p = tstream->next; + break; + } + } + + if (tee->children == NULL) { + i_stream_unref(&tee->input); + p_free(tee->pool, tee); + } +} + +static void _set_max_buffer_size(struct _iostream *stream, size_t max_size) +{ + struct tee_child_istream *tstream = (struct tee_child_istream *)stream; + + return i_stream_set_max_buffer_size(tstream->tee->input, max_size); +} + +static ssize_t _read(struct _istream *stream) +{ + struct tee_child_istream *tstream = (struct tee_child_istream *)stream; + struct istream *input = tstream->tee->input; + const unsigned char *data; + size_t size; + uoff_t last_high_offset; + ssize_t ret; + + data = i_stream_get_data(input, &size); + + last_high_offset = stream->istream.v_offset + + (tstream->istream.pos - tstream->istream.skip); + i_assert(last_high_offset <= input->v_offset + size); + if (last_high_offset == input->v_offset + size) { + tee_streams_skip(tstream->tee); + ret = i_stream_read(input); + if (ret <= 0) { + data = i_stream_get_data(input, &size); + if (ret == -2 && stream->skip != 0) { + /* someone else is holding the data, + wait for it */ + return 0; + } + stream->istream.eof = input->eof; + return ret; + } + tee_streams_update_buffer(tstream->tee); + data = i_stream_get_data(input, &size); + } else if (stream->buffer == NULL) { + tee_streams_update_buffer(tstream->tee); + } + + i_assert(stream->buffer == data); + ret = size - stream->pos; + stream->pos = size; + return ret; +} + +static void _seek(struct _istream *stream __attr_unused__, + uoff_t v_offset __attr_unused__, + bool mark __attr_unused__) +{ + i_panic("tee-istream: seeking unsupported currently"); +} + +static const struct stat *_stat(struct _istream *stream, bool exact) +{ + struct tee_child_istream *tstream = (struct tee_child_istream *)stream; + + return i_stream_stat(tstream->tee->input, exact); +} + +static void _sync(struct _istream *stream) +{ + struct tee_child_istream *tstream = (struct tee_child_istream *)stream; + size_t size; + + tee_streams_skip(tstream->tee); + (void)i_stream_get_data(tstream->tee->input, &size); + if (size != 0) { + i_panic("tee-istream: i_stream_sync() called " + "with data still buffered"); + } + return i_stream_sync(tstream->tee->input); +} + +struct tee_istream *tee_i_stream_create(struct istream *input, pool_t pool) +{ + struct tee_istream *tee; + + tee = p_new(pool, struct tee_istream, 1); + tee->pool = pool; + tee->input = input; + + i_stream_ref(input); + return tee; +} + +struct istream * +tee_i_stream_create_child(struct tee_istream *tee, pool_t pool) +{ + struct tee_child_istream *tstream; + + tstream = p_new(pool, struct tee_child_istream, 1); + tstream->tee = tee; + + tstream->istream.iostream.close = _close; + tstream->istream.iostream.destroy = _destroy; + tstream->istream.iostream.set_max_buffer_size = _set_max_buffer_size; + + tstream->istream.read = _read; + tstream->istream.seek = _seek; + tstream->istream.stat = _stat; + tstream->istream.sync = _sync; + + tstream->next = tee->children; + tee->children = tstream; + + return _i_stream_create(&tstream->istream, pool, + i_stream_get_fd(tee->input), 0); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/istream-tee.h Tue May 09 12:39:00 2006 +0300 @@ -0,0 +1,15 @@ +#ifndef __ISTREAM_TEE_H +#define __ISTREAM_TEE_H + +/* Tee can be used to create multiple child input streams which can access + a single non-blocking input stream in a way that data isn't removed from + memory until all child streams have consumed the input. + + If the stream's buffer gets full because some child isn't consuming the + data, other streams get returned 0 by i_stream_read(). */ +struct tee_istream *tee_i_stream_create(struct istream *input, pool_t pool); + +struct istream * +tee_i_stream_create_child(struct tee_istream *tee, pool_t pool); + +#endif