view src/lib/ostream-multiplex.c @ 23007:36e01285b5b8

lib: buffer - Improve header comment for buffer_insert() and buffer_delete().
author Stephan Bosch <stephan.bosch@dovecot.fi>
date Mon, 18 Mar 2019 00:52:37 +0100
parents cb108f786fb4
children
line wrap: on
line source

/* Copyright (c) 2017-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "ostream-private.h"
#include "ostream-multiplex.h"

/* All multiplex packets are [1 byte cid][4 byte big-endian length][data]. */

struct multiplex_ostream;

/* One output channel of a multiplexed stream. */
struct multiplex_ochannel {
	/* Stream state for this channel. Callbacks in this file cast the
	   stream pointer they receive back to struct multiplex_ochannel,
	   so this member has to stay first. */
	struct ostream_private ostream;
	/* The multiplexer this channel belongs to. */
	struct multiplex_ostream *mstream;
	/* Channel id; written as the first byte of every packet. */
	uint8_t cid;
	/* Data accepted from the caller that has not yet been packed into
	   the multiplexer's write buffer. */
	buffer_t *buf;
	/* ioloop_time at the moment data from this channel was last moved
	   into the multiplexer's write buffer. */
	time_t last_sent;
	/* Set once the channel's close callback has run. */
	bool closed:1;
};

/* State shared by all channels that write to the same parent stream. */
struct multiplex_ostream {
	/* Underlying stream the packets are written to. A reference is
	   taken at creation and dropped when the last channel is gone. */
	struct ostream *parent;

	/* channel 0 is main channel */
	/* NOTE(review): cur_channel and remain are not referenced by the
	   visible code in this file. */
	uint8_t cur_channel;
	unsigned int remain;
	/* Packets assembled from the channel buffers that are waiting to be
	   written to the parent stream. */
	buffer_t *wbuf;
	/* Size limit applied both to wbuf and to each channel's buffer. */
	size_t bufsize;
	/* Registered channels. A slot is set to NULL when its channel is
	   destroyed, so every iteration has to skip NULL entries. */
	ARRAY(struct multiplex_ochannel *) channels;

	/* NOTE(review): not referenced by the visible code in this file. */
	bool destroyed:1;
};

/* Look up the registered channel whose id equals cid.
   Returns NULL if no live channel has that id. */
static struct multiplex_ochannel *
get_channel(struct multiplex_ostream *mstream, uint8_t cid)
{
	struct multiplex_ochannel **slot;

	i_assert(mstream != NULL);
	array_foreach_modifiable(&mstream->channels, slot) {
		struct multiplex_ochannel *candidate = *slot;

		/* slots of destroyed channels are NULL */
		if (candidate == NULL)
			continue;
		if (candidate->cid == cid)
			return candidate;
	}
	return NULL;
}

static void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
{
	struct multiplex_ochannel **channelp;
	array_foreach_modifiable(&mstream->channels, channelp)
		if (*channelp != NULL)
			(*channelp)->ostream.ostream.stream_errno = stream_errno;
}

static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
{
	time_t oldest = ioloop_time;
	struct multiplex_ochannel *channel = NULL;
	struct multiplex_ochannel **channelp;
	array_foreach_modifiable(&mstream->channels, channelp)
		if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
		    (*channelp)->buf->used > 0)
			channel = *channelp;
	return channel;
}

static ssize_t
o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
{
	struct multiplex_ochannel *channel;
	ssize_t ret = 0;
	if (mstream->bufsize <= mstream->wbuf->used + 5)
		return -2;

	while((channel = get_next_channel(mstream)) != NULL) {
		size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
		/* ensure it fits into 32 bit int */
		size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
		if (tmp == 0)
			break;
		uint32_t len = cpu32_to_be(amt);
		buffer_append(mstream->wbuf, &channel->cid, 1);
		buffer_append(mstream->wbuf, &len, 4);
		buffer_append(mstream->wbuf, channel->buf->data, amt);
		buffer_delete(channel->buf, 0, amt);
		channel->last_sent = ioloop_time;
	}

	if (mstream->wbuf->used > 0) {
		ret = o_stream_send(mstream->parent, mstream->wbuf->data,
				    mstream->wbuf->used);
		if (ret < 0) {
			propagate_error(mstream, mstream->parent->stream_errno);
			return ret;
		}
		buffer_delete(mstream->wbuf, 0, ret);
	}
	return ret;
}

/* sendv callback of a channel stream: copy the caller's data into the
   channel's buffer and ask the multiplexer to pack and write it.

   At most (bufsize - already buffered) bytes are accepted; the rest of the
   iovecs is left for the caller to retry.

   Returns -2 without touching anything if the channel buffer is already
   full, a negative value if writing to the parent stream failed, and
   otherwise the number of bytes accepted into the channel buffer. */
static ssize_t
o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
				 const struct const_iovec *iov, unsigned int iov_count)
{
	struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
	ssize_t ret;
	size_t total = 0;

	if (channel->mstream->bufsize <= channel->buf->used)
		return -2;

	for (unsigned int i = 0; i < iov_count; i++) {
		/* copy data to buffer */
		size_t room = channel->mstream->bufsize - channel->buf->used;
		size_t chunk;

		if (room == 0)
			break;
		chunk = I_MIN(room, iov[i].iov_len);
		buffer_append(channel->buf, iov[i].iov_base, chunk);
		total += chunk;
	}

	stream->ostream.offset += total;

	ret = o_stream_multiplex_sendv(channel->mstream);
	/* -2 only means the multiplexer's write buffer is currently full.
	   The bytes counted in total are already stored in the channel
	   buffer and the offset has been advanced, so they must be reported
	   as accepted; returning failure here would make the caller send
	   the same data again. Only real send errors are passed on. */
	if (ret < 0 && ret != -2)
		return ret;

	return total;
}

static void
o_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
{
	struct multiplex_ochannel *const *channelp;
	struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
	(void)o_stream_flush(&channel->ostream.ostream);

	channel->closed = TRUE;
	if (close_parent) {
		array_foreach(&channel->mstream->channels, channelp)
			if (*channelp !=NULL && !(*channelp)->closed)
				return;
		o_stream_close(channel->mstream->parent);
	}
}

static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
{
	struct multiplex_ochannel **channelp;
	/* can't do anything until they are all closed */
	array_foreach_modifiable(&mstream->channels, channelp)
		if (*channelp != NULL)
			return;
	o_stream_unref(&mstream->parent);
	array_free(&mstream->channels);
	buffer_free(&mstream->wbuf);
	i_free(mstream);
}

static void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
{
	struct multiplex_ochannel **channelp;
	struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
	o_stream_multiplex_ochannel_close(stream, TRUE);
	o_stream_unref(&channel->ostream.parent);
	if (channel->buf != NULL)
		buffer_free(&channel->buf);
	/* delete the channel */
	array_foreach_modifiable(&channel->mstream->channels, channelp) {
		if (*channelp != NULL && (*channelp)->cid == channel->cid) {
			*channelp = NULL;
			break;
		}
	}
	o_stream_multiplex_try_destroy(channel->mstream);
}

/* Allocate a channel with the given id, register it with mstream and
   return the ostream created for it on top of the multiplexer's parent
   stream. The caller is responsible for making sure cid is not in use. */
static struct ostream *
o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
{
	struct multiplex_ochannel *channel;

	channel = i_new(struct multiplex_ochannel, 1);
	channel->cid = cid;
	channel->buf = buffer_create_dynamic(default_pool, 256);
	channel->mstream = mstream;

	/* stream callbacks */
	channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
	channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
	channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
	channel->ostream.fd = o_stream_get_fd(mstream->parent);

	array_append(&mstream->channels, &channel, 1);

	return o_stream_create(&channel->ostream, mstream->parent,
			       mstream->bufsize);
}

/* Add a new channel with id cid to the multiplexer that the given channel
   stream belongs to and return the new channel's stream. Asserts that the
   id is not already used by a registered channel. */
struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
{
	struct multiplex_ochannel *existing =
		(struct multiplex_ochannel *)stream->real_stream;
	struct multiplex_ostream *mstream = existing->mstream;

	i_assert(get_channel(mstream, cid) == NULL);
	return o_stream_add_channel_real(mstream, cid);
}

/* Create a multiplexer writing to parent and return the stream of its
   main channel (id 0). bufsize limits the shared write buffer and each
   channel's buffer. A reference to parent is taken. */
struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize)
{
	struct multiplex_ostream *mstream = i_new(struct multiplex_ostream, 1);

	mstream->bufsize = bufsize;
	mstream->parent = parent;
	o_stream_ref(parent);

	mstream->wbuf = buffer_create_dynamic(default_pool, 256);
	i_array_init(&mstream->channels, 8);

	/* channel 0 is the main channel */
	return o_stream_add_channel_real(mstream, 0);
}

uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream)
{
	struct multiplex_ochannel *channel =
		(struct multiplex_ochannel *)stream->real_stream;
	return channel->cid;
}