changeset 22553:7db517071db5

lib: Add multiplex stream support This allows having multiple channels of data in single stream.
author Aki Tuomi <aki.tuomi@dovecot.fi>
date Tue, 22 Aug 2017 10:14:22 +0300
parents 7b8e95de2bff
children 8c27d8d766bd
files src/lib/Makefile.am src/lib/istream-multiplex.c src/lib/istream-multiplex.h src/lib/ostream-multiplex.c src/lib/ostream-multiplex.h src/lib/test-istream-multiplex.c src/lib/test-lib.c src/lib/test-lib.h src/lib/test-multiplex.c src/lib/test-ostream-multiplex.c
diffstat 10 files changed, 1200 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib/Makefile.am	Wed Oct 04 15:41:03 2017 +0300
+++ b/src/lib/Makefile.am	Tue Aug 22 10:14:22 2017 +0300
@@ -72,6 +72,7 @@
 	istream-jsonstr.c \
 	istream-limit.c \
 	istream-mmap.c \
+	istream-multiplex.c \
 	istream-rawlog.c \
 	istream-seekable.c \
 	istream-sized.c \
@@ -114,6 +115,7 @@
 	ostream-failure-at.c \
 	ostream-file.c \
 	ostream-hash.c \
+	ostream-multiplex.c \
 	ostream-null.c \
 	ostream-rawlog.c \
 	ostream-unix.c \
@@ -220,6 +222,7 @@
 	istream-file-private.h \
 	istream-hash.h \
 	istream-jsonstr.h \
+	istream-multiplex.h \
 	istream-private.h \
 	istream-rawlog.h \
 	istream-seekable.h \
@@ -255,6 +258,7 @@
 	ostream-failure-at.h \
 	ostream-file-private.h \
 	ostream-hash.h \
+	ostream-multiplex.h \
 	ostream-private.h \
 	ostream-null.h \
 	ostream-rawlog.h \
@@ -340,6 +344,7 @@
 	test-istream-concat.c \
 	test-istream-crlf.c \
 	test-istream-failure-at.c \
+	test-istream-multiplex.c \
 	test-istream-seekable.c \
 	test-istream-tee.c \
 	test-istream-unix.c \
@@ -357,6 +362,8 @@
 	test-ostream-escaped.c \
 	test-ostream-failure-at.c \
 	test-ostream-file.c \
+	test-ostream-multiplex.c \
+	test-multiplex.c \
 	test-primes.c \
 	test-printf-format-fix.c \
 	test-priorityq.c \
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/istream-multiplex.c	Tue Aug 22 10:14:22 2017 +0300
@@ -0,0 +1,270 @@
+/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "array.h"
+#include "istream-private.h"
+#include "istream-multiplex.h"
+
+/* all multiplex packets are [1 byte cid][4 byte length][data] */
+
+struct multiplex_istream;
+
+struct multiplex_ichannel {
+	struct istream_private istream;
+	struct multiplex_istream *mstream;
+	uint8_t cid;
+	size_t pending_pos;
+	bool closed:1;
+};
+
+struct multiplex_istream {
+	struct istream *parent;
+
+	/* channel 0 is main channel */
+	uint8_t cur_channel;
+	unsigned int remain;
+	size_t bufsize;
+	ARRAY(struct multiplex_ichannel *) channels;
+
+	bool blocking:1;
+};
+
+static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream);
+
+static struct multiplex_ichannel *
+get_channel(struct multiplex_istream *mstream, uint8_t cid)
+{
+	struct multiplex_ichannel **channelp;
+	i_assert(mstream != NULL);
+	array_foreach_modifiable(&mstream->channels, channelp) {
+		if (*channelp != NULL && (*channelp)->cid == cid)
+			return *channelp;
+	}
+	return NULL;
+}
+
+static void propagate_error(struct multiplex_istream *mstream, int stream_errno)
+{
+	struct multiplex_ichannel **channelp;
+	array_foreach_modifiable(&mstream->channels, channelp)
+		if (*channelp != NULL)
+			(*channelp)->istream.istream.stream_errno = stream_errno;
+}
+
+static void propagate_eof(struct multiplex_istream *mstream)
+{
+	struct multiplex_ichannel **channelp;
+	array_foreach_modifiable(&mstream->channels, channelp) {
+		if (*channelp != NULL) {
+			(*channelp)->istream.istream.eof = TRUE;
+		}
+	}
+}
+
+static ssize_t
+i_stream_multiplex_read(struct multiplex_istream *mstream, uint8_t cid)
+{
+	struct multiplex_ichannel *req_channel = get_channel(mstream, cid);
+	const unsigned char *data;
+	size_t len = 0, used, wanted, avail;
+	ssize_t ret, got = 0;
+
+	if (mstream->parent == NULL) {
+		req_channel->istream.istream.eof = TRUE;
+		return -1;
+	}
+
+	data = i_stream_get_data(mstream->parent, &len);
+
+	if (len == 0 && mstream->parent->closed) {
+		req_channel->istream.istream.eof = TRUE;
+		return -1;
+	}
+
+	if (((mstream->remain > 0 && len == 0) ||
+	     (mstream->remain == 0 && len < 5)) &&
+	    (ret = i_stream_read(mstream->parent)) <= 0) {
+		propagate_error(mstream, mstream->parent->stream_errno);
+		if (mstream->parent->eof)
+			propagate_eof(mstream);
+		return ret;
+	}
+
+	for(;;) {
+		data = i_stream_get_data(mstream->parent, &len);
+		if (len == 0) {
+			if (got == 0 && mstream->blocking)
+				got += i_stream_multiplex_read(mstream, cid);
+			break;
+		}
+
+		if (mstream->remain > 0) {
+			struct multiplex_ichannel *channel =
+				get_channel(mstream, mstream->cur_channel);
+			wanted = I_MIN(len, mstream->remain);
+			/* is it open? */
+			if (channel != NULL && !channel->closed) {
+				struct istream_private *stream = &channel->istream;
+				stream->pos += channel->pending_pos;
+				bool alloc_ret = i_stream_try_alloc(stream, wanted, &avail);
+				stream->pos -= channel->pending_pos;
+				if (!alloc_ret) {
+					i_stream_set_input_pending(&stream->istream, TRUE);
+					if (channel->cid != cid)
+						return 0;
+					if (got > 0)
+						break;
+					return -2;
+				}
+
+				used = I_MIN(wanted, avail);
+
+				/* dump into buffer */
+				if (channel->cid != cid) {
+					i_assert(stream->pos + channel->pending_pos + used <= stream->buffer_size);
+					memcpy(stream->w_buffer + stream->pos + channel->pending_pos,
+					       data, used);
+					channel->pending_pos += used;
+					i_stream_set_input_pending(&stream->istream, TRUE);
+				} else {
+					i_assert(stream->pos + used <= stream->buffer_size);
+					memcpy(stream->w_buffer + stream->pos, data, used);
+					stream->pos += used;
+					got += used;
+				}
+			} else {
+				used = wanted;
+			}
+			mstream->remain -= used;
+			i_stream_skip(mstream->parent, used);
+			/* see if there is more to read */
+			continue;
+		}
+		if (mstream->remain == 0) {
+			/* need more data */
+			if (len < 5) {
+				ret = i_stream_multiplex_ichannel_read(&req_channel->istream);
+				if (ret > 0)
+					got += ret;
+				break;
+			}
+			/* channel ID */
+			mstream->cur_channel = data[0];
+			/* data length */
+			mstream->remain = be32_to_cpu_unaligned(data+1);
+			i_stream_skip(mstream->parent, 5);
+		}
+	}
+
+	propagate_error(mstream, mstream->parent->stream_errno);
+	if (mstream->parent->eof)
+		propagate_eof(mstream);
+
+	return got;
+}
+
+static ssize_t i_stream_multiplex_ichannel_read(struct istream_private *stream)
+{
+	struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream;
+	/* if previous multiplex read dumped data for us
+	   actually serve it here. */
+	if (channel->pending_pos > 0) {
+		ssize_t ret = channel->pending_pos;
+		stream->pos += channel->pending_pos;
+		channel->pending_pos = 0;
+		return ret;
+	}
+	return i_stream_multiplex_read(channel->mstream, channel->cid);
+}
+
+static void
+i_stream_multiplex_ichannel_close(struct iostream_private *stream, bool close_parent)
+{
+	struct multiplex_ichannel *const *channelp;
+	struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream;
+	channel->closed = TRUE;
+	if (close_parent) {
+		array_foreach(&channel->mstream->channels, channelp)
+			if (*channelp != NULL && !(*channelp)->closed)
+				return;
+		i_stream_close(channel->mstream->parent);
+	}
+}
+
+static void i_stream_multiplex_try_destroy(struct multiplex_istream *mstream)
+{
+	struct multiplex_ichannel **channelp;
+	/* can't do anything until they are all closed */
+	array_foreach_modifiable(&mstream->channels, channelp)
+		if (*channelp != NULL)
+			return;
+	i_stream_unref(&mstream->parent);
+	array_free(&mstream->channels);
+	i_free(mstream);
+}
+
+static void i_stream_multiplex_ichannel_destroy(struct iostream_private *stream)
+{
+	struct multiplex_ichannel **channelp;
+	struct multiplex_ichannel *channel = (struct multiplex_ichannel*)stream;
+	i_stream_multiplex_ichannel_close(stream, TRUE);
+	i_free(channel->istream.w_buffer);
+	array_foreach_modifiable(&channel->mstream->channels, channelp) {
+		if (*channelp == channel) {
+			*channelp = NULL;
+			break;
+		}
+	}
+	i_stream_multiplex_try_destroy(channel->mstream);
+}
+
+static struct istream *
+i_stream_add_channel_real(struct multiplex_istream *mstream, uint8_t cid)
+{
+	struct multiplex_ichannel *channel = i_new(struct multiplex_ichannel, 1);
+	channel->cid = cid;
+	channel->mstream = mstream;
+	channel->istream.read = i_stream_multiplex_ichannel_read;
+	channel->istream.iostream.close = i_stream_multiplex_ichannel_close;
+	channel->istream.iostream.destroy = i_stream_multiplex_ichannel_destroy;
+	channel->istream.max_buffer_size = mstream->bufsize;
+	channel->istream.istream.blocking = mstream->blocking;
+	if (cid == 0)
+		channel->istream.fd = i_stream_get_fd(mstream->parent);
+	else
+		channel->istream.fd = -1;
+	array_append(&channel->mstream->channels, &channel, 1);
+
+	return i_stream_create(&channel->istream, NULL, channel->istream.fd);
+}
+
+struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid)
+{
+	struct multiplex_ichannel *chan =
+		(struct multiplex_ichannel *)stream->real_stream;
+	i_assert(get_channel(chan->mstream, cid) == NULL);
+
+	return i_stream_add_channel_real(chan->mstream, cid);
+}
+
+struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize)
+{
+	struct multiplex_istream *mstream;
+
+	mstream = i_new(struct multiplex_istream, 1);
+	mstream->parent = parent;
+	mstream->bufsize = bufsize;
+	mstream->blocking = parent->blocking;
+	i_array_init(&mstream->channels, 8);
+	i_stream_ref(parent);
+
+	return i_stream_add_channel_real(mstream, 0);
+}
+
+uint8_t i_stream_multiplex_get_channel_id(struct istream *stream)
+{
+	struct multiplex_ichannel *channel =
+		(struct multiplex_ichannel *)stream->real_stream;
+	return channel->cid;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/istream-multiplex.h	Tue Aug 22 10:14:22 2017 +0300
@@ -0,0 +1,8 @@
+#ifndef ISTREAM_MULTIPLEX
+#define ISTREAM_MULTIPLEX 1
+
+struct istream *i_stream_create_multiplex(struct istream *parent, size_t bufsize);
+struct istream *i_stream_multiplex_add_channel(struct istream *stream, uint8_t cid);
+uint8_t i_stream_multiplex_get_channel_id(struct istream *stream);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/ostream-multiplex.c	Tue Aug 22 10:14:22 2017 +0300
@@ -0,0 +1,224 @@
+/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "array.h"
+#include "ostream-private.h"
+#include "ostream-multiplex.h"
+
+/* all multiplex packets are [1 byte cid][4 byte length][data] */
+
+struct multiplex_ostream;
+
+struct multiplex_ochannel {
+	struct ostream_private ostream;
+	struct multiplex_ostream *mstream;
+	uint8_t cid;
+	buffer_t *buf;
+	time_t last_sent;
+	bool closed:1;
+};
+
+struct multiplex_ostream {
+	struct ostream *parent;
+
+	/* channel 0 is main channel */
+	uint8_t cur_channel;
+	unsigned int remain;
+	buffer_t *wbuf;
+	size_t bufsize;
+	ARRAY(struct multiplex_ochannel *) channels;
+
+	bool destroyed:1;
+};
+
+static struct multiplex_ochannel *
+get_channel(struct multiplex_ostream *mstream, uint8_t cid)
+{
+	struct multiplex_ochannel **channelp;
+	i_assert(mstream != NULL);
+	array_foreach_modifiable(&mstream->channels, channelp) {
+		if (*channelp != NULL && (*channelp)->cid == cid)
+			return *channelp;
+	}
+	return NULL;
+}
+
+static void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
+{
+	struct multiplex_ochannel **channelp;
+	array_foreach_modifiable(&mstream->channels, channelp)
+		if (*channelp != NULL)
+			(*channelp)->ostream.ostream.stream_errno = stream_errno;
+}
+
+static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
+{
+	time_t oldest = ioloop_time;
+	struct multiplex_ochannel *channel = NULL;
+	struct multiplex_ochannel **channelp;
+	array_foreach_modifiable(&mstream->channels, channelp)
+		if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
+		    (*channelp)->buf->used > 0)
+			channel = *channelp;
+	return channel;
+}
+
+static ssize_t
+o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
+{
+	struct multiplex_ochannel *channel;
+	ssize_t ret = 0;
+	if (mstream->bufsize <= mstream->wbuf->used + 5)
+		return -2;
+
+	while((channel = get_next_channel(mstream)) != NULL) {
+		size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
+		/* ensure it fits into 32 bit int */
+		size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
+		if (tmp == 0)
+			break;
+		uint32_t len = cpu32_to_be(amt);
+		buffer_append(mstream->wbuf, &channel->cid, 1);
+		buffer_append(mstream->wbuf, &len, 4);
+		buffer_append(mstream->wbuf, channel->buf->data, amt);
+		buffer_delete(channel->buf, 0, amt);
+		channel->last_sent = ioloop_time;
+	}
+
+	if (mstream->wbuf->used > 0) {
+		ret = o_stream_send(mstream->parent, mstream->wbuf->data,
+				    mstream->wbuf->used);
+		if (ret < 0) {
+			propagate_error(mstream, mstream->parent->stream_errno);
+			return ret;
+		}
+		o_stream_flush(mstream->parent);
+		buffer_delete(mstream->wbuf, 0, ret);
+	}
+	return ret;
+}
+
+static ssize_t
+o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
+				 const struct const_iovec *iov, unsigned int iov_count)
+{
+	struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
+	ssize_t ret;
+	size_t total = 0;
+	if (channel->mstream->bufsize <= channel->buf->used)
+		return -2;
+
+	for(unsigned int i=0; i < iov_count; i++) {
+		/* copy data to buffer */
+		size_t tmp = channel->mstream->bufsize - channel->buf->used;
+		if (tmp == 0)
+			break;
+		buffer_append(channel->buf, iov[i].iov_base,
+			      I_MIN(tmp, iov[i].iov_len));
+		total += I_MIN(tmp, iov[i].iov_len);
+	}
+
+	stream->ostream.offset += total;
+
+	if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
+		return ret;
+
+	return total;
+}
+
+static void
+o_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
+{
+	struct multiplex_ochannel *const *channelp;
+	struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
+	o_stream_flush(&channel->ostream.ostream);
+
+	channel->closed = TRUE;
+	if (close_parent) {
+		array_foreach(&channel->mstream->channels, channelp)
+			if (*channelp !=NULL && !(*channelp)->closed)
+				return;
+		o_stream_close(channel->mstream->parent);
+	}
+}
+
+static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
+{
+	struct multiplex_ochannel **channelp;
+	/* can't do anything until they are all closed */
+	array_foreach_modifiable(&mstream->channels, channelp)
+		if (*channelp != NULL)
+			return;
+	o_stream_flush(mstream->parent);
+	o_stream_unref(&mstream->parent);
+	array_free(&mstream->channels);
+	buffer_free(&mstream->wbuf);
+	i_free(mstream);
+}
+
+static void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
+{
+	struct multiplex_ochannel **channelp;
+	struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
+	o_stream_multiplex_ochannel_close(stream, TRUE);
+	if (channel->buf != NULL)
+		buffer_free(&channel->buf);
+	/* delete the channel */
+	array_foreach_modifiable(&channel->mstream->channels, channelp) {
+		if (*channelp != NULL && (*channelp)->cid == channel->cid) {
+			*channelp = NULL;
+			break;
+		}
+	}
+	o_stream_multiplex_try_destroy(channel->mstream);
+}
+
+static struct ostream *
+o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
+{
+	struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1);
+	channel->cid = cid;
+	channel->buf = buffer_create_dynamic(default_pool, 256);
+	channel->mstream = mstream;
+	channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
+	channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
+	channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
+	if (cid == 0)
+		channel->ostream.fd = o_stream_get_fd(mstream->parent);
+	else
+		channel->ostream.fd = -1;
+	array_append(&channel->mstream->channels, &channel, 1);
+
+	return o_stream_create(&channel->ostream, NULL, mstream->bufsize);
+}
+
+struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
+{
+	struct multiplex_ochannel *chan =
+		(struct multiplex_ochannel *)stream->real_stream;
+	i_assert(get_channel(chan->mstream, cid) == NULL);
+
+	return o_stream_add_channel_real(chan->mstream, cid);
+}
+
+struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize)
+{
+	struct multiplex_ostream *mstream;
+
+	mstream = i_new(struct multiplex_ostream, 1);
+	mstream->parent = parent;
+	mstream->bufsize = bufsize;
+	mstream->wbuf = buffer_create_dynamic(default_pool, 256);
+	i_array_init(&mstream->channels, 8);
+	o_stream_ref(parent);
+
+	return o_stream_add_channel_real(mstream, 0);
+}
+
+uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream)
+{
+	struct multiplex_ochannel *channel =
+		(struct multiplex_ochannel *)stream->real_stream;
+	return channel->cid;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/ostream-multiplex.h	Tue Aug 22 10:14:22 2017 +0300
@@ -0,0 +1,8 @@
+#ifndef OSTREAM_MULTIPLEX
+#define OSTREAM_MULTIPLEX 1
+
+struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize);
+struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid);
+uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/test-istream-multiplex.c	Tue Aug 22 10:14:22 2017 +0300
@@ -0,0 +1,370 @@
+/* Copyright (c) 2016-2017 Dovecot authors, see the included COPYING file */
+
+#include "test-lib.h"
+#include "ioloop.h"
+#include "str.h"
+#include "crc32.h"
+#include "randgen.h"
+#include "istream-private.h"
+#include "istream-multiplex.h"
+#include "ostream.h"
+#include <unistd.h>
+
+static void test_istream_multiplex_simple(void)
+{
+	test_begin("istream multiplex (simple)");
+
+	static const char data[] = "\x00\x00\x00\x00\x06Hello\x00"
+				   "\x01\x00\x00\x00\x03Wor"
+				   "\x00\x00\x00\x00\x00"
+				   "\x01\x00\x00\x00\x03ld\x00";
+	static const size_t data_len = sizeof(data)-1;
+	struct istream *input = test_istream_create_data(data, data_len);
+	size_t siz;
+
+	struct istream *chan0 = i_stream_create_multiplex(input, (size_t)-1);
+	struct istream *chan1 = i_stream_multiplex_add_channel(chan0, 1);
+
+	/* nothing to read until the first byte */
+	for (size_t i = 0; i <= 1+4; i++) {
+		test_istream_set_size(input, i);
+		test_assert(i_stream_read(chan0) == 0);
+		test_assert(i_stream_read(chan1) == 0);
+	}
+
+	/* partial read of the first packet */
+	size_t input_max = 1+4+3;
+	test_istream_set_size(input, input_max);
+	test_assert(i_stream_read(chan0) == 3);
+	test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hel", 3) == 0 &&
+		    siz == 3);
+	test_assert(i_stream_read(chan1) == 0);
+
+	/* read the rest of the first packet and the second packet.
+	   read chan1 before chan0 to see that it works. */
+	input_max += 3 + 1+4+3;
+	test_istream_set_size(input, input_max);
+	test_assert(i_stream_read(chan1) == 3);
+	test_assert(i_stream_read(chan0) == 3);
+	test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hello\0", 6) == 0 &&
+		    siz == 6);
+	test_assert(memcmp(i_stream_get_data(chan1, &siz), "Wor", 3) == 0 &&
+		    siz == 3);
+
+	/* 0-sized packet is ignored */
+	input_max += 1+4;
+	test_istream_set_size(input, input_max);
+	test_assert(i_stream_read(chan0) == 0);
+	test_assert(i_stream_read(chan1) == 0);
+
+	/* read the final packet */
+	input_max += 1+4+3;
+	i_assert(input_max == data_len);
+        test_istream_set_size(input, input_max);
+	test_assert(i_stream_read(chan0) == 0);
+	test_assert(i_stream_read(chan1) == 3);
+
+	/* we should have the final data in all channels now */
+	test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hello\0", 6) == 0 &&
+		    siz == 6);
+	test_assert(memcmp(i_stream_get_data(chan1, &siz), "World\0", 6) == 0 &&
+		    siz == 6);
+
+	/* all channels should return EOF */
+	test_assert(i_stream_read(chan0) == -1 && chan0->stream_errno == 0);
+	i_stream_unref(&chan0);
+
+	test_assert(i_stream_read(chan1) == -1 && chan1->stream_errno == 0);
+	i_stream_unref(&chan1);
+
+	i_stream_unref(&input);
+
+	test_end();
+}
+
+static void test_istream_multiplex_maxbuf(void)
+{
+	test_begin("istream multiplex (maxbuf)");
+
+	static const char data[] = "\x00\x00\x00\x00\x06Hello\x00"
+				   "\x01\x00\x00\x00\x06World\x00";
+	static const size_t data_len = sizeof(data)-1;
+	struct istream *input = test_istream_create_data(data, data_len);
+	size_t siz;
+
+	struct istream *chan0 = i_stream_create_multiplex(input, 5);
+	struct istream *chan1 = i_stream_multiplex_add_channel(chan0, 1);
+
+	/* we get data for channel 0 and congest */
+	test_assert(i_stream_read(chan1) == 0);
+	/* we read data for channel 0 */
+	test_assert(i_stream_read(chan0) == 5);
+	/* and now it's congested */
+	test_assert(i_stream_read(chan0) == -2);
+	test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hello", 5) == 0 &&
+		    siz == 5);
+	/* consume data */
+	i_stream_skip(chan0, 5);
+	/* we read data for channel 1 */
+	test_assert(i_stream_read(chan1) == 5);
+	test_assert(memcmp(i_stream_get_data(chan1, &siz), "World", 5) == 0 &&
+		    siz == 5);
+	/* consume data */
+	i_stream_skip(chan1, 5);
+	/* read last byte */
+	test_assert(i_stream_read(chan0) == 1);
+	/* now we get byte for channel 1 */
+	test_assert(i_stream_read(chan0) == 0);
+	/* now we read byte for channel 1 */
+	test_assert(i_stream_read(chan1) == 1);
+	/* and everything should return EOF now */
+	test_assert(i_stream_read(chan1) == -1);
+	test_assert(i_stream_read(chan0) == -1);
+
+	i_stream_unref(&chan0);
+	i_stream_unref(&chan1);
+
+	i_stream_unref(&input);
+
+	test_end();
+}
+
+static void test_istream_multiplex_random(void)
+{
+	const unsigned int max_channel = 6;
+	const unsigned int packets_count = 30;
+
+	test_begin("istream multiplex (random)");
+
+	unsigned int i;
+	uoff_t bytes_written = 0, bytes_read = 0;
+	buffer_t *buf = buffer_create_dynamic(default_pool, 10240);
+	uint32_t input_crc[max_channel];
+	uint32_t output_crc[max_channel];
+	memset(input_crc, 0, sizeof(input_crc));
+	memset(output_crc, 0, sizeof(output_crc));
+
+	for (i = 0; i < packets_count; i++) {
+		unsigned int len = 1 + rand() % 1024;
+		unsigned char packet_data[len];
+		uint32_t len_be = cpu32_to_be(len);
+		unsigned int channel = rand() % max_channel;
+
+		random_fill(packet_data, len);
+		input_crc[channel] =
+			crc32_data_more(input_crc[channel], packet_data, len);
+
+		buffer_append_c(buf, channel);
+		buffer_append(buf, &len_be, sizeof(len_be));
+		buffer_append(buf, packet_data, len);
+		bytes_written += len;
+	}
+
+	struct istream *input = test_istream_create_data(buf->data, buf->used);
+	struct istream *chan[max_channel];
+	chan[0] = i_stream_create_multiplex(input, 1024/4);
+	for (i = 1; i < max_channel; i++)
+		chan[i] = i_stream_multiplex_add_channel(chan[0], i);
+
+	test_istream_set_size(input, 0);
+
+	/* read from each stream, 1 byte at a time */
+	size_t input_size = 0;
+	int max_ret = -3;
+	unsigned int read_max_channel = max_channel/2;
+	bool something_read = FALSE;
+	for (i = 0;;) {
+		ssize_t ret = i_stream_read(chan[i]);
+		if (max_ret < ret)
+			max_ret = ret;
+		if (ret > 0) {
+			size_t size;
+			const unsigned char *data =
+				i_stream_get_data(chan[i], &size);
+
+			output_crc[i] = crc32_data_more(output_crc[i], data, size);
+			bytes_read += size;
+
+			test_assert((size_t)ret == size);
+			i_stream_skip(chan[i], size);
+			something_read = TRUE;
+		}
+		if (++i < read_max_channel)
+			;
+		else if (max_ret == 0 && !something_read &&
+			 read_max_channel < max_channel) {
+			read_max_channel++;
+		} else {
+			if (max_ret <= -1) {
+				test_assert(max_ret == -1);
+				break;
+			}
+			if (max_ret == 0)
+				test_istream_set_size(input, ++input_size);
+			i = 0;
+			max_ret = -3;
+			something_read = FALSE;
+			read_max_channel = max_channel/2;
+		}
+	}
+	test_assert(bytes_read == bytes_written);
+	for (i = 0; i < max_channel; i++) {
+		test_assert_idx(input_crc[i] == output_crc[i], i);
+		test_assert_idx(i_stream_read(chan[i]) == -1 &&
+				chan[i]->stream_errno == 0, i);
+		i_stream_unref(&chan[i]);
+	}
+	i_stream_unref(&input);
+	buffer_free(&buf);
+	test_end();
+}
+
+static unsigned int channel_counter[2] = {0, 0};
+
+static const char *msgs[] = {
+	"",
+	"a",
+	"bb",
+	"ccc",
+	"dddd",
+	"eeeee",
+	"ffffff"
+};
+
+static void test_istream_multiplex_stream_read(struct istream *channel)
+{
+	uint8_t cid = i_stream_multiplex_get_channel_id(channel);
+	const char *line;
+	size_t siz;
+
+	if (i_stream_read(channel) < 0)
+		return;
+
+	while((line = i_stream_next_line(channel)) != NULL) {
+		siz = strlen(line);
+		test_assert_idx(siz > 0 && siz < N_ELEMENTS(msgs),
+				channel_counter[cid]);
+		if (siz > 0 && siz < N_ELEMENTS(msgs)) {
+			test_assert_idx(strcmp(line, msgs[siz]) == 0,
+					channel_counter[cid]);
+		}
+		channel_counter[cid]++;
+	}
+
+	if (channel_counter[0] > 100 && channel_counter[1] > 100)
+		io_loop_stop(current_ioloop);
+}
+
+static void test_send_msg(struct ostream *os, uint8_t cid, const char *msg)
+{
+	uint32_t len = cpu32_to_be(strlen(msg) + 1);
+	const struct const_iovec iov[] = {
+		{ &cid, sizeof(cid) },
+		{ &len, sizeof(len) },
+		{ msg, strlen(msg) },
+		{ "\n", 1 } /* newline added for i_stream_next_line */
+	};
+	o_stream_sendv(os, iov, N_ELEMENTS(iov));
+}
+
+static void test_istream_multiplex_stream_write(struct ostream *channel)
+{
+	size_t rounds = rand() % 10;
+	for(size_t i = 0; i < rounds; i++) {
+		uint8_t cid = rand() % 2;
+		test_send_msg(channel, cid, msgs[1 + rand() % (N_ELEMENTS(msgs) - 1)]);
+	}
+}
+
+static void test_istream_multiplex_stream(void)
+{
+	test_begin("istream multiplex (stream)");
+	struct ioloop *ioloop = io_loop_create();
+	io_loop_set_current(ioloop);
+
+	int fds[2];
+	test_assert(pipe(fds) == 0);
+	struct ostream *os = o_stream_create_fd(fds[1], (size_t)-1, FALSE);
+	struct istream *is = i_stream_create_fd(fds[0], 10 + rand() % 10, FALSE);
+
+	struct istream *chan0 = i_stream_create_multiplex(is, (size_t)-1);
+	struct istream *chan1 = i_stream_multiplex_add_channel(chan0, 1);
+
+	struct io *io0 =
+		io_add_istream(chan0, test_istream_multiplex_stream_read, chan0);
+	struct io *io1 =
+		io_add_istream(chan1, test_istream_multiplex_stream_read, chan1);
+	struct io *io2 =
+		io_add(fds[1], IO_WRITE, test_istream_multiplex_stream_write, os);
+
+	io_loop_run(current_ioloop);
+
+	io_remove(&io0);
+	io_remove(&io1);
+	io_remove(&io2);
+
+	i_stream_unref(&chan1);
+	i_stream_unref(&chan0);
+	i_stream_unref(&is);
+
+	o_stream_unref(&os);
+
+	io_loop_destroy(&ioloop);
+
+	i_close_fd(&fds[0]);
+	i_close_fd(&fds[1]);
+
+	test_end();
+}
+
+static void test_istream_multiplex_close_channel(void)
+{
+	test_begin("istream multiplex (close channel)");
+	static const char *data = "\x00\x00\x00\x00\x06Hello\x00"
+				  "\x01\x00\x00\x00\x06World\x00";
+	static const size_t data_len = 22;
+	struct istream *input = test_istream_create_data(data, data_len);
+	size_t siz;
+
+	struct istream *chan0 = i_stream_create_multiplex(input, (size_t)-1);
+	struct istream *chan1 = i_stream_multiplex_add_channel(chan0, 1);
+
+	i_stream_unref(&chan1);
+
+	test_assert(i_stream_read(chan0) == 6);
+
+	test_assert(memcmp(i_stream_get_data(chan0, &siz), "Hello\0", 6) == 0 &&
+		    siz == 6);
+
+	i_stream_unref(&chan0);
+	i_stream_unref(&input);
+
+	input = test_istream_create_data(data, data_len);
+	chan0 = i_stream_create_multiplex(input, (size_t)-1);
+	chan1 = i_stream_multiplex_add_channel(chan0, 1);
+
+	/* this is needed to populate chan1 data */
+	(void)i_stream_read(chan0);
+	i_stream_unref(&chan0);
+
+	test_assert(i_stream_read(chan1) == 6);
+
+	test_assert(memcmp(i_stream_get_data(chan1, &siz), "World\0", 6) == 0 &&
+		    siz == 6);
+
+	i_stream_unref(&chan1);
+	i_stream_unref(&input);
+
+	test_end();
+}
+
+void test_istream_multiplex(void)
+{
+	random_init();
+	test_istream_multiplex_simple();
+	test_istream_multiplex_maxbuf();
+	test_istream_multiplex_random();
+	test_istream_multiplex_stream();
+	test_istream_multiplex_close_channel();
+	random_deinit();
+}
--- a/src/lib/test-lib.c	Wed Oct 04 15:41:03 2017 +0300
+++ b/src/lib/test-lib.c	Tue Aug 22 10:14:22 2017 +0300
@@ -33,6 +33,7 @@
 		test_istream_concat,
 		test_istream_crlf,
 		test_istream_failure_at,
+		test_istream_multiplex,
 		test_istream_seekable,
 		test_istream_tee,
 		test_istream_unix,
@@ -50,6 +51,8 @@
 		test_ostream_escaped,
 		test_ostream_failure_at,
 		test_ostream_file,
+		test_ostream_multiplex,
+		test_multiplex,
 		test_primes,
 		test_printf_format_fix,
 		test_priorityq,
--- a/src/lib/test-lib.h	Wed Oct 04 15:41:03 2017 +0300
+++ b/src/lib/test-lib.h	Tue Aug 22 10:14:22 2017 +0300
@@ -34,6 +34,7 @@
 void test_istream_concat(void);
 void test_istream_crlf(void);
 void test_istream_failure_at(void);
+void test_istream_multiplex(void);
 void test_istream_seekable(void);
 void test_istream_tee(void);
 void test_istream_unix(void);
@@ -47,6 +48,7 @@
 enum fatal_test_state fatal_mempool(unsigned int);
 void test_mempool_alloconly(void);
 enum fatal_test_state fatal_mempool_alloconly(unsigned int);
+void test_multiplex(void);
 void test_pkcs5_pbkdf2(void);
 void test_net(void);
 void test_numpack(void);
@@ -54,6 +56,7 @@
 void test_ostream_escaped(void);
 void test_ostream_failure_at(void);
 void test_ostream_file(void);
+void test_ostream_multiplex(void);
 void test_primes(void);
 void test_printf_format_fix(void);
 enum fatal_test_state fatal_printf_format_fix(unsigned int);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/test-multiplex.c	Tue Aug 22 10:14:22 2017 +0300
@@ -0,0 +1,160 @@
+/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
+
+#include "test-lib.h"
+#include "ioloop.h"
+#include "str.h"
+#include "istream.h"
+#include "istream-multiplex.h"
+#include "ostream.h"
+#include "ostream-multiplex.h"
+#include "ostream.h"
+#include "randgen.h"
+
+#include <unistd.h>
+
+struct test_channel {
+	int fds[2];
+	unsigned int cid;
+
+	struct istream *in;
+	struct ostream *out;
+	struct io *io;
+
+	struct istream *in_alt;
+	struct ostream *out_alt;
+	struct io *io_alt;
+
+	buffer_t *received;
+	buffer_t *received_alt;
+
+	unsigned int counter;
+};
+
+static struct test_channel test_channel[2];
+
+static void test_multiplex_channel_write(struct test_channel *channel)
+{
+	unsigned char buf[128];
+	size_t len = rand() % sizeof(buf);
+	random_fill(buf, len);
+	o_stream_send(channel->out, buf, len);
+	o_stream_send(channel->out_alt, buf, len);
+}
+
+static void test_multiplex_stream_write(struct ostream *channel ATTR_UNUSED)
+{
+	if (test_channel[0].received->used > 1000 &&
+	    test_channel[1].received->used > 1000)
+		io_loop_stop(current_ioloop);
+	else
+		test_multiplex_channel_write(&test_channel[rand() % 2]);
+}
+
+static void test_istream_multiplex_stream_read(struct test_channel *channel)
+{
+	const unsigned char *data = NULL;
+	size_t siz = 0;
+
+	if (i_stream_read(channel->in) > 0) {
+		data = i_stream_get_data(channel->in, &siz);
+		buffer_append(channel->received, data, siz);
+		i_stream_skip(channel->in, siz);
+	}
+}
+
+static void test_istream_read_alt(struct test_channel *channel)
+{
+	const unsigned char *data = NULL;
+	size_t siz = 0;
+
+	if (i_stream_read(channel->in_alt) > 0) {
+		data = i_stream_get_data(channel->in_alt, &siz);
+		buffer_append(channel->received_alt, data, siz);
+		i_stream_skip(channel->in_alt, siz);
+	}
+}
+
+static void setup_channel(struct test_channel *channel,
+			  struct istream *is, struct ostream *os)
+{
+	/* setup first channel */
+	channel->in = is;
+	channel->out = os;
+	channel->io = io_add_istream(is, test_istream_multiplex_stream_read,
+				     channel);
+	test_assert(pipe(channel->fds) == 0);
+	channel->in_alt = i_stream_create_fd(channel->fds[0], (size_t)-1, FALSE);
+	channel->out_alt = o_stream_create_fd(channel->fds[1], IO_BLOCK_SIZE, FALSE);
+	channel->io_alt = io_add_istream(channel->in_alt, test_istream_read_alt,
+					 channel);
+	channel->received = buffer_create_dynamic(default_pool, 32768);
+	channel->received_alt = buffer_create_dynamic(default_pool, 32768);
+}
+
+static void teardown_channel(struct test_channel *channel)
+{
+	test_assert(memcmp(channel->received->data,
+			   channel->received_alt->data,
+			   channel->received->used) == 0);
+	test_assert(channel->received->used == channel->received_alt->used);
+
+	buffer_free(&channel->received);
+	buffer_free(&channel->received_alt);
+
+	io_remove(&channel->io);
+	io_remove(&channel->io_alt);
+	i_stream_unref(&channel->in);
+	o_stream_unref(&channel->out);
+	i_stream_unref(&channel->in_alt);
+	o_stream_unref(&channel->out_alt);
+	i_close_fd(&channel->fds[0]);
+	i_close_fd(&channel->fds[1]);
+}
+
+static void test_multiplex_stream(void) {
+	test_begin("test multiplex (stream)");
+
+	struct ioloop *ioloop = io_loop_create();
+	io_loop_set_current(ioloop);
+
+	int fds[2];
+	test_assert(pipe(fds) == 0);
+	struct ostream *os = o_stream_create_fd(fds[1], (size_t)-1, FALSE);
+	struct istream *is = i_stream_create_fd(fds[0], (size_t)-1, FALSE);
+
+	struct istream *ichan0 = i_stream_create_multiplex(is, (size_t)-1);
+	struct istream *ichan1 = i_stream_multiplex_add_channel(ichan0, 1);
+	i_stream_unref(&is);
+
+	struct ostream *ochan0 = o_stream_create_multiplex(os, 1024);
+	struct ostream *ochan1 = o_stream_multiplex_add_channel(ochan0, 1);
+	o_stream_unref(&os);
+
+	struct io *io = io_add(fds[1], IO_WRITE, test_multiplex_stream_write, os);
+
+	setup_channel(&test_channel[0], ichan0, ochan0);
+	setup_channel(&test_channel[1], ichan1, ochan1);
+
+	test_channel[0].cid = 0;
+	test_channel[1].cid = 1;
+
+	io_loop_run(current_ioloop);
+
+	io_remove(&io);
+
+	teardown_channel(&test_channel[0]);
+	teardown_channel(&test_channel[1]);
+
+	io_loop_destroy(&ioloop);
+
+	i_close_fd(&fds[0]);
+	i_close_fd(&fds[1]);
+
+	test_end();
+}
+
+void test_multiplex(void) {
+	random_init();
+	test_multiplex_stream();
+	random_deinit();
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/test-ostream-multiplex.c	Tue Aug 22 10:14:22 2017 +0300
@@ -0,0 +1,147 @@
+/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
+
+#include "test-lib.h"
+#include "randgen.h"
+#include "ioloop.h"
+#include "str.h"
+#include "istream.h"
+#include "ostream-private.h"
+#include "ostream-multiplex.h"
+#include "ostream.h"
+#include <unistd.h>
+
+#include "hex-binary.h"
+
+static void test_ostream_multiplex_simple(void)
+{
+	test_begin("ostream multiplex (simple)");
+
+	const unsigned char expected[] = {
+		'\x00','\x00','\x00','\x00','\x05','\x68','\x65',
+		'\x6c','\x6c','\x6f','\x01','\x00','\x00','\x00',
+		'\x05','\x77','\x6f','\x72','\x6c','\x64'
+	};
+
+	buffer_t *result = t_str_new(64);
+	struct ostream *os = test_ostream_create(result);
+	struct ostream *os2 = o_stream_create_multiplex(os, (size_t)-1);
+	struct ostream *os3 = o_stream_multiplex_add_channel(os2, 1);
+
+	test_assert(o_stream_send_str(os2, "hello") == 5);
+	test_assert(o_stream_send_str(os3, "world") == 5);
+
+	o_stream_unref(&os3);
+	o_stream_unref(&os2);
+	o_stream_unref(&os);
+
+	test_assert(sizeof(expected) == result->used);
+	test_assert(memcmp(result->data, expected, I_MIN(sizeof(expected),
+		    result->used)) == 0);
+
+	test_end();
+}
+
+static unsigned int channel_counter[2] = {0, 0};
+static struct ostream *chan0, *chan1;
+
+static const char *msgs[] = {
+	"",
+	"a",
+	"bb",
+	"ccc",
+	"dddd",
+	"eeeee",
+	"ffffff"
+};
+
+static void test_ostream_multiplex_stream_read(struct istream *is)
+{
+	uint8_t cid;
+	const unsigned char *data;
+	size_t siz,dlen=0,pos=0;
+
+	if (i_stream_read_more(is, &data, &siz)>0) {
+		/* parse stream */
+		for(;pos<siz;) {
+			if (dlen > 0) {
+				if (dlen < N_ELEMENTS(msgs)) {
+					test_assert_idx(memcmp(&data[pos],
+							       msgs[dlen], dlen)==0,
+							channel_counter[data[0] % 2]);
+				}
+				channel_counter[data[0] % 2]++;
+				pos += dlen;
+				dlen = 0;
+			} else if (dlen == 0) {
+				cid = data[pos] % 2;
+				test_assert_idx(data[pos] < 2, channel_counter[cid]);
+				pos++;
+				dlen = be32_to_cpu_unaligned(&data[pos]);
+				pos += 4;
+				test_assert(dlen > 0 && dlen < N_ELEMENTS(msgs));
+			}
+		}
+		i_stream_skip(is, siz);
+	}
+
+	if (channel_counter[0] > 100 && channel_counter[1] > 100)
+		io_loop_stop(current_ioloop);
+}
+
+static void test_ostream_multiplex_stream_write(struct ostream *channel ATTR_UNUSED)
+{
+	size_t rounds = 1 + rand() % 10;
+	for(size_t i = 0; i < rounds; i++) {
+		if ((rand() % 2) != 0)
+			o_stream_send_str(chan1, msgs[rand() % N_ELEMENTS(msgs)]);
+		else
+			o_stream_send_str(chan0, msgs[rand() % N_ELEMENTS(msgs)]);
+	}
+}
+
+static void test_ostream_multiplex_stream(void)
+{
+	test_begin("ostream multiplex (stream)");
+
+	struct ioloop *ioloop = io_loop_create();
+	io_loop_set_current(ioloop);
+
+	int fds[2];
+	test_assert(pipe(fds) == 0);
+	struct ostream *os = o_stream_create_fd(fds[1], (size_t)-1, FALSE);
+	struct istream *is = i_stream_create_fd(fds[0], (size_t)-1, FALSE);
+
+	chan0 = o_stream_create_multiplex(os, (size_t)-1);
+	chan1 = o_stream_multiplex_add_channel(chan0, 1);
+
+	struct io *io0 =
+		io_add_istream(is, test_ostream_multiplex_stream_read, is);
+	struct io *io1 =
+		io_add(fds[1], IO_WRITE, test_ostream_multiplex_stream_write, os);
+
+	io_loop_run(current_ioloop);
+
+	io_remove(&io0);
+	io_remove(&io1);
+
+	o_stream_unref(&chan1);
+	o_stream_unref(&chan0);
+
+	i_stream_unref(&is);
+	o_stream_unref(&os);
+
+	io_loop_destroy(&ioloop);
+
+	i_close_fd(&fds[0]);
+	i_close_fd(&fds[1]);
+
+	test_end();
+}
+
+void test_ostream_multiplex(void)
+{
+	random_init();
+	test_ostream_multiplex_simple();
+	test_ostream_multiplex_stream();
+	random_deinit();
+}