Mercurial > dovecot > core-2.2
view src/lib/ostream-multiplex.c @ 22585:e1ad624fc2ad
lib: ostream-multiplex - set ostream_private.parent
Unlike with istream-multiplex, there are no issues with I/Os. Only the
parent stream will have the I/O. Using the default parent adds the
missing methods that otherwise would have needed to be implemented:
- cork
- flush_pending
- switch_ioloop
author | Timo Sirainen <timo.sirainen@dovecot.fi> |
---|---|
date | Mon, 09 Oct 2017 13:19:32 +0300 |
parents | 7db517071db5 |
children | 041460202062 |
line wrap: on
line source
/* Copyright (c) 2017 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "ioloop.h" #include "array.h" #include "ostream-private.h" #include "ostream-multiplex.h" /* all multiplex packets are [1 byte cid][4 byte length][data] */ struct multiplex_ostream; struct multiplex_ochannel { struct ostream_private ostream; struct multiplex_ostream *mstream; uint8_t cid; buffer_t *buf; time_t last_sent; bool closed:1; }; struct multiplex_ostream { struct ostream *parent; /* channel 0 is main channel */ uint8_t cur_channel; unsigned int remain; buffer_t *wbuf; size_t bufsize; ARRAY(struct multiplex_ochannel *) channels; bool destroyed:1; }; static struct multiplex_ochannel * get_channel(struct multiplex_ostream *mstream, uint8_t cid) { struct multiplex_ochannel **channelp; i_assert(mstream != NULL); array_foreach_modifiable(&mstream->channels, channelp) { if (*channelp != NULL && (*channelp)->cid == cid) return *channelp; } return NULL; } static void propagate_error(struct multiplex_ostream *mstream, int stream_errno) { struct multiplex_ochannel **channelp; array_foreach_modifiable(&mstream->channels, channelp) if (*channelp != NULL) (*channelp)->ostream.ostream.stream_errno = stream_errno; } static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream) { time_t oldest = ioloop_time; struct multiplex_ochannel *channel = NULL; struct multiplex_ochannel **channelp; array_foreach_modifiable(&mstream->channels, channelp) if (*channelp != NULL && (*channelp)->last_sent <= oldest && (*channelp)->buf->used > 0) channel = *channelp; return channel; } static ssize_t o_stream_multiplex_sendv(struct multiplex_ostream *mstream) { struct multiplex_ochannel *channel; ssize_t ret = 0; if (mstream->bufsize <= mstream->wbuf->used + 5) return -2; while((channel = get_next_channel(mstream)) != NULL) { size_t tmp = mstream->bufsize - mstream->wbuf->used - 5; /* ensure it fits into 32 bit int */ size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used)); if (tmp == 0) break; uint32_t len = cpu32_to_be(amt); buffer_append(mstream->wbuf, &channel->cid, 1); buffer_append(mstream->wbuf, &len, 4); buffer_append(mstream->wbuf, channel->buf->data, amt); buffer_delete(channel->buf, 0, amt); channel->last_sent = ioloop_time; } if (mstream->wbuf->used > 0) { ret = o_stream_send(mstream->parent, mstream->wbuf->data, mstream->wbuf->used); if (ret < 0) { propagate_error(mstream, mstream->parent->stream_errno); return ret; } o_stream_flush(mstream->parent); buffer_delete(mstream->wbuf, 0, ret); } return ret; } static ssize_t o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, const struct const_iovec *iov, unsigned int iov_count) { struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; ssize_t ret; size_t total = 0; if (channel->mstream->bufsize <= channel->buf->used) return -2; for(unsigned int i=0; i < iov_count; i++) { /* copy data to buffer */ size_t tmp = channel->mstream->bufsize - channel->buf->used; if (tmp == 0) break; buffer_append(channel->buf, iov[i].iov_base, I_MIN(tmp, iov[i].iov_len)); total += I_MIN(tmp, iov[i].iov_len); } stream->ostream.offset += total; if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0) return ret; return total; } static void o_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent) { struct multiplex_ochannel *const *channelp; struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; o_stream_flush(&channel->ostream.ostream); channel->closed = TRUE; if (close_parent) { array_foreach(&channel->mstream->channels, channelp) if (*channelp !=NULL && !(*channelp)->closed) return; o_stream_close(channel->mstream->parent); } } static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream) { struct multiplex_ochannel **channelp; /* can't do anything until they are all closed */ array_foreach_modifiable(&mstream->channels, channelp) if (*channelp != NULL) return; o_stream_flush(mstream->parent); o_stream_unref(&mstream->parent); array_free(&mstream->channels); buffer_free(&mstream->wbuf); i_free(mstream); } static void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream) { struct multiplex_ochannel **channelp; struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; o_stream_multiplex_ochannel_close(stream, TRUE); if (channel->buf != NULL) buffer_free(&channel->buf); /* delete the channel */ array_foreach_modifiable(&channel->mstream->channels, channelp) { if (*channelp != NULL && (*channelp)->cid == channel->cid) { *channelp = NULL; break; } } o_stream_multiplex_try_destroy(channel->mstream); } static struct ostream * o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid) { struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1); channel->cid = cid; channel->buf = buffer_create_dynamic(default_pool, 256); channel->mstream = mstream; channel->ostream.sendv = o_stream_multiplex_ochannel_sendv; channel->ostream.iostream.close = o_stream_multiplex_ochannel_close; channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy; channel->ostream.fd = o_stream_get_fd(mstream->parent); array_append(&channel->mstream->channels, &channel, 1); return o_stream_create(&channel->ostream, mstream->parent, mstream->bufsize); } struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid) { struct multiplex_ochannel *chan = (struct multiplex_ochannel *)stream->real_stream; i_assert(get_channel(chan->mstream, cid) == NULL); return o_stream_add_channel_real(chan->mstream, cid); } struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize) { struct multiplex_ostream *mstream; mstream = i_new(struct multiplex_ostream, 1); mstream->parent = parent; mstream->bufsize = bufsize; mstream->wbuf = buffer_create_dynamic(default_pool, 256); i_array_init(&mstream->channels, 8); o_stream_ref(parent); return o_stream_add_channel_real(mstream, 0); } uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream) { struct multiplex_ochannel *channel = (struct multiplex_ochannel *)stream->real_stream; return channel->cid; }