comparison src/lib/ostream-multiplex.c @ 22553:7db517071db5

lib: Add multiplex stream support. This allows having multiple channels of data in a single stream.
author Aki Tuomi <aki.tuomi@dovecot.fi>
date Tue, 22 Aug 2017 10:14:22 +0300
parents
children e1ad624fc2ad
comparison
equal deleted inserted replaced
22552:7b8e95de2bff 22553:7db517071db5
1 /* Copyright (c) 2017 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "ioloop.h"
5 #include "array.h"
6 #include "ostream-private.h"
7 #include "ostream-multiplex.h"
8
9 /* all multiplex packets are [1 byte cid][4 byte length][data] */
10
11 struct multiplex_ostream;
12
13 struct multiplex_ochannel {
14 struct ostream_private ostream;
15 struct multiplex_ostream *mstream;
16 uint8_t cid;
17 buffer_t *buf;
18 time_t last_sent;
19 bool closed:1;
20 };
21
22 struct multiplex_ostream {
23 struct ostream *parent;
24
25 /* channel 0 is main channel */
26 uint8_t cur_channel;
27 unsigned int remain;
28 buffer_t *wbuf;
29 size_t bufsize;
30 ARRAY(struct multiplex_ochannel *) channels;
31
32 bool destroyed:1;
33 };
34
35 static struct multiplex_ochannel *
36 get_channel(struct multiplex_ostream *mstream, uint8_t cid)
37 {
38 struct multiplex_ochannel **channelp;
39 i_assert(mstream != NULL);
40 array_foreach_modifiable(&mstream->channels, channelp) {
41 if (*channelp != NULL && (*channelp)->cid == cid)
42 return *channelp;
43 }
44 return NULL;
45 }
46
47 static void propagate_error(struct multiplex_ostream *mstream, int stream_errno)
48 {
49 struct multiplex_ochannel **channelp;
50 array_foreach_modifiable(&mstream->channels, channelp)
51 if (*channelp != NULL)
52 (*channelp)->ostream.ostream.stream_errno = stream_errno;
53 }
54
55 static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream)
56 {
57 time_t oldest = ioloop_time;
58 struct multiplex_ochannel *channel = NULL;
59 struct multiplex_ochannel **channelp;
60 array_foreach_modifiable(&mstream->channels, channelp)
61 if (*channelp != NULL && (*channelp)->last_sent <= oldest &&
62 (*channelp)->buf->used > 0)
63 channel = *channelp;
64 return channel;
65 }
66
67 static ssize_t
68 o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
69 {
70 struct multiplex_ochannel *channel;
71 ssize_t ret = 0;
72 if (mstream->bufsize <= mstream->wbuf->used + 5)
73 return -2;
74
75 while((channel = get_next_channel(mstream)) != NULL) {
76 size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
77 /* ensure it fits into 32 bit int */
78 size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
79 if (tmp == 0)
80 break;
81 uint32_t len = cpu32_to_be(amt);
82 buffer_append(mstream->wbuf, &channel->cid, 1);
83 buffer_append(mstream->wbuf, &len, 4);
84 buffer_append(mstream->wbuf, channel->buf->data, amt);
85 buffer_delete(channel->buf, 0, amt);
86 channel->last_sent = ioloop_time;
87 }
88
89 if (mstream->wbuf->used > 0) {
90 ret = o_stream_send(mstream->parent, mstream->wbuf->data,
91 mstream->wbuf->used);
92 if (ret < 0) {
93 propagate_error(mstream, mstream->parent->stream_errno);
94 return ret;
95 }
96 o_stream_flush(mstream->parent);
97 buffer_delete(mstream->wbuf, 0, ret);
98 }
99 return ret;
100 }
101
102 static ssize_t
103 o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
104 const struct const_iovec *iov, unsigned int iov_count)
105 {
106 struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
107 ssize_t ret;
108 size_t total = 0;
109 if (channel->mstream->bufsize <= channel->buf->used)
110 return -2;
111
112 for(unsigned int i=0; i < iov_count; i++) {
113 /* copy data to buffer */
114 size_t tmp = channel->mstream->bufsize - channel->buf->used;
115 if (tmp == 0)
116 break;
117 buffer_append(channel->buf, iov[i].iov_base,
118 I_MIN(tmp, iov[i].iov_len));
119 total += I_MIN(tmp, iov[i].iov_len);
120 }
121
122 stream->ostream.offset += total;
123
124 if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
125 return ret;
126
127 return total;
128 }
129
130 static void
131 o_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent)
132 {
133 struct multiplex_ochannel *const *channelp;
134 struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
135 o_stream_flush(&channel->ostream.ostream);
136
137 channel->closed = TRUE;
138 if (close_parent) {
139 array_foreach(&channel->mstream->channels, channelp)
140 if (*channelp !=NULL && !(*channelp)->closed)
141 return;
142 o_stream_close(channel->mstream->parent);
143 }
144 }
145
146 static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
147 {
148 struct multiplex_ochannel **channelp;
149 /* can't do anything until they are all closed */
150 array_foreach_modifiable(&mstream->channels, channelp)
151 if (*channelp != NULL)
152 return;
153 o_stream_flush(mstream->parent);
154 o_stream_unref(&mstream->parent);
155 array_free(&mstream->channels);
156 buffer_free(&mstream->wbuf);
157 i_free(mstream);
158 }
159
160 static void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream)
161 {
162 struct multiplex_ochannel **channelp;
163 struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
164 o_stream_multiplex_ochannel_close(stream, TRUE);
165 if (channel->buf != NULL)
166 buffer_free(&channel->buf);
167 /* delete the channel */
168 array_foreach_modifiable(&channel->mstream->channels, channelp) {
169 if (*channelp != NULL && (*channelp)->cid == channel->cid) {
170 *channelp = NULL;
171 break;
172 }
173 }
174 o_stream_multiplex_try_destroy(channel->mstream);
175 }
176
177 static struct ostream *
178 o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
179 {
180 struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1);
181 channel->cid = cid;
182 channel->buf = buffer_create_dynamic(default_pool, 256);
183 channel->mstream = mstream;
184 channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
185 channel->ostream.iostream.close = o_stream_multiplex_ochannel_close;
186 channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy;
187 if (cid == 0)
188 channel->ostream.fd = o_stream_get_fd(mstream->parent);
189 else
190 channel->ostream.fd = -1;
191 array_append(&channel->mstream->channels, &channel, 1);
192
193 return o_stream_create(&channel->ostream, NULL, mstream->bufsize);
194 }
195
196 struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
197 {
198 struct multiplex_ochannel *chan =
199 (struct multiplex_ochannel *)stream->real_stream;
200 i_assert(get_channel(chan->mstream, cid) == NULL);
201
202 return o_stream_add_channel_real(chan->mstream, cid);
203 }
204
205 struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize)
206 {
207 struct multiplex_ostream *mstream;
208
209 mstream = i_new(struct multiplex_ostream, 1);
210 mstream->parent = parent;
211 mstream->bufsize = bufsize;
212 mstream->wbuf = buffer_create_dynamic(default_pool, 256);
213 i_array_init(&mstream->channels, 8);
214 o_stream_ref(parent);
215
216 return o_stream_add_channel_real(mstream, 0);
217 }
218
219 uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream)
220 {
221 struct multiplex_ochannel *channel =
222 (struct multiplex_ochannel *)stream->real_stream;
223 return channel->cid;
224 }