Mercurial > dovecot > core-2.2
comparison src/lib/ostream-multiplex.c @ 22553:7db517071db5
lib: Add multiplex stream support
This allows having multiple channels of data in a single stream.
author | Aki Tuomi <aki.tuomi@dovecot.fi> |
---|---|
date | Tue, 22 Aug 2017 10:14:22 +0300 |
parents | |
children | e1ad624fc2ad |
comparison
equal
deleted
inserted
replaced
22552:7b8e95de2bff | 22553:7db517071db5 |
---|---|
1 /* Copyright (c) 2017 Dovecot authors, see the included COPYING file */ | |
2 | |
3 #include "lib.h" | |
4 #include "ioloop.h" | |
5 #include "array.h" | |
6 #include "ostream-private.h" | |
7 #include "ostream-multiplex.h" | |
8 | |
9 /* all multiplex packets are [1 byte cid][4 byte length][data] */ | |
10 | |
struct multiplex_ostream;

/* One logical output channel carried inside a multiplexed stream. */
struct multiplex_ochannel {
	/* Must stay the first member: this file casts ostream_private and
	   iostream_private pointers directly to struct multiplex_ochannel. */
	struct ostream_private ostream;
	/* The multiplexer this channel belongs to. */
	struct multiplex_ostream *mstream;
	/* Channel id, written as the first byte of every packet. */
	uint8_t cid;
	/* Data accepted from the writer but not yet packed into the
	   multiplexer's write buffer. */
	buffer_t *buf;
	/* ioloop_time at the moment data from this channel was last packed. */
	time_t last_sent;
	/* Set once the channel's close handler has run. */
	bool closed:1;
};
21 | |
/* Shared state of a multiplexer: the parent stream that receives the packets
   and the set of channels writing into it. */
struct multiplex_ostream {
	/* Underlying stream; referenced on creation and unreferenced when the
	   last channel has been destroyed. */
	struct ostream *parent;

	/* channel 0 is main channel */
	/* NOTE(review): cur_channel and remain are not referenced by the code
	   visible in this file - confirm whether they are still needed. */
	uint8_t cur_channel;
	unsigned int remain;
	/* Packed packets waiting to be handed to the parent stream. */
	buffer_t *wbuf;
	/* Size limit applied both to wbuf and to each channel's buffer. */
	size_t bufsize;
	/* Channel slots; a destroyed channel leaves a NULL entry behind. */
	ARRAY(struct multiplex_ochannel *) channels;

	/* NOTE(review): not referenced by the code visible in this file. */
	bool destroyed:1;
};
34 | |
35 static struct multiplex_ochannel * | |
36 get_channel(struct multiplex_ostream *mstream, uint8_t cid) | |
37 { | |
38 struct multiplex_ochannel **channelp; | |
39 i_assert(mstream != NULL); | |
40 array_foreach_modifiable(&mstream->channels, channelp) { | |
41 if (*channelp != NULL && (*channelp)->cid == cid) | |
42 return *channelp; | |
43 } | |
44 return NULL; | |
45 } | |
46 | |
47 static void propagate_error(struct multiplex_ostream *mstream, int stream_errno) | |
48 { | |
49 struct multiplex_ochannel **channelp; | |
50 array_foreach_modifiable(&mstream->channels, channelp) | |
51 if (*channelp != NULL) | |
52 (*channelp)->ostream.ostream.stream_errno = stream_errno; | |
53 } | |
54 | |
55 static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mstream) | |
56 { | |
57 time_t oldest = ioloop_time; | |
58 struct multiplex_ochannel *channel = NULL; | |
59 struct multiplex_ochannel **channelp; | |
60 array_foreach_modifiable(&mstream->channels, channelp) | |
61 if (*channelp != NULL && (*channelp)->last_sent <= oldest && | |
62 (*channelp)->buf->used > 0) | |
63 channel = *channelp; | |
64 return channel; | |
65 } | |
66 | |
67 static ssize_t | |
68 o_stream_multiplex_sendv(struct multiplex_ostream *mstream) | |
69 { | |
70 struct multiplex_ochannel *channel; | |
71 ssize_t ret = 0; | |
72 if (mstream->bufsize <= mstream->wbuf->used + 5) | |
73 return -2; | |
74 | |
75 while((channel = get_next_channel(mstream)) != NULL) { | |
76 size_t tmp = mstream->bufsize - mstream->wbuf->used - 5; | |
77 /* ensure it fits into 32 bit int */ | |
78 size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used)); | |
79 if (tmp == 0) | |
80 break; | |
81 uint32_t len = cpu32_to_be(amt); | |
82 buffer_append(mstream->wbuf, &channel->cid, 1); | |
83 buffer_append(mstream->wbuf, &len, 4); | |
84 buffer_append(mstream->wbuf, channel->buf->data, amt); | |
85 buffer_delete(channel->buf, 0, amt); | |
86 channel->last_sent = ioloop_time; | |
87 } | |
88 | |
89 if (mstream->wbuf->used > 0) { | |
90 ret = o_stream_send(mstream->parent, mstream->wbuf->data, | |
91 mstream->wbuf->used); | |
92 if (ret < 0) { | |
93 propagate_error(mstream, mstream->parent->stream_errno); | |
94 return ret; | |
95 } | |
96 o_stream_flush(mstream->parent); | |
97 buffer_delete(mstream->wbuf, 0, ret); | |
98 } | |
99 return ret; | |
100 } | |
101 | |
102 static ssize_t | |
103 o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, | |
104 const struct const_iovec *iov, unsigned int iov_count) | |
105 { | |
106 struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; | |
107 ssize_t ret; | |
108 size_t total = 0; | |
109 if (channel->mstream->bufsize <= channel->buf->used) | |
110 return -2; | |
111 | |
112 for(unsigned int i=0; i < iov_count; i++) { | |
113 /* copy data to buffer */ | |
114 size_t tmp = channel->mstream->bufsize - channel->buf->used; | |
115 if (tmp == 0) | |
116 break; | |
117 buffer_append(channel->buf, iov[i].iov_base, | |
118 I_MIN(tmp, iov[i].iov_len)); | |
119 total += I_MIN(tmp, iov[i].iov_len); | |
120 } | |
121 | |
122 stream->ostream.offset += total; | |
123 | |
124 if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0) | |
125 return ret; | |
126 | |
127 return total; | |
128 } | |
129 | |
130 static void | |
131 o_stream_multiplex_ochannel_close(struct iostream_private *stream, bool close_parent) | |
132 { | |
133 struct multiplex_ochannel *const *channelp; | |
134 struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; | |
135 o_stream_flush(&channel->ostream.ostream); | |
136 | |
137 channel->closed = TRUE; | |
138 if (close_parent) { | |
139 array_foreach(&channel->mstream->channels, channelp) | |
140 if (*channelp !=NULL && !(*channelp)->closed) | |
141 return; | |
142 o_stream_close(channel->mstream->parent); | |
143 } | |
144 } | |
145 | |
146 static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream) | |
147 { | |
148 struct multiplex_ochannel **channelp; | |
149 /* can't do anything until they are all closed */ | |
150 array_foreach_modifiable(&mstream->channels, channelp) | |
151 if (*channelp != NULL) | |
152 return; | |
153 o_stream_flush(mstream->parent); | |
154 o_stream_unref(&mstream->parent); | |
155 array_free(&mstream->channels); | |
156 buffer_free(&mstream->wbuf); | |
157 i_free(mstream); | |
158 } | |
159 | |
160 static void o_stream_multiplex_ochannel_destroy(struct iostream_private *stream) | |
161 { | |
162 struct multiplex_ochannel **channelp; | |
163 struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; | |
164 o_stream_multiplex_ochannel_close(stream, TRUE); | |
165 if (channel->buf != NULL) | |
166 buffer_free(&channel->buf); | |
167 /* delete the channel */ | |
168 array_foreach_modifiable(&channel->mstream->channels, channelp) { | |
169 if (*channelp != NULL && (*channelp)->cid == channel->cid) { | |
170 *channelp = NULL; | |
171 break; | |
172 } | |
173 } | |
174 o_stream_multiplex_try_destroy(channel->mstream); | |
175 } | |
176 | |
177 static struct ostream * | |
178 o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid) | |
179 { | |
180 struct multiplex_ochannel *channel = i_new(struct multiplex_ochannel, 1); | |
181 channel->cid = cid; | |
182 channel->buf = buffer_create_dynamic(default_pool, 256); | |
183 channel->mstream = mstream; | |
184 channel->ostream.sendv = o_stream_multiplex_ochannel_sendv; | |
185 channel->ostream.iostream.close = o_stream_multiplex_ochannel_close; | |
186 channel->ostream.iostream.destroy = o_stream_multiplex_ochannel_destroy; | |
187 if (cid == 0) | |
188 channel->ostream.fd = o_stream_get_fd(mstream->parent); | |
189 else | |
190 channel->ostream.fd = -1; | |
191 array_append(&channel->mstream->channels, &channel, 1); | |
192 | |
193 return o_stream_create(&channel->ostream, NULL, mstream->bufsize); | |
194 } | |
195 | |
196 struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid) | |
197 { | |
198 struct multiplex_ochannel *chan = | |
199 (struct multiplex_ochannel *)stream->real_stream; | |
200 i_assert(get_channel(chan->mstream, cid) == NULL); | |
201 | |
202 return o_stream_add_channel_real(chan->mstream, cid); | |
203 } | |
204 | |
205 struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize) | |
206 { | |
207 struct multiplex_ostream *mstream; | |
208 | |
209 mstream = i_new(struct multiplex_ostream, 1); | |
210 mstream->parent = parent; | |
211 mstream->bufsize = bufsize; | |
212 mstream->wbuf = buffer_create_dynamic(default_pool, 256); | |
213 i_array_init(&mstream->channels, 8); | |
214 o_stream_ref(parent); | |
215 | |
216 return o_stream_add_channel_real(mstream, 0); | |
217 } | |
218 | |
219 uint8_t o_stream_multiplex_get_channel_id(struct ostream *stream) | |
220 { | |
221 struct multiplex_ochannel *channel = | |
222 (struct multiplex_ochannel *)stream->real_stream; | |
223 return channel->cid; | |
224 } |