changeset 22607:708b4dda62dc

lib: Add istream-try This can be used to automatically detect the underlying istream format from a given list of choices.
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Fri, 06 Oct 2017 18:31:12 +0300
parents e154812ec781
children 3cdf8b140c3c
files src/lib/Makefile.am src/lib/istream-try.c src/lib/istream-try.h src/lib/test-istream-try.c src/lib/test-lib.c src/lib/test-lib.h
diffstat 6 files changed, 293 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib/Makefile.am	Mon Oct 09 18:31:52 2017 +0300
+++ b/src/lib/Makefile.am	Fri Oct 06 18:31:12 2017 +0300
@@ -77,6 +77,7 @@
 	istream-seekable.c \
 	istream-sized.c \
 	istream-tee.c \
+	istream-try.c \
 	istream-timeout.c \
 	istream-unix.c \
 	ioloop.c \
@@ -228,6 +229,7 @@
 	istream-seekable.h \
 	istream-sized.h \
 	istream-tee.h \
+	istream-try.h \
 	istream-timeout.h \
 	istream-unix.h \
 	ioloop.h \
@@ -347,6 +349,7 @@
 	test-istream-multiplex.c \
 	test-istream-seekable.c \
 	test-istream-tee.c \
+	test-istream-try.c \
 	test-istream-unix.c \
 	test-json-parser.c \
 	test-json-tree.c \
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/istream-try.c	Fri Oct 06 18:31:12 2017 +0300
@@ -0,0 +1,144 @@
+/* Copyright (c) 2013-2017 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "istream-private.h"
+#include "istream-try.h"
+
+struct try_istream {
+	struct istream_private istream;
+
+	unsigned int try_input_count;
+	struct istream **try_input;
+	unsigned int try_idx;
+
+	struct istream *final_input;
+};
+
+static void i_stream_unref_try_inputs(struct try_istream *tstream)
+{
+	for (unsigned int i = 0; i < tstream->try_input_count; i++) {
+		if (tstream->try_input[i] != NULL)
+			i_stream_unref(&tstream->try_input[i]);
+	}
+	tstream->try_input_count = 0;
+	i_free(tstream->try_input);
+}
+
+static void i_stream_try_close(struct iostream_private *stream,
+			       bool close_parent)
+{
+	struct try_istream *tstream = (struct try_istream *)stream;
+
+	if (close_parent) {
+		if (tstream->istream.parent != NULL)
+			i_stream_close(tstream->istream.parent);
+		for (unsigned int i = 0; i < tstream->try_input_count; i++) {
+			if (tstream->try_input[i] != NULL)
+				i_stream_close(tstream->try_input[i]);
+		}
+	}
+	i_stream_unref_try_inputs(tstream);
+}
+
+static bool i_stream_try_is_buffer_full(struct istream *try_input)
+{
+	/* See if one of the parent istreams have their buffer full.
+	   This is mainly intended to check with istream-tee whether its
+	   parent is full. That means that the try_input has already seen
+	   a full buffer of input, but it hasn't decided to return anything
+	   yet. But it also hasn't failed, so we'll assume that the input is
+	   correct for it and it simply needs a lot more input before it can
+	   return anything (e.g. istream-bzlib). */
+	while (try_input->real_stream->parent != NULL) {
+		try_input = try_input->real_stream->parent;
+		if (try_input->real_stream->pos == try_input->real_stream->buffer_size &&
+		    try_input->real_stream->buffer_size > 0)
+			return TRUE;
+	}
+	return FALSE;
+}
+
+static int i_stream_try_detect(struct try_istream *tstream)
+{
+	int ret;
+
+	for (; tstream->try_idx < tstream->try_input_count; tstream->try_idx++) {
+		struct istream *try_input =
+			tstream->try_input[tstream->try_idx];
+
+		ret = i_stream_read(try_input);
+		if (ret == 0 && i_stream_try_is_buffer_full(try_input))
+			ret = 1;
+		if (ret > 0) {
+			i_stream_init_parent(&tstream->istream, try_input);
+			i_stream_unref_try_inputs(tstream);
+			return 1;
+		}
+		if (ret == 0)
+			return 0;
+		if (try_input->stream_errno != EINVAL) {
+			tstream->istream.istream.stream_errno =
+				try_input->stream_errno;
+			io_stream_set_error(&tstream->istream.iostream,
+				"Unexpected error while detecting stream format: %s",
+				i_stream_get_error(try_input));
+			return -1;
+		}
+	}
+
+	/* All streams failed with EINVAL. */
+	io_stream_set_error(&tstream->istream.iostream,
+			    "Failed to detect stream format");
+	tstream->istream.istream.stream_errno = EINVAL;
+	return -1;
+}
+
+static ssize_t
+i_stream_try_read(struct istream_private *stream)
+{
+	struct try_istream *tstream = (struct try_istream *)stream;
+	int ret;
+
+	if (stream->parent == NULL) {
+		if ((ret = i_stream_try_detect(tstream)) <= 0)
+			return ret;
+	}
+
+	i_stream_seek(stream->parent, stream->parent_start_offset +
+		      stream->istream.v_offset);
+	return i_stream_read_copy_from_parent(&stream->istream);
+}
+
+struct istream *istream_try_create(struct istream *const input[])
+{
+	struct try_istream *tstream;
+	unsigned int count;
+	size_t max_buffer_size = I_STREAM_MIN_SIZE;
+	bool blocking = TRUE, seekable = TRUE;
+
+	for (count = 0; input[count] != NULL; count++) {
+		max_buffer_size = I_MAX(max_buffer_size,
+					i_stream_get_max_buffer_size(input[count]));
+		if (!input[count]->blocking)
+			blocking = FALSE;
+		if (!input[count]->seekable)
+			seekable = FALSE;
+		i_stream_ref(input[count]);
+	}
+	i_assert(count != 0);
+
+	tstream = i_new(struct try_istream, 1);
+	tstream->try_input_count = count;
+	tstream->try_input = p_memdup(default_pool, input,
+				      sizeof(*input) * count);
+
+	tstream->istream.iostream.close = i_stream_try_close;
+
+	tstream->istream.max_buffer_size = max_buffer_size;
+	tstream->istream.read = i_stream_try_read;
+
+	tstream->istream.istream.readable_fd = FALSE;
+	tstream->istream.istream.blocking = blocking;
+	tstream->istream.istream.seekable = seekable;
+	return i_stream_create(&tstream->istream, NULL, -1);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/istream-try.h	Fri Oct 06 18:31:12 2017 +0300
@@ -0,0 +1,14 @@
+#ifndef ISTREAM_TRY_H
+#define ISTREAM_TRY_H
+
+/* Read from the first input stream that doesn't fail with EINVAL. If any of
+   the streams fail with non-EINVAL, it's treated as a fatal failure and the
+   error is immediately returned. If a stream returns 0, more data is waited
+   for before continuing to the next stream. This allows the last stream to
+   be a fallback stream that always succeeds.
+
+   Once the stream is detected, all the other streams are unreferenced.
+   The streams should usually be children of the same parent tee-istream. */
+struct istream *istream_try_create(struct istream *const input[]);
+
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/test-istream-try.c	Fri Oct 06 18:31:12 2017 +0300
@@ -0,0 +1,130 @@
+/* Copyright (c) 2009-2017 Dovecot authors, see the included COPYING file */
+
+#include "test-lib.h"
+#include "istream.h"
+#include "istream-try.h"
+
+void test_istream_try(void)
+{
+	bool finished = FALSE;
+
+	test_begin("istream try");
+	for (unsigned int test = 0; test <= 10; test++) {
+		struct istream *test_inputs[3], *try_input;
+
+		test_inputs[0] = test_istream_create("1");
+		test_inputs[1] = test_istream_create("2");
+		test_inputs[2] = NULL;
+		test_istream_set_size(test_inputs[0], 0);
+		test_istream_set_size(test_inputs[1], 0);
+		try_input = istream_try_create(test_inputs);
+
+		/* nonblocking read */
+		test_assert_idx(i_stream_read(try_input) == 0, test);
+
+		switch (test) {
+		case 0:
+			/* stream 0 is available */
+			test_istream_set_size(test_inputs[0], 1);
+			test_assert_idx(i_stream_read(try_input) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+			break;
+		case 1:
+			/* stream 1 is available, but not used before 0 */
+			test_istream_set_size(test_inputs[1], 1);
+			test_assert_idx(i_stream_read(try_input) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+			/* continue failing stream 0 -> 1 is available */
+			test_inputs[0]->stream_errno = EINVAL;
+			test_assert_idx(i_stream_read(try_input) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 1, test);
+			break;
+		case 2:
+			/* both streams are available - stream 0 is read */
+			test_istream_set_size(test_inputs[0], 1);
+			test_istream_set_size(test_inputs[1], 1);
+			test_assert_idx(i_stream_read(try_input) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+			break;
+		case 3:
+			/* stream 0 fails */
+			test_inputs[0]->stream_errno = EINVAL;
+			test_assert_idx(i_stream_read(try_input) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+			/* continue making stream 1 available */
+			test_istream_set_size(test_inputs[1], 1);
+			test_assert_idx(i_stream_read(try_input) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 1, test);
+			break;
+		case 4:
+			/* stream 1 fails */
+			test_inputs[1]->stream_errno = EINVAL;
+			test_assert_idx(i_stream_read(try_input) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+			break;
+		case 5:
+			/* stream 0 fails, stream 1 is available */
+			test_inputs[0]->stream_errno = EINVAL;
+			test_istream_set_size(test_inputs[1], 1);
+			test_assert_idx(i_stream_read(try_input) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 1, test);
+			break;
+		case 6:
+			/* stream 0 is available, stream 1 fails */
+			test_inputs[1]->stream_errno = EINVAL;
+			test_istream_set_size(test_inputs[0], 1);
+			test_assert_idx(i_stream_read(try_input) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 1, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+			break;
+		case 7:
+			/* both streams fail */
+			test_inputs[0]->stream_errno = EINVAL;
+			test_inputs[1]->stream_errno = EINVAL;
+			test_assert_idx(i_stream_read(try_input) == -1, test);
+			test_assert_idx(try_input->stream_errno == EINVAL, test);
+			break;
+		case 8:
+			/* stream 0 fails with EINVAL, stream 1 with EIO */
+			test_inputs[0]->stream_errno = EINVAL;
+			test_inputs[1]->stream_errno = EIO;
+			test_assert_idx(i_stream_read(try_input) == -1, test);
+			test_assert_idx(try_input->stream_errno == EIO, test);
+			break;
+		case 9:
+			/* stream 0 fails with EIO, stream 1 with EINVAL */
+			test_inputs[0]->stream_errno = EIO;
+			test_inputs[1]->stream_errno = EINVAL;
+			test_assert_idx(i_stream_read(try_input) == -1, test);
+			test_assert_idx(try_input->stream_errno == EIO, test);
+			break;
+		case 10:
+			/* stream 0 fails with EIO, stream 1 would work.. */
+			test_inputs[0]->stream_errno = EIO;
+			test_istream_set_size(test_inputs[1], 1);
+			test_assert_idx(i_stream_read(try_input) == -1, test);
+			test_assert_idx(try_input->stream_errno == EIO, test);
+			test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+
+			finished = TRUE;
+			break;
+		}
+
+		test_assert_idx(test_inputs[0]->v_offset == 0, test);
+		test_assert_idx(test_inputs[1]->v_offset == 0, test);
+
+		i_stream_unref(&test_inputs[0]);
+		i_stream_unref(&test_inputs[1]);
+		i_stream_unref(&try_input);
+	}
+	i_assert(finished);
+	test_end();
+}
--- a/src/lib/test-lib.c	Mon Oct 09 18:31:52 2017 +0300
+++ b/src/lib/test-lib.c	Fri Oct 06 18:31:12 2017 +0300
@@ -36,6 +36,7 @@
 		test_istream_multiplex,
 		test_istream_seekable,
 		test_istream_tee,
+		test_istream_try,
 		test_istream_unix,
 		test_json_parser,
 		test_json_tree,
--- a/src/lib/test-lib.h	Mon Oct 09 18:31:52 2017 +0300
+++ b/src/lib/test-lib.h	Fri Oct 06 18:31:12 2017 +0300
@@ -37,6 +37,7 @@
 void test_istream_multiplex(void);
 void test_istream_seekable(void);
 void test_istream_tee(void);
+void test_istream_try(void);
 void test_istream_unix(void);
 void test_json_parser(void);
 void test_json_tree(void);