# HG changeset patch # User Aki Tuomi # Date 1503386062 -10800 # Node ID 7db517071db51051d2b5fe0f869455d642838d30 # Parent 7b8e95de2bffe627e0b2ad02c70f5b52eb5d72e9 lib: Add multiplex stream support This allows having multiple channels of data in single stream. diff -r 7b8e95de2bff -r 7db517071db5 src/lib/Makefile.am --- a/src/lib/Makefile.am Wed Oct 04 15:41:03 2017 +0300 +++ b/src/lib/Makefile.am Tue Aug 22 10:14:22 2017 +0300 @@ -72,6 +72,7 @@ istream-jsonstr.c \ istream-limit.c \ istream-mmap.c \ + istream-multiplex.c \ istream-rawlog.c \ istream-seekable.c \ istream-sized.c \ @@ -114,6 +115,7 @@ ostream-failure-at.c \ ostream-file.c \ ostream-hash.c \ + ostream-multiplex.c \ ostream-null.c \ ostream-rawlog.c \ ostream-unix.c \ @@ -220,6 +222,7 @@ istream-file-private.h \ istream-hash.h \ istream-jsonstr.h \ + istream-multiplex.h \ istream-private.h \ istream-rawlog.h \ istream-seekable.h \ @@ -255,6 +258,7 @@ ostream-failure-at.h \ ostream-file-private.h \ ostream-hash.h \ + ostream-multiplex.h \ ostream-private.h \ ostream-null.h \ ostream-rawlog.h \ @@ -340,6 +344,7 @@ test-istream-concat.c \ test-istream-crlf.c \ test-istream-failure-at.c \ + test-istream-multiplex.c \ test-istream-seekable.c \ test-istream-tee.c \ test-istream-unix.c \ @@ -357,6 +362,8 @@ test-ostream-escaped.c \ test-ostream-failure-at.c \ test-ostream-file.c \ + test-ostream-multiplex.c \ + test-multiplex.c \ test-primes.c \ test-printf-format-fix.c \ test-priorityq.c \ diff -r 7b8e95de2bff -r 7db517071db5 src/lib/istream-multiplex.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/istream-multiplex.c Tue Aug 22 10:14:22 2017 +0300 @@ -0,0 +1,270 @@ +/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "array.h" +#include "istream-private.h" +#include "istream-multiplex.h" + +/* all multiplex packets are [1 byte cid][4 byte length][data] */ + +struct multiplex_istream; + +struct multiplex_ichannel { + struct istream_private istream; + struct multiplex_istream *mstream; + uint8_t cid; + size_t pending_pos; + bool closed:1; +}; + +struct multiplex_istream { + struct istream *parent; + + /* channel 0 is main channel */ + uint8_t cur_channel; + unsigned int remain; + size_t bufsize; + ARRAY(struct multiplex_ichannel *) channels; + + bool blocking:1; +}; + +static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream); + +static struct multiplex_ichannel * +get_channel(struct multiplex_istream *mstream, uint8_t cid) +{ + struct multiplex_ichannel **channelp; + i_assert(mstream != NULL); + array_foreach_modifiable(&mstream->channels, channelp) { + if (*channelp != NULL && (*channelp)->cid == cid) + return *channelp; + } + return NULL; +} + +static void propagate_error(struct multiplex_istream *mstream, int stream_errno) +{ + struct multiplex_ichannel **channelp; + array_foreach_modifiable(&mstream->channels, channelp) + if (*channelp != NULL) + (*channelp)->istream.istream.stream_errno = stream_errno; +} + +static void propagate_eof(struct multiplex_istream *mstream) +{ + struct multiplex_ichannel **channelp; + array_foreach_modifiable(&mstream->channels, channelp) { + if (*channelp != NULL) { + (*channelp)->istream.istream.eof = TRUE; + } + } +} + +static ssize_t +i_stream_multiplex_read(struct multiplex_istream *mstream, uint8_t cid) +{ + struct multiplex_ichannel *req_channel = get_channel(mstream, cid); + const unsigned char *data; + size_t len = 0, used, wanted, avail; + ssize_t ret, got = 0; + + if (mstream->parent == NULL) { + req_channel->istream.istream.eof = TRUE; + return -1; + } + + data = i_stream_get_data(mstream->parent, &len); + + if (len == 0 && mstream->parent->closed) { + req_channel->istream.istream.eof = TRUE; + return -1; + } + + if (((mstream->remain > 0 && len == 0) || + (mstream->remain == 0 && len < 5)) && + (ret = i_stream_read(mstream->parent)) <= 0) { + propagate_error(mstream, mstream->parent->stream_errno); + if (mstream->parent->eof) + propagate_eof(mstream); + return ret; + } + + for(;;) { + data = i_stream_get_data(mstream->parent, &len); + if (len == 0) { + if (got == 0 && mstream->blocking) + got += i_stream_multiplex_read(mstream, cid); + break; + } + + if (mstream->remain > 0) { + struct multiplex_ichannel *channel = + get_channel(mstream, mstream->cur_channel); + wanted = I_MIN(len, mstream->remain); + /* is it open? */ + if (channel != NULL && !channel->closed) { + struct istream_private *stream = &channel->istream; + stream->pos += channel->pending_pos; + bool alloc_ret = i_stream_try_alloc(stream, wanted, &avail); + stream->pos -= channel->pending_pos; + if (!alloc_ret) { + i_stream_set_input_pending(&stream->istream, TRUE); + if (channel->cid != cid) + return 0; + if (got > 0) + break; + return -2; + } + + used = I_MIN(wanted, avail); + + /* dump into buffer */ + if (channel->cid != cid) { + i_assert(stream->pos + channel->pending_pos + used <= stream->buffer_size); + memcpy(stream->w_buffer + stream->pos + channel->pending_pos, + data, used); + channel->pending_pos += used; + i_stream_set_input_pending(&stream->istream, TRUE); + } else { + i_assert(stream->pos + used <= stream->buffer_size); + memcpy(stream->w_buffer + stream->pos, data, used); + stream->pos += used; + got += used; + } + } else { + used = wanted; + } + mstream->remain -= used; + i_stream_skip(mstream->parent, used); + /* see if there is more to read */ + continue; + } + if (mstream->remain == 0) { + /* need more data */ + if (len < 5) { + ret = i_stream_multiplex_ichannel_read(&req_channel->istream); + if (ret > 0) + got += ret; + break; + } + /* channel ID */ + mstream->cur_channel = data[0]; + /* data length */ + mstream->remain = be32_to_cpu_unaligned(data+1); + i_stream_skip(mstream->parent, 5); + } + } + + propagate_error(mstream, mstream->parent->stream_errno); + if (mstream->parent->eof) + propagate_eof(mstream); + + return got; +} + +static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream) +{ + struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream; + /* if previous multiplex read dumped data for us + actually serve it here. */ + if (channel->pending_pos > 0) { + ssize_t ret = channel->pending_pos; + stream->pos += channel->pending_pos; + channel->pending_pos = 0; + return ret; + } + return i_stream_multiplex_read(channel->mstream, channel->cid); +} + +static void +i_stream_multiplex_ichannel_close(struct iostream_private *stream, bool close_parent) +{ + struct multiplex_ichannel *const *channelp; + struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream; + channel->closed = TRUE; + if (close_parent) { + array_foreach(&channel->mstream->channels, channelp) + if (*channelp != NULL && !(*channelp)->closed) + return; + i_stream_close(channel->mstream->parent); + } +} + +static void i_stream_multiplex_try_destroy(struct multiplex_istream *mstream) +{ + struct multiplex_ichannel **channelp; + /* can't do anything until they are all closed */ + array_foreach_modifiable(&mstream->channels, channelp) + if (*channelp != NULL) + return; + i_stream_unref(&mstream->parent); + array_free(&mstream->channels); + i_free(mstream); +} + +static void i_stream_multiplex_ichannel_destroy(struct iostream_private *stream) +{ + struct multiplex_ichannel **channelp; + struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream; + i_stream_multiplex_ichannel_close(stream, TRUE); + i_free(channel->istream.w_buffer); + array_foreach_modifiable(&channel->mstream->channels, channelp) { + if (*channelp == channel) { + *channelp = NULL; + break; + } + } + i_stream_multiplex_try_destroy(channel->mstream); +} + +static struct istream * +i_stream_add_channel_real(struct multiplex_istream *mstream, uint8_t cid) +{ + struct multiplex_ichannel *channel = i_new(struct multiplex_ichannel, 1); + channel->cid = cid; + channel->mstream = mstream; + channel->istream.read = i_stream_multiplex_ichannel_read; + channel->istream.iostream.close = i_stream_multiplex_ichannel_close; + channel->istream.iostream.destroy = i_stream_multiplex_ichannel_destroy; + channel->istream.max_buffer_size = mstream->bufsize; + channel->istream.istream.blocking = mstream->blocking; + if (cid == 0) + channel->istream.fd = i_stream_get_fd(mstream->parent); + else + channel->istream.fd = -1; + array_append(&channel->mstream->channels, &channel, 1); + + return i_stream_create(&channel->istream, NULL, channel->istream.fd); +} + +struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid) +{ + struct multiplex_ichannel *chan = + (struct multiplex_ichannel *)stream->real_stream; + i_assert(get_channel(chan->mstream, cid) == NULL); + + return i_stream_add_channel_real(chan->mstream, cid); +} + +struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize) +{ + struct multiplex_istream *mstream; + + mstream = i_new(struct multiplex_istream, 1); + mstream->parent = parent; + mstream->bufsize = bufsize; + mstream->blocking = parent->blocking; + i_array_init(&mstream->channels, 8); + i_stream_ref(parent); + + return i_stream_add_channel_real(mstream, 0); +} + +uint8_t i_stream_multiplex_get_channel_id(struct istream *stream) +{ + struct multiplex_ichannel *channel = + (struct multiplex_ichannel *)stream->real_stream; + return channel->cid; +} diff -r 7b8e95de2bff -r 7db517071db5 src/lib/istream-multiplex.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/istream-multiplex.h Tue Aug 22 10:14:22 2017 +0300 @@ -0,0 +1,8 @@ +#ifndef ISTREAM_MULTIPLEX +#define ISTREAM_MULTIPLEX 1 + +struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize); +struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid); +uint8_t i_stream_multiplex_get_channel_id(struct istream *stream); + +#endif diff -r 7b8e95de2bff -r 7db517071db5 src/lib/ostream-multiplex.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/ostream-multiplex.c Tue Aug 22 10:14:22 2017 +0300 @@ -0,0 +1,224 @@ +/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "array.h" +#include "ostream-private.h" +#include "ostream-multiplex.h" + +/* all multiplex packets are [1 byte cid][4 byte length][data] */ + +struct multiplex_ostream; + +struct multiplex_ochannel { + struct ostream_private ostream; + struct multiplex_ostream *mstream; + uint8_t cid; + buffer_t *buf; + time_t last_sent; + bool closed:1; +}; + +struct multiplex_ostream { + struct ostream *parent; + + /* channel 0 is main channel */ + uint8_t cur_channel; + unsigned int remain; + buffer_t *wbuf; + size_t bufsize; + ARRAY(struct multiplex_ochannel *) channels; + + bool destroyed:1; +}; + +static struct multiplex_ochannel * +get_channel(struct multiplex_ostream *mstream, uint8_t cid) +{ + struct multiplex_ochannel **channelp; + i_assert(mstream != NULL); + array_foreach_modifiable(&mstream->channels, channelp) { + if (*channelp != NULL && (*channelp)->cid == cid) + return *channelp; + } + return NULL; +} + +static void propagate_error(struct multiplex_ostream *mstream, int stream_errno) +{ + struct multiplex_ochannel **channelp; + array_foreach_modifiable(&mstream->channels, channelp) + if (*channelp != NULL) + (*channelp)->ostream.ostream.stream_errno = stream_errno; +} + +static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream) +{ + time_t oldest = ioloop_time; + struct multiplex_ochannel *channel = NULL; + struct multiplex_ochannel **channelp; + array_foreach_modifiable(&mstream->channels, channelp) + if (*channelp != NULL && (*channelp)->last_sent <= oldest && + (*channelp)->buf->used > 0) + channel = *channelp; + return channel; +} + +static ssize_t +o_stream_multiplex_sendv(struct multiplex_ostream *mstream) +{ + struct multiplex_ochannel *channel; + ssize_t ret = 0; + if (mstream->bufsize <= mstream->wbuf->used + 5) + return -2; + + while((channel = get_next_channel(mstream)) != NULL) { + size_t tmp = mstream->bufsize - mstream->wbuf->used - 5; + /* ensure it fits into 32 bit int */ + size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used)); + if (tmp == 0) + break; + uint32_t len = cpu32_to_be(amt); + buffer_append(mstream->wbuf, &channel->cid, 1); + buffer_append(mstream->wbuf, &len, 4); + buffer_append(mstream->wbuf, channel->buf->data, amt); + buffer_delete(channel->buf, 0, amt); + channel->last_sent = ioloop_time; + } + + if (mstream->wbuf->used > 0) { + ret = o_stream_send(mstream->parent, mstream->wbuf->data, + mstream->wbuf->used); + if (ret < 0) { + propagate_error(mstream, mstream->parent->stream_errno); + return ret; + } + o_stream_flush(mstream->parent); + buffer_delete(mstream->wbuf, 0, ret); + } + return ret; +} + +static ssize_t +o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, + const struct const_iovec *iov, unsigned int iov_count) +{ + struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; + ssize_t ret; + size_t total = 0; + if (channel->mstream->bufsize <= channel->buf->used) + return -2; + + for(unsigned int i=0; i < iov_count; i++) { + /* copy data to buffer */ + size_t tmp = channel->mstream->bufsize - channel->buf->used; + if (tmp == 0) + break; + buffer_append(channel->buf, iov[i].iov_base, + I_MIN(tmp, iov[i].iov_len)); + total += I_MIN(tmp, iov[i].iov_len); + } + + stream->ostream.offset += total; + + if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0) + return ret; + + return total; +} + +static void +o_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent) +{ + struct multiplex_ochannel *const *channelp; + struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; + o_stream_flush(&channel->ostream.ostream); + + channel->closed = TRUE; + if (close_parent) { + array_foreach(&channel->mstream->channels, channelp) + if (*channelp !=NULL && !(*channelp)->closed) + return; + o_stream_close(channel->mstream->parent); + } +} + +static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream) +{ + struct multiplex_ochannel **channelp; + /* can't do anything until they are all closed */ + array_foreach_modifiable(&mstream->channels, channelp) + if (*channelp != NULL) + return; + o_stream_flush(mstream->parent); + o_stream_unref(&mstream->parent); + array_free(&mstream->channels); + buffer_free(&mstream->wbuf); + i_free(mstream); +} + +static void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream) +{ + struct multiplex_ochannel **channelp; + struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; + o_stream_multiplex_ochannel_close(stream, TRUE); + if (channel->buf != NULL) + buffer_free(&channel->buf); + /* delete the channel */ + array_foreach_modifiable(&channel->mstream->channels, channelp) { + if (*channelp != NULL && (*channelp)->cid == channel->cid) { + *channelp = NULL; + break; + } + } + o_stream_multiplex_try_destroy(channel->mstream); +} + +static struct ostream * +o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid) +{ + struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1); + channel->cid = cid; + channel->buf = buffer_create_dynamic(default_pool, 256); + channel->mstream = mstream; + channel->ostream.sendv = o_stream_multiplex_ochannel_sendv; + channel->ostream.iostream.close = o_stream_multiplex_ochannel_close; + channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy; + if (cid == 0) + channel->ostream.fd = o_stream_get_fd(mstream->parent); + else + channel->ostream.fd = -1; + array_append(&channel->mstream->channels, &channel, 1); + + return o_stream_create(&channel->ostream, NULL, mstream->bufsize); +} + +struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid) +{ + struct multiplex_ochannel *chan = + (struct multiplex_ochannel *)stream->real_stream; + i_assert(get_channel(chan->mstream, cid) == NULL); + + return o_stream_add_channel_real(chan->mstream, cid); +} + +struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize) +{ + struct multiplex_ostream *mstream; + + mstream = i_new(struct multiplex_ostream, 1); + mstream->parent = parent; + mstream->bufsize = bufsize; + mstream->wbuf = buffer_create_dynamic(default_pool, 256); + i_array_init(&mstream->channels, 8); + o_stream_ref(parent); + + return o_stream_add_channel_real(mstream, 0); +} + +uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream) +{ + struct multiplex_ochannel *channel = + (struct multiplex_ochannel *)stream->real_stream; + return channel->cid; +} diff -r 7b8e95de2bff -r 7db517071db5 src/lib/ostream-multiplex.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/ostream-multiplex.h Tue Aug 22 10:14:22 2017 +0300 @@ -0,0 +1,8 @@ +#ifndef OSTREAM_MULTIPLEX +#define OSTREAM_MULTIPLEX 1 + +struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize); +struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid); +uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream); + +#endif diff -r 7b8e95de2bff -r 7db517071db5 src/lib/test-istream-multiplex.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/test-istream-multiplex.c Tue Aug 22 10:14:22 2017 +0300 @@ -0,0 +1,370 @@ +/* Copyright (c) 2016-2017 Dovecot authors, see the included COPYING file */ + +#include "test-lib.h" +#include "ioloop.h" +#include "str.h" +#include "crc32.h" +#include "randgen.h" +#include "istream-private.h" +#include "istream-multiplex.h" +#include "ostream.h" +#include + +static void test_istream_multiplex_simple(void) +{ + test_begin("istream multiplex (simple)"); + + static const char data[] = "\x00\x00\x00\x00\x06Hello\x00" + "\x01\x00\x00\x00\x03Wor" + "\x00\x00\x00\x00\x00" + "\x01\x00\x00\x00\x03ld\x00"; + static const size_t data_len = sizeof(data)-1; + struct istream *input = test_istream_create_data(data, data_len); + size_t siz; + + struct istream *chan0 = i_stream_create_multiplex(input, (size_t)-1); + struct istream *chan1 = i_stream_multiplex_add_channel(chan0, 1); + + /* nothing to read until the first byte */ + for (size_t i = 0; i <= 1+4; i++) { + test_istream_set_size(input, i); + test_assert(i_stream_read(chan0) == 0); + test_assert(i_stream_read(chan1) == 0); + } + + /* partial read of the first packet */ + size_t input_max = 1+4+3; + test_istream_set_size(input, input_max); + test_assert(i_stream_read(chan0) == 3); + test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hel", 3) == 0 && + siz == 3); + test_assert(i_stream_read(chan1) == 0); + + /* read the rest of the first packet and the second packet. + read chan1 before chan0 to see that it works. */ + input_max += 3 + 1+4+3; + test_istream_set_size(input, input_max); + test_assert(i_stream_read(chan1) == 3); + test_assert(i_stream_read(chan0) == 3); + test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hello\0", 6) == 0 && + siz == 6); + test_assert(memcmp(i_stream_get_data(chan1, &siz), "Wor", 3) == 0 && + siz == 3); + + /* 0-sized packet is ignored */ + input_max += 1+4; + test_istream_set_size(input, input_max); + test_assert(i_stream_read(chan0) == 0); + test_assert(i_stream_read(chan1) == 0); + + /* read the final packet */ + input_max += 1+4+3; + i_assert(input_max == data_len); + test_istream_set_size(input, input_max); + test_assert(i_stream_read(chan0) == 0); + test_assert(i_stream_read(chan1) == 3); + + /* we should have the final data in all channels now */ + test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hello\0", 6) == 0 && + siz == 6); + test_assert(memcmp(i_stream_get_data(chan1, &siz), "World\0", 6) == 0 && + siz == 6); + + /* all channels should return EOF */ + test_assert(i_stream_read(chan0) == -1 && chan0->stream_errno == 0); + i_stream_unref(&chan0); + + test_assert(i_stream_read(chan1) == -1 && chan1->stream_errno == 0); + i_stream_unref(&chan1); + + i_stream_unref(&input); + + test_end(); +} + +static void test_istream_multiplex_maxbuf(void) +{ + test_begin("istream multiplex (maxbuf)"); + + static const char data[] = "\x00\x00\x00\x00\x06Hello\x00" + "\x01\x00\x00\x00\x06World\x00"; + static const size_t data_len = sizeof(data)-1; + struct istream *input = test_istream_create_data(data, data_len); + size_t siz; + + struct istream *chan0 = i_stream_create_multiplex(input, 5); + struct istream *chan1 = i_stream_multiplex_add_channel(chan0, 1); + + /* we get data for channel 0 and congest */ + test_assert(i_stream_read(chan1) == 0); + /* we read data for channel 0 */ + test_assert(i_stream_read(chan0) == 5); + /* and now it's congested */ + test_assert(i_stream_read(chan0) == -2); + test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hello", 5) == 0 && + siz == 5); + /* consume data */ + i_stream_skip(chan0, 5); + /* we read data for channel 1 */ + test_assert(i_stream_read(chan1) == 5); + test_assert(memcmp(i_stream_get_data(chan1, &siz), "World", 5) == 0 && + siz == 5); + /* consume data */ + i_stream_skip(chan1, 5); + /* read last byte */ + test_assert(i_stream_read(chan0) == 1); + /* now we get byte for channel 1 */ + test_assert(i_stream_read(chan0) == 0); + /* now we read byte for channel 1 */ + test_assert(i_stream_read(chan1) == 1); + /* and everything should return EOF now */ + test_assert(i_stream_read(chan1) == -1); + test_assert(i_stream_read(chan0) == -1); + + i_stream_unref(&chan0); + i_stream_unref(&chan1); + + i_stream_unref(&input); + + test_end(); +} + +static void test_istream_multiplex_random(void) +{ + const unsigned int max_channel = 6; + const unsigned int packets_count = 30; + + test_begin("istream multiplex (random)"); + + unsigned int i; + uoff_t bytes_written = 0, bytes_read = 0; + buffer_t *buf = buffer_create_dynamic(default_pool, 10240); + uint32_t input_crc[max_channel]; + uint32_t output_crc[max_channel]; + memset(input_crc, 0, sizeof(input_crc)); + memset(output_crc, 0, sizeof(output_crc)); + + for (i = 0; i < packets_count; i++) { + unsigned int len = 1 + rand() % 1024; + unsigned char packet_data[len]; + uint32_t len_be = cpu32_to_be(len); + unsigned int channel = rand() % max_channel; + + random_fill(packet_data, len); + input_crc[channel] = + crc32_data_more(input_crc[channel], packet_data, len); + + buffer_append_c(buf, channel); + buffer_append(buf, &len_be, sizeof(len_be)); + buffer_append(buf, packet_data, len); + bytes_written += len; + } + + struct istream *input = test_istream_create_data(buf->data, buf->used); + struct istream *chan[max_channel]; + chan[0] = i_stream_create_multiplex(input, 1024/4); + for (i = 1; i < max_channel; i++) + chan[i] = i_stream_multiplex_add_channel(chan[0], i); + + test_istream_set_size(input, 0); + + /* read from each stream, 1 byte at a time */ + size_t input_size = 0; + int max_ret = -3; + unsigned int read_max_channel = max_channel/2; + bool something_read = FALSE; + for (i = 0;;) { + ssize_t ret = i_stream_read(chan[i]); + if (max_ret < ret) + max_ret = ret; + if (ret > 0) { + size_t size; + const unsigned char *data = + i_stream_get_data(chan[i], &size); + + output_crc[i] = crc32_data_more(output_crc[i], data, size); + bytes_read += size; + + test_assert((size_t)ret == size); + i_stream_skip(chan[i], size); + something_read = TRUE; + } + if (++i < read_max_channel) + ; + else if (max_ret == 0 && !something_read && + read_max_channel < max_channel) { + read_max_channel++; + } else { + if (max_ret <= -1) { + test_assert(max_ret == -1); + break; + } + if (max_ret == 0) + test_istream_set_size(input, ++input_size); + i = 0; + max_ret = -3; + something_read = FALSE; + read_max_channel = max_channel/2; + } + } + test_assert(bytes_read == bytes_written); + for (i = 0; i < max_channel; i++) { + test_assert_idx(input_crc[i] == output_crc[i], i); + test_assert_idx(i_stream_read(chan[i]) == -1 && + chan[i]->stream_errno == 0, i); + i_stream_unref(&chan[i]); + } + i_stream_unref(&input); + buffer_free(&buf); + test_end(); +} + +static unsigned int channel_counter[2] = {0, 0}; + +static const char *msgs[] = { + "", + "a", + "bb", + "ccc", + "dddd", + "eeeee", + "ffffff" +}; + +static void test_istream_multiplex_stream_read(struct istream *channel) +{ + uint8_t cid = i_stream_multiplex_get_channel_id(channel); + const char *line; + size_t siz; + + if (i_stream_read(channel) < 0) + return; + + while((line = i_stream_next_line(channel)) != NULL) { + siz = strlen(line); + test_assert_idx(siz > 0 && siz < N_ELEMENTS(msgs), + channel_counter[cid]); + if (siz > 0 && siz < N_ELEMENTS(msgs)) { + test_assert_idx(strcmp(line, msgs[siz]) == 0, + channel_counter[cid]); + } + channel_counter[cid]++; + } + + if (channel_counter[0] > 100 && channel_counter[1] > 100) + io_loop_stop(current_ioloop); +} + +static void test_send_msg(struct ostream *os, uint8_t cid, const char *msg) +{ + uint32_t len = cpu32_to_be(strlen(msg) + 1); + const struct const_iovec iov[] = { + { &cid, sizeof(cid) }, + { &len, sizeof(len) }, + { msg, strlen(msg) }, + { "\n", 1 } /* newline added for i_stream_next_line */ + }; + o_stream_sendv(os, iov, N_ELEMENTS(iov)); +} + +static void test_istream_multiplex_stream_write(struct ostream *channel) +{ + size_t rounds = rand() % 10; + for(size_t i = 0; i < rounds; i++) { + uint8_t cid = rand() % 2; + test_send_msg(channel, cid, msgs[1 + rand() % (N_ELEMENTS(msgs) - 1)]); + } +} + +static void test_istream_multiplex_stream(void) +{ + test_begin("istream multiplex (stream)"); + struct ioloop *ioloop = io_loop_create(); + io_loop_set_current(ioloop); + + int fds[2]; + test_assert(pipe(fds) == 0); + struct ostream *os = o_stream_create_fd(fds[1], (size_t)-1, FALSE); + struct istream *is = i_stream_create_fd(fds[0], 10 + rand() % 10, FALSE); + + struct istream *chan0 = i_stream_create_multiplex(is, (size_t)-1); + struct istream *chan1 = i_stream_multiplex_add_channel(chan0, 1); + + struct io *io0 = + io_add_istream(chan0, test_istream_multiplex_stream_read, chan0); + struct io *io1 = + io_add_istream(chan1, test_istream_multiplex_stream_read, chan1); + struct io *io2 = + io_add(fds[1], IO_WRITE, test_istream_multiplex_stream_write, os); + + io_loop_run(current_ioloop); + + io_remove(&io0); + io_remove(&io1); + io_remove(&io2); + + i_stream_unref(&chan1); + i_stream_unref(&chan0); + i_stream_unref(&is); + + o_stream_unref(&os); + + io_loop_destroy(&ioloop); + + i_close_fd(&fds[0]); + i_close_fd(&fds[1]); + + test_end(); +} + +static void test_istream_multiplex_close_channel(void) +{ + test_begin("istream multiplex (close channel)"); + static const char *data = "\x00\x00\x00\x00\x06Hello\x00" + "\x01\x00\x00\x00\x06World\x00"; + static const size_t data_len = 22; + struct istream *input = test_istream_create_data(data, data_len); + size_t siz; + + struct istream *chan0 = i_stream_create_multiplex(input, (size_t)-1); + struct istream *chan1 = i_stream_multiplex_add_channel(chan0, 1); + + i_stream_unref(&chan1); + + test_assert(i_stream_read(chan0) == 6); + + test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hello\0", 6) == 0 && + siz == 6); + + i_stream_unref(&chan0); + i_stream_unref(&input); + + input = test_istream_create_data(data, data_len); + chan0 = i_stream_create_multiplex(input, (size_t)-1); + chan1 = i_stream_multiplex_add_channel(chan0, 1); + + /* this is needed to populate chan1 data */ + (void)i_stream_read(chan0); + i_stream_unref(&chan0); + + test_assert(i_stream_read(chan1) == 6); + + test_assert(memcmp(i_stream_get_data(chan1, &siz), "World\0", 6) == 0 && + siz == 6); + + i_stream_unref(&chan1); + i_stream_unref(&input); + + test_end(); +} + +void test_istream_multiplex(void) +{ + random_init(); + test_istream_multiplex_simple(); + test_istream_multiplex_maxbuf(); + test_istream_multiplex_random(); + test_istream_multiplex_stream(); + test_istream_multiplex_close_channel(); + random_deinit(); +} diff -r 7b8e95de2bff -r 7db517071db5 src/lib/test-lib.c --- a/src/lib/test-lib.c Wed Oct 04 15:41:03 2017 +0300 +++ b/src/lib/test-lib.c Tue Aug 22 10:14:22 2017 +0300 @@ -33,6 +33,7 @@ test_istream_concat, test_istream_crlf, test_istream_failure_at, + test_istream_multiplex, test_istream_seekable, test_istream_tee, test_istream_unix, @@ -50,6 +51,8 @@ test_ostream_escaped, test_ostream_failure_at, test_ostream_file, + test_ostream_multiplex, + test_multiplex, test_primes, test_printf_format_fix, test_priorityq, diff -r 7b8e95de2bff -r 7db517071db5 src/lib/test-lib.h --- a/src/lib/test-lib.h Wed Oct 04 15:41:03 2017 +0300 +++ b/src/lib/test-lib.h Tue Aug 22 10:14:22 2017 +0300 @@ -34,6 +34,7 @@ void test_istream_concat(void); void test_istream_crlf(void); void test_istream_failure_at(void); +void test_istream_multiplex(void); void test_istream_seekable(void); void test_istream_tee(void); void test_istream_unix(void); @@ -47,6 +48,7 @@ enum fatal_test_state fatal_mempool(unsigned int); void test_mempool_alloconly(void); enum fatal_test_state fatal_mempool_alloconly(unsigned int); +void test_multiplex(void); void test_pkcs5_pbkdf2(void); void test_net(void); void test_numpack(void); @@ -54,6 +56,7 @@ void test_ostream_escaped(void); void test_ostream_failure_at(void); void test_ostream_file(void); +void test_ostream_multiplex(void); void test_primes(void); void test_printf_format_fix(void); enum fatal_test_state fatal_printf_format_fix(unsigned int); diff -r 7b8e95de2bff -r 7db517071db5 src/lib/test-multiplex.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/test-multiplex.c Tue Aug 22 10:14:22 2017 +0300 @@ -0,0 +1,160 @@ +/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */ + +#include "test-lib.h" +#include "ioloop.h" +#include "str.h" +#include "istream.h" +#include "istream-multiplex.h" +#include "ostream.h" +#include "ostream-multiplex.h" +#include "ostream.h" +#include "randgen.h" + +#include + +struct test_channel { + int fds[2]; + unsigned int cid; + + struct istream *in; + struct ostream *out; + struct io *io; + + struct istream *in_alt; + struct ostream *out_alt; + struct io *io_alt; + + buffer_t *received; + buffer_t *received_alt; + + unsigned int counter; +}; + +static struct test_channel test_channel[2]; + +static void test_multiplex_channel_write(struct test_channel *channel) +{ + unsigned char buf[128]; + size_t len = rand() % sizeof(buf); + random_fill(buf, len); + o_stream_send(channel->out, buf, len); + o_stream_send(channel->out_alt, buf, len); +} + +static void test_multiplex_stream_write(struct ostream *channel ATTR_UNUSED) +{ + if (test_channel[0].received->used > 1000 && + test_channel[1].received->used > 1000) + io_loop_stop(current_ioloop); + else + test_multiplex_channel_write(&test_channel[rand() % 2]); +} + +static void test_istream_multiplex_stream_read(struct test_channel *channel) +{ + const unsigned char *data = NULL; + size_t siz = 0; + + if (i_stream_read(channel->in) > 0) { + data = i_stream_get_data(channel->in, &siz); + buffer_append(channel->received, data, siz); + i_stream_skip(channel->in, siz); + } +} + +static void test_istream_read_alt(struct test_channel *channel) +{ + const unsigned char *data = NULL; + size_t siz = 0; + + if (i_stream_read(channel->in_alt) > 0) { + data = i_stream_get_data(channel->in_alt, &siz); + buffer_append(channel->received_alt, data, siz); + i_stream_skip(channel->in_alt, siz); + } +} + +static void setup_channel(struct test_channel *channel, + struct istream *is, struct ostream *os) +{ + /* setup first channel */ + channel->in = is; + channel->out = os; + channel->io = io_add_istream(is, test_istream_multiplex_stream_read, + channel); + test_assert(pipe(channel->fds) == 0); + channel->in_alt = i_stream_create_fd(channel->fds[0], (size_t)-1, FALSE); + channel->out_alt = o_stream_create_fd(channel->fds[1], IO_BLOCK_SIZE, FALSE); + channel->io_alt = io_add_istream(channel->in_alt, test_istream_read_alt, + channel); + channel->received = buffer_create_dynamic(default_pool, 32768); + channel->received_alt = buffer_create_dynamic(default_pool, 32768); +} + +static void teardown_channel(struct test_channel *channel) +{ + test_assert(memcmp(channel->received->data, + channel->received_alt->data, + channel->received->used) == 0); + test_assert(channel->received->used == channel->received_alt->used); + + buffer_free(&channel->received); + buffer_free(&channel->received_alt); + + io_remove(&channel->io); + io_remove(&channel->io_alt); + i_stream_unref(&channel->in); + o_stream_unref(&channel->out); + i_stream_unref(&channel->in_alt); + o_stream_unref(&channel->out_alt); + i_close_fd(&channel->fds[0]); + i_close_fd(&channel->fds[1]); +} + +static void test_multiplex_stream(void) { + test_begin("test multiplex (stream)"); + + struct ioloop *ioloop = io_loop_create(); + io_loop_set_current(ioloop); + + int fds[2]; + test_assert(pipe(fds) == 0); + struct ostream *os = o_stream_create_fd(fds[1], (size_t)-1, FALSE); + struct istream *is = i_stream_create_fd(fds[0], (size_t)-1, FALSE); + + struct istream *ichan0 = i_stream_create_multiplex(is, (size_t)-1); + struct istream *ichan1 = i_stream_multiplex_add_channel(ichan0, 1); + i_stream_unref(&is); + + struct ostream *ochan0 = o_stream_create_multiplex(os, 1024); + struct ostream *ochan1 = o_stream_multiplex_add_channel(ochan0, 1); + o_stream_unref(&os); + + struct io *io = io_add(fds[1], IO_WRITE, test_multiplex_stream_write, os); + + setup_channel(&test_channel[0], ichan0, ochan0); + setup_channel(&test_channel[1], ichan1, ochan1); + + test_channel[0].cid = 0; + test_channel[1].cid = 1; + + io_loop_run(current_ioloop); + + io_remove(&io); + + teardown_channel(&test_channel[0]); + teardown_channel(&test_channel[1]); + + io_loop_destroy(&ioloop); + + i_close_fd(&fds[0]); + i_close_fd(&fds[1]); + + test_end(); +} + +void test_multiplex(void) { + random_init(); + test_multiplex_stream(); + random_deinit(); +} diff -r 7b8e95de2bff -r 7db517071db5 src/lib/test-ostream-multiplex.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib/test-ostream-multiplex.c Tue Aug 22 10:14:22 2017 +0300 @@ -0,0 +1,147 @@ +/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */ + +#include "test-lib.h" +#include "randgen.h" +#include "ioloop.h" +#include "str.h" +#include "istream.h" +#include "ostream-private.h" +#include "ostream-multiplex.h" +#include "ostream.h" +#include + +#include "hex-binary.h" + +static void test_ostream_multiplex_simple(void) +{ + test_begin("ostream multiplex (simple)"); + + const unsigned char expected[] = { + '\x00','\x00','\x00','\x00','\x05','\x68','\x65', + '\x6c','\x6c','\x6f','\x01','\x00','\x00','\x00', + '\x05','\x77','\x6f','\x72','\x6c','\x64' + }; + + buffer_t *result = t_str_new(64); + struct ostream *os = test_ostream_create(result); + struct ostream *os2 = o_stream_create_multiplex(os, (size_t)-1); + struct ostream *os3 = o_stream_multiplex_add_channel(os2, 1); + + test_assert(o_stream_send_str(os2, "hello") == 5); + test_assert(o_stream_send_str(os3, "world") == 5); + + o_stream_unref(&os3); + o_stream_unref(&os2); + o_stream_unref(&os); + + test_assert(sizeof(expected) == result->used); + test_assert(memcmp(result->data, expected, I_MIN(sizeof(expected), + result->used)) == 0); + + test_end(); +} + +static unsigned int channel_counter[2] = {0, 0}; +static struct ostream *chan0, *chan1; + +static const char *msgs[] = { + "", + "a", + "bb", + "ccc", + "dddd", + "eeeee", + "ffffff" +}; + +static void test_ostream_multiplex_stream_read(struct istream *is) +{ + uint8_t cid; + const unsigned char *data; + size_t siz,dlen=0,pos=0; + + if (i_stream_read_more(is, &data, &siz)>0) { + /* parse stream */ + for(;pos 0) { + if (dlen < N_ELEMENTS(msgs)) { + test_assert_idx(memcmp(&data[pos], + msgs[dlen], dlen)==0, + channel_counter[data[0] % 2]); + } + channel_counter[data[0] % 2]++; + pos += dlen; + dlen = 0; + } else if (dlen == 0) { + cid = data[pos] % 2; + test_assert_idx(data[pos] < 2, channel_counter[cid]); + pos++; + dlen = be32_to_cpu_unaligned(&data[pos]); + pos += 4; + test_assert(dlen > 0 && dlen < N_ELEMENTS(msgs)); + } + } + i_stream_skip(is, siz); + } + + if (channel_counter[0] > 100 && channel_counter[1] > 100) + io_loop_stop(current_ioloop); +} + +static void test_ostream_multiplex_stream_write(struct ostream *channel ATTR_UNUSED) +{ + size_t rounds = 1 + rand() % 10; + for(size_t i = 0; i < rounds; i++) { + if ((rand() % 2) != 0) + o_stream_send_str(chan1, msgs[rand() % N_ELEMENTS(msgs)]); + else + o_stream_send_str(chan0, msgs[rand() % N_ELEMENTS(msgs)]); + } +} + +static void test_ostream_multiplex_stream(void) +{ + test_begin("ostream multiplex (stream)"); + + struct ioloop *ioloop = io_loop_create(); + io_loop_set_current(ioloop); + + int fds[2]; + test_assert(pipe(fds) == 0); + struct ostream *os = o_stream_create_fd(fds[1], (size_t)-1, FALSE); + struct istream *is = i_stream_create_fd(fds[0], (size_t)-1, FALSE); + + chan0 = o_stream_create_multiplex(os, (size_t)-1); + chan1 = o_stream_multiplex_add_channel(chan0, 1); + + struct io *io0 = + io_add_istream(is, test_ostream_multiplex_stream_read, is); + struct io *io1 = + io_add(fds[1], IO_WRITE, test_ostream_multiplex_stream_write, os); + + io_loop_run(current_ioloop); + + io_remove(&io0); + io_remove(&io1); + + o_stream_unref(&chan1); + o_stream_unref(&chan0); + + i_stream_unref(&is); + o_stream_unref(&os); + + io_loop_destroy(&ioloop); + + i_close_fd(&fds[0]); + i_close_fd(&fds[1]); + + test_end(); +} + +void test_ostream_multiplex(void) +{ + random_init(); + test_ostream_multiplex_simple(); + test_ostream_multiplex_stream(); + random_deinit(); +}