Mercurial > dovecot > core-2.2
view src/lib/istream-chain.c @ 22664:fea53c2725c0
director: Fix director_max_parallel_moves/kicks type
Should be uint, not time.
author | Timo Sirainen <timo.sirainen@dovecot.fi> |
---|---|
date | Thu, 09 Nov 2017 12:24:16 +0200 |
parents | 2e2563132d5f |
children | cb108f786fb4 |
line wrap: on
line source
/* Copyright (c) 2003-2017 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "llist.h" #include "istream-private.h" #include "istream-chain.h" struct chain_istream; struct istream_chain_link { struct istream_chain_link *prev, *next; struct istream *stream; bool eof; }; struct istream_chain { struct istream_chain_link *head, *tail; struct chain_istream *stream; }; struct chain_istream { struct istream_private istream; /* how much of the previous link's stream still exists at the beginning of our buffer. skipping through this should point to the beginning of the current link's stream. */ size_t prev_stream_left; size_t prev_skip; bool have_explicit_max_buffer_size; struct istream_chain chain; }; static void ATTR_NULL(2) i_stream_chain_append_internal(struct istream_chain *chain, struct istream *stream) { struct istream_chain_link *link; if (stream == NULL && chain->tail != NULL && chain->tail->stream == NULL) return; link = i_new(struct istream_chain_link, 1); link->stream = stream; link->eof = stream == NULL; if (stream != NULL) i_stream_ref(stream); if (chain->head == NULL && stream != NULL) { struct chain_istream *cstream = (struct chain_istream *)chain->stream; if (cstream->have_explicit_max_buffer_size) { i_stream_set_max_buffer_size(stream, chain->stream->istream.max_buffer_size); } else { size_t max_size = i_stream_get_max_buffer_size(stream); if (cstream->istream.max_buffer_size < max_size) cstream->istream.max_buffer_size = max_size; } } DLLIST2_APPEND(&chain->head, &chain->tail, link); /* if io_add_istream() has been added to this chain stream, notify the callback that we have more data available. */ if (stream != NULL) i_stream_set_input_pending(stream, TRUE); } void i_stream_chain_append(struct istream_chain *chain, struct istream *stream) { i_stream_chain_append_internal(chain, stream); } void i_stream_chain_append_eof(struct istream_chain *chain) { i_stream_chain_append_internal(chain, NULL); } static void i_stream_chain_set_max_buffer_size(struct iostream_private *stream, size_t max_size) { struct chain_istream *cstream = (struct chain_istream *)stream; struct istream_chain_link *link = cstream->chain.head; cstream->have_explicit_max_buffer_size = TRUE; cstream->istream.max_buffer_size = max_size; while (link != NULL) { if (link->stream != NULL) i_stream_set_max_buffer_size(link->stream, max_size); link = link->next; } } static void i_stream_chain_destroy(struct iostream_private *stream) { struct chain_istream *cstream = (struct chain_istream *)stream; struct istream_chain_link *link = cstream->chain.head; while (link != NULL) { struct istream_chain_link *next = link->next; if (link->stream != NULL) i_stream_unref(&link->stream); i_free(link); link = next; } i_free(cstream->istream.w_buffer); } static void i_stream_chain_read_next(struct chain_istream *cstream) { struct istream_chain_link *link = cstream->chain.head; struct istream *prev_input; const unsigned char *data; size_t data_size, cur_data_pos; i_assert(link != NULL && link->stream != NULL); i_assert(link->stream->eof); prev_input = link->stream; data = i_stream_get_data(prev_input, &data_size); DLLIST2_REMOVE(&cstream->chain.head, &cstream->chain.tail, link); i_free(link); /* a) we have more streams, b) we have EOF, c) we need to wait for more streams */ link = cstream->chain.head; if (link != NULL && link->stream != NULL) i_stream_seek(link->stream, 0); if (cstream->prev_stream_left > 0) { /* we've already buffered some of the prev_input. continue appending the rest to it. if it's already at EOF, there's nothing more to append. */ cur_data_pos = cstream->istream.pos - (cstream->istream.skip + cstream->prev_stream_left); i_assert(cur_data_pos <= data_size); data += cur_data_pos; data_size -= cur_data_pos; /* the stream has now become "previous", so its contents in buffer are now part of prev_stream_left. */ cstream->prev_stream_left += cur_data_pos; } else { cstream->istream.pos = 0; cstream->istream.skip = 0; cstream->prev_stream_left = 0; } if (data_size > 0) { memcpy(i_stream_alloc(&cstream->istream, data_size), data, data_size); cstream->istream.pos += data_size; cstream->prev_stream_left += data_size; } i_stream_skip(prev_input, i_stream_get_data_size(prev_input)); i_stream_unref(&prev_input); } static bool i_stream_chain_skip(struct chain_istream *cstream) { struct istream_private *stream = &cstream->istream; struct istream_chain_link *link = cstream->chain.head; size_t bytes_skipped; i_assert(stream->skip >= cstream->prev_skip); bytes_skipped = stream->skip - cstream->prev_skip; if (cstream->prev_stream_left == 0) { /* no need to worry about buffers, skip everything */ } else if (bytes_skipped < cstream->prev_stream_left) { /* we're still skipping inside buffer */ cstream->prev_stream_left -= bytes_skipped; bytes_skipped = 0; } else { /* done with the buffer */ bytes_skipped -= cstream->prev_stream_left; cstream->prev_stream_left = 0; } stream->pos -= bytes_skipped; stream->skip -= bytes_skipped; stream->buffer += bytes_skipped; cstream->prev_skip = stream->skip; if (link == NULL || link->eof) { i_assert(bytes_skipped == 0); return FALSE; } i_stream_skip(link->stream, bytes_skipped); return TRUE; } static ssize_t i_stream_chain_read(struct istream_private *stream) { struct chain_istream *cstream = (struct chain_istream *)stream; struct istream_chain_link *link = cstream->chain.head; const unsigned char *data; size_t data_size, cur_data_pos, new_pos; size_t new_bytes_count; ssize_t ret; if (link != NULL && link->eof) { stream->istream.eof = TRUE; return -1; } if (!i_stream_chain_skip(cstream)) return 0; i_assert(link != NULL); i_assert(stream->pos >= stream->skip + cstream->prev_stream_left); cur_data_pos = stream->pos - (stream->skip + cstream->prev_stream_left); data = i_stream_get_data(link->stream, &data_size); if (data_size > cur_data_pos) ret = 0; else { /* need to read more */ i_assert(cur_data_pos == data_size); ret = i_stream_read(link->stream); if (ret == -2 || ret == 0) return ret; if (ret == -1) { if (link->stream->stream_errno != 0) { io_stream_set_error(&stream->iostream, "read(%s) failed: %s", i_stream_get_name(link->stream), i_stream_get_error(link->stream)); stream->istream.stream_errno = link->stream->stream_errno; return -1; } /* EOF of this stream, go to next stream */ i_stream_chain_read_next(cstream); cstream->prev_skip = stream->skip; return i_stream_chain_read(stream); } /* we read something */ data = i_stream_get_data(link->stream, &data_size); } if (cstream->prev_stream_left == 0) { /* we can point directly to the current stream's buffers */ stream->buffer = data; stream->pos -= stream->skip; stream->skip = 0; new_pos = data_size; } else if (data_size == cur_data_pos) { /* nothing new read */ i_assert(ret == 0 || ret == -1); stream->buffer = stream->w_buffer; new_pos = stream->pos; } else { /* we still have some of the previous stream left. merge the new data with it. */ i_assert(data_size > cur_data_pos); new_bytes_count = data_size - cur_data_pos; memcpy(i_stream_alloc(stream, new_bytes_count), data + cur_data_pos, new_bytes_count); stream->buffer = stream->w_buffer; new_pos = stream->pos + new_bytes_count; } ret = new_pos > stream->pos ? (ssize_t)(new_pos - stream->pos) : (ret == 0 ? 0 : -1); stream->pos = new_pos; cstream->prev_skip = stream->skip; return ret; } static void i_stream_chain_close(struct iostream_private *stream, bool close_parent) { struct chain_istream *cstream = (struct chain_istream *)stream; /* seek to the correct position in parent stream in case it didn't end with EOF */ (void)i_stream_chain_skip(cstream); if (close_parent) { struct istream_chain_link *link = cstream->chain.head; while (link != NULL) { i_stream_close(link->stream); link = link->next; } } } struct istream *i_stream_create_chain(struct istream_chain **chain_r) { struct chain_istream *cstream; cstream = i_new(struct chain_istream, 1); cstream->chain.stream = cstream; cstream->istream.iostream.close = i_stream_chain_close; cstream->istream.iostream.destroy = i_stream_chain_destroy; cstream->istream.iostream.set_max_buffer_size = i_stream_chain_set_max_buffer_size; cstream->istream.read = i_stream_chain_read; cstream->istream.istream.readable_fd = FALSE; cstream->istream.istream.blocking = FALSE; cstream->istream.istream.seekable = FALSE; *chain_r = &cstream->chain; return i_stream_create(&cstream->istream, NULL, -1); }