changeset 13530:8c3c0e01e00d

Simplified creating filter ostreams.
author Timo Sirainen <tss@iki.fi>
date Wed, 21 Sep 2011 14:40:35 +0300
parents cf77e448295c
children 0fb296c11edc
files src/lib-fs/ostream-cmp.c src/lib-ssl-iostream/ostream-openssl.c src/lib/ostream-buffer.c src/lib/ostream-file.c src/lib/ostream-private.h src/lib/ostream.c src/plugins/zlib/ostream-bzlib.c src/plugins/zlib/ostream-zlib.c
diffstat 8 files changed, 224 insertions(+), 195 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-fs/ostream-cmp.c	Wed Sep 21 13:56:13 2011 +0300
+++ b/src/lib-fs/ostream-cmp.c	Wed Sep 21 14:40:35 2011 +0300
@@ -9,41 +9,18 @@
 	struct ostream_private ostream;
 
 	struct istream *input;
-	struct ostream *output;
 	bool equals;
 };
 
-static void cstream_copy_error(struct cmp_ostream *cstream)
-{
-	struct ostream *src = cstream->output;
-	struct ostream *dest = &cstream->ostream.ostream;
-
-	dest->stream_errno = src->stream_errno;
-	dest->last_failed_errno = src->last_failed_errno;
-	dest->overflow = src->overflow;
-}
-
 static void o_stream_cmp_close(struct iostream_private *stream)
 {
 	struct cmp_ostream *cstream = (struct cmp_ostream *)stream;
 
-	if (cstream->output == NULL)
+	if (cstream->input == NULL)
 		return;
 
 	i_stream_unref(&cstream->input);
 	o_stream_flush(&cstream->ostream.ostream);
-	o_stream_unref(&cstream->output);
-}
-
-static int o_stream_cmp_flush(struct ostream_private *stream)
-{
-	struct cmp_ostream *cstream = (struct cmp_ostream *)stream;
-	int ret;
-
-	ret = o_stream_flush(cstream->output);
-	if (ret < 0)
-		cstream_copy_error(cstream);
-	return ret;
 }
 
 bool stream_cmp_block(struct istream *input,
@@ -82,8 +59,8 @@
 		}
 	}
 
-	if ((ret = o_stream_sendv(cstream->output, iov, iov_count)) < 0) {
-		cstream_copy_error(cstream);
+	if ((ret = o_stream_sendv(stream->parent, iov, iov_count)) < 0) {
+		o_stream_copy_error_from_parent(stream);
 		return -1;
 	}
 
@@ -98,15 +75,12 @@
 
 	cstream = i_new(struct cmp_ostream, 1);
 	cstream->ostream.sendv = o_stream_cmp_sendv;
-	cstream->ostream.flush = o_stream_cmp_flush;
 	cstream->ostream.iostream.close = o_stream_cmp_close;
 	cstream->input = input;
-	cstream->output = output;
 	cstream->equals = TRUE;
 	i_stream_ref(input);
-	o_stream_ref(output);
 
-	return o_stream_create(&cstream->ostream);
+	return o_stream_create(&cstream->ostream, output);
 }
 
 bool o_stream_cmp_equals(struct ostream *_output)
--- a/src/lib-ssl-iostream/ostream-openssl.c	Wed Sep 21 13:56:13 2011 +0300
+++ b/src/lib-ssl-iostream/ostream-openssl.c	Wed Sep 21 14:40:35 2011 +0300
@@ -253,5 +253,5 @@
 	o_stream_set_flush_callback(ssl_io->plain_output,
 				    plain_flush_callback, sstream);
 
-	return o_stream_create(&sstream->ostream);
+	return o_stream_create(&sstream->ostream, NULL);
 }
--- a/src/lib/ostream-buffer.c	Wed Sep 21 13:56:13 2011 +0300
+++ b/src/lib/ostream-buffer.c	Wed Sep 21 14:40:35 2011 +0300
@@ -59,5 +59,5 @@
 	bstream->ostream.write_at = o_stream_buffer_write_at;
 
 	bstream->buf = buf;
-	return o_stream_create(&bstream->ostream);
+	return o_stream_create(&bstream->ostream, NULL);
 }
--- a/src/lib/ostream-file.c	Wed Sep 21 13:56:13 2011 +0300
+++ b/src/lib/ostream-file.c	Wed Sep 21 14:40:35 2011 +0300
@@ -933,7 +933,7 @@
 
 	fstream = o_stream_create_fd_common(fd, autoclose_fd);
 	fstream->ostream.max_buffer_size = max_buffer_size;
-	ostream = o_stream_create(&fstream->ostream);
+	ostream = o_stream_create(&fstream->ostream, NULL);
 
 	offset = lseek(fd, 0, SEEK_CUR);
 	if (offset >= 0) {
@@ -969,7 +969,7 @@
 	fstream->real_offset = offset;
 	fstream->buffer_offset = offset;
 
-	ostream = o_stream_create(&fstream->ostream);
+	ostream = o_stream_create(&fstream->ostream, NULL);
 	ostream->offset = offset;
 	return ostream;
 }
--- a/src/lib/ostream-private.h	Wed Sep 21 13:56:13 2011 +0300
+++ b/src/lib/ostream-private.h	Wed Sep 21 14:40:35 2011 +0300
@@ -11,6 +11,9 @@
 /* methods: */
 	void (*cork)(struct ostream_private *stream, bool set);
 	int (*flush)(struct ostream_private *stream);
+	void (*set_flush_callback)(struct ostream_private *stream,
+				   stream_flush_callback_t *callback,
+				   void *context);
 	void (*flush_pending)(struct ostream_private *stream, bool set);
 	size_t (*get_used_size)(const struct ostream_private *stream);
 	int (*seek)(struct ostream_private *stream, uoff_t offset);
@@ -27,15 +30,20 @@
 	struct ostream ostream;
 	size_t max_buffer_size;
 
+	struct ostream *parent; /* for filter streams */
+
 	stream_flush_callback_t *callback;
 	void *context;
 
 	unsigned int corked:1;
 };
 
-struct ostream *o_stream_create(struct ostream_private *_stream);
+struct ostream *
+o_stream_create(struct ostream_private *_stream, struct ostream *parent);
 
 off_t io_stream_copy(struct ostream *outstream, struct istream *instream,
 		     size_t block_size);
 
+void o_stream_copy_error_from_parent(struct ostream_private *_stream);
+
 #endif
--- a/src/lib/ostream.c	Wed Sep 21 13:56:13 2011 +0300
+++ b/src/lib/ostream.c	Wed Sep 21 14:40:35 2011 +0300
@@ -12,8 +12,12 @@
 
 const char *o_stream_get_name(struct ostream *stream)
 {
-	return stream->real_stream->iostream.name == NULL ? "" :
-		stream->real_stream->iostream.name;
+	while (stream->real_stream->iostream.name == NULL) {
+		stream = stream->real_stream->parent;
+		if (stream == NULL)
+			return "";
+	}
+	return stream->real_stream->iostream.name;
 }
 
 void o_stream_destroy(struct ostream **stream)
@@ -46,26 +50,17 @@
 {
 	struct ostream_private *_stream = stream->real_stream;
 
-	_stream->callback = callback;
-	_stream->context = context;
+	_stream->set_flush_callback(_stream, callback, context);
 }
 
 void o_stream_unset_flush_callback(struct ostream *stream)
 {
-	struct ostream_private *_stream = stream->real_stream;
-
-	_stream->callback = NULL;
-	_stream->context = NULL;
+	o_stream_set_flush_callback(stream, NULL, NULL);
 }
 
 void o_stream_set_max_buffer_size(struct ostream *stream, size_t max_size)
 {
-	if (stream->real_stream->iostream.set_max_buffer_size != NULL) {
-		io_stream_set_max_buffer_size(&stream->real_stream->iostream,
-					      max_size);
-	} else {
-		stream->real_stream->max_buffer_size = max_size;
-	}
+	io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
 }
 
 void o_stream_cork(struct ostream *stream)
@@ -75,10 +70,7 @@
 	if (unlikely(stream->closed))
 		return;
 
-	if (_stream->cork != NULL)
-		_stream->cork(_stream, TRUE);
-	else
-		_stream->corked = TRUE;
+	_stream->cork(_stream, TRUE);
 }
 
 void o_stream_uncork(struct ostream *stream)
@@ -88,12 +80,7 @@
 	if (unlikely(stream->closed))
 		return;
 
-	if (_stream->cork != NULL)
-		_stream->cork(_stream, FALSE);
-	else {
-		_stream->corked = FALSE;
-		(void)o_stream_flush(stream);
-	}
+	_stream->cork(_stream, FALSE);
 }
 
 int o_stream_flush(struct ostream *stream)
@@ -105,11 +92,9 @@
 		return -1;
 
 	stream->stream_errno = 0;
-	if (_stream->flush != NULL) {
-		if (unlikely((ret = _stream->flush(_stream)) < 0)) {
-			i_assert(stream->stream_errno != 0);
-			stream->last_failed_errno = stream->stream_errno;
-		}
+	if (unlikely((ret = _stream->flush(_stream)) < 0)) {
+		i_assert(stream->stream_errno != 0);
+		stream->last_failed_errno = stream->stream_errno;
 	}
 	return ret;
 }
@@ -121,16 +106,14 @@
 	if (unlikely(stream->closed))
 		return;
 
-	if (_stream->flush_pending != NULL)
-		_stream->flush_pending(_stream, set);
+	_stream->flush_pending(_stream, set);
 }
 
 size_t o_stream_get_buffer_used_size(const struct ostream *stream)
 {
 	const struct ostream_private *_stream = stream->real_stream;
 
-	return _stream->get_used_size == NULL ? 0 :
-		_stream->get_used_size(_stream);
+	return _stream->get_used_size(_stream);
 }
 
 size_t o_stream_get_buffer_avail_size(const struct ostream *stream)
@@ -149,14 +132,9 @@
 		return -1;
 
 	stream->stream_errno = 0;
-	if (_stream->seek != NULL) {
-		if (unlikely(_stream->seek(_stream, offset) < 0)) {
-			i_assert(stream->stream_errno != 0);
-			stream->last_failed_errno = stream->stream_errno;
-		}
-	} else {
-		stream->stream_errno = EPIPE;
-		stream->last_failed_errno = EPIPE;
+	if (unlikely(_stream->seek(_stream, offset) < 0)) {
+		i_assert(stream->stream_errno != 0);
+		stream->last_failed_errno = stream->stream_errno;
 		return -1;
 	}
 	return 1;
@@ -230,11 +208,6 @@
 	if (unlikely(stream->closed))
 		return -1;
 
-	if (stream->real_stream->write_at == NULL) {
-		/* stream doesn't support seeking */
-		stream->stream_errno = EPIPE;
-		return -1;
-	}
 	ret = stream->real_stream->write_at(stream->real_stream,
 					    data, size, offset);
 	if (unlikely(ret < 0)) {
@@ -244,22 +217,6 @@
 	return ret;
 }
 
-static off_t o_stream_default_send_istream(struct ostream_private *outstream,
-					   struct istream *instream)
-{
-	return io_stream_copy(&outstream->ostream, instream, IO_BLOCK_SIZE);
-}
-
-struct ostream *o_stream_create(struct ostream_private *_stream)
-{
-	_stream->ostream.real_stream = _stream;
-	if (_stream->send_istream == NULL)
-		_stream->send_istream = o_stream_default_send_istream;
-
-	io_stream_init(&_stream->iostream);
-	return &_stream->ostream;
-}
-
 off_t io_stream_copy(struct ostream *outstream, struct istream *instream,
 		     size_t block_size)
 {
@@ -297,6 +254,169 @@
 {
 	struct ostream_private *_stream = stream->real_stream;
 
-	if (_stream->switch_ioloop != NULL)
-		_stream->switch_ioloop(_stream);
+	_stream->switch_ioloop(_stream);
+}
+
+static void o_stream_default_close(struct iostream_private *stream)
+{
+	struct ostream_private *_stream = (struct ostream_private *)stream;
+
+	(void)o_stream_flush(&_stream->ostream);
+}
+
+static void o_stream_default_destroy(struct iostream_private *stream)
+{
+	struct ostream_private *_stream = (struct ostream_private *)stream;
+
+	if (_stream->parent != NULL)
+		o_stream_unref(&_stream->parent);
+}
+
+static void
+o_stream_default_set_max_buffer_size(struct iostream_private *stream,
+				     size_t max_size)
+{
+	struct ostream_private *_stream = (struct ostream_private *)stream;
+
+	if (_stream->parent != NULL)
+		o_stream_set_max_buffer_size(_stream->parent, max_size);
+	_stream->max_buffer_size = max_size;
+}
+
+static void o_stream_default_cork(struct ostream_private *_stream, bool set)
+{
+	_stream->corked = set;
+	if (set) {
+		if (_stream->parent != NULL)
+			o_stream_cork(_stream->parent);
+	} else {
+		(void)o_stream_flush(&_stream->ostream);
+		if (_stream->parent != NULL)
+			o_stream_uncork(_stream->parent);
+	}
+}
+
+void o_stream_copy_error_from_parent(struct ostream_private *_stream)
+{
+	struct ostream *src = _stream->parent;
+	struct ostream *dest = &_stream->ostream;
+
+	dest->stream_errno = src->stream_errno;
+	dest->last_failed_errno = src->last_failed_errno;
+	dest->overflow = src->overflow;
+}
+
+static int o_stream_default_flush(struct ostream_private *_stream)
+{
+	int ret;
+
+	if (_stream->parent == NULL)
+		return 1;
+
+	if ((ret = o_stream_flush(_stream->parent)) < 0)
+		o_stream_copy_error_from_parent(_stream);
+	return ret;
+}
+
+static void
+o_stream_default_set_flush_callback(struct ostream_private *_stream,
+				    stream_flush_callback_t *callback,
+				    void *context)
+{
+	if (_stream->parent == NULL) {
+		_stream->callback = callback;
+		_stream->context = context;
+	} else {
+		/* this is a filter stream, we don't have a flush
+		   callback ourself */
+		o_stream_set_flush_callback(_stream->parent, callback, context);
+	}
 }
+
+static void
+o_stream_default_set_flush_pending(struct ostream_private *_stream, bool set)
+{
+	if (_stream->parent != NULL)
+		o_stream_set_flush_pending(_stream->parent, set);
+}
+
+static size_t
+o_stream_default_get_used_size(const struct ostream_private *_stream)
+{
+	if (_stream->parent == NULL)
+		return 0;
+	else
+		return o_stream_get_buffer_used_size(_stream->parent);
+}
+
+static int
+o_stream_default_seek(struct ostream_private *_stream,
+		      uoff_t offset ATTR_UNUSED)
+{
+	_stream->ostream.stream_errno = EPIPE;
+	return -1;
+}
+
+static int
+o_stream_default_write_at(struct ostream_private *_stream,
+			  const void *data ATTR_UNUSED,
+			  size_t size ATTR_UNUSED, uoff_t offset ATTR_UNUSED)
+{
+	_stream->ostream.stream_errno = EPIPE;
+	return -1;
+}
+
+static off_t o_stream_default_send_istream(struct ostream_private *outstream,
+					   struct istream *instream)
+{
+	return io_stream_copy(&outstream->ostream, instream, IO_BLOCK_SIZE);
+}
+
+static void o_stream_default_switch_ioloop(struct ostream_private *_stream)
+{
+	if (_stream->parent != NULL)
+		o_stream_switch_ioloop(_stream->parent);
+}
+
+struct ostream *
+o_stream_create(struct ostream_private *_stream, struct ostream *parent)
+{
+	_stream->ostream.real_stream = _stream;
+	if (parent != NULL) {
+		_stream->parent = parent;
+		o_stream_ref(parent);
+	}
+
+	if (_stream->iostream.close == NULL)
+		_stream->iostream.close = o_stream_default_close;
+	if (_stream->iostream.destroy == NULL)
+		_stream->iostream.destroy = o_stream_default_destroy;
+	if (_stream->iostream.set_max_buffer_size == NULL) {
+		_stream->iostream.set_max_buffer_size =
+			o_stream_default_set_max_buffer_size;
+	}
+
+	if (_stream->cork == NULL)
+		_stream->cork = o_stream_default_cork;
+	if (_stream->flush == NULL)
+		_stream->flush = o_stream_default_flush;
+	if (_stream->set_flush_callback == NULL) {
+		_stream->set_flush_callback =
+			o_stream_default_set_flush_callback;
+	}
+	if (_stream->flush_pending == NULL)
+		_stream->flush_pending = o_stream_default_set_flush_pending;
+	if (_stream->get_used_size == NULL)
+		_stream->get_used_size = o_stream_default_get_used_size;
+	if (_stream->seek == NULL)
+		_stream->seek = o_stream_default_seek;
+	if (_stream->write_at == NULL)
+		_stream->write_at = o_stream_default_write_at;
+	if (_stream->send_istream == NULL)
+		_stream->send_istream = o_stream_default_send_istream;
+	if (_stream->switch_ioloop == NULL)
+		_stream->switch_ioloop = o_stream_default_switch_ioloop;
+
+	io_stream_init(&_stream->iostream);
+	return &_stream->ostream;
+}
--- a/src/plugins/zlib/ostream-bzlib.c	Wed Sep 21 13:56:13 2011 +0300
+++ b/src/plugins/zlib/ostream-bzlib.c	Wed Sep 21 14:40:35 2011 +0300
@@ -20,25 +20,11 @@
 	unsigned int flushed:1;
 };
 
-static void zstream_copy_error(struct bzlib_ostream *zstream)
-{
-	struct ostream *src = zstream->output;
-	struct ostream *dest = &zstream->ostream.ostream;
-
-	dest->stream_errno = src->stream_errno;
-	dest->last_failed_errno = src->last_failed_errno;
-	dest->overflow = src->overflow;
-}
-
 static void o_stream_bzlib_close(struct iostream_private *stream)
 {
 	struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
 
-	if (zstream->output == NULL)
-		return;
-
 	o_stream_flush(&zstream->ostream.ostream);
-	o_stream_unref(&zstream->output);
 	(void)BZ2_bzCompressEnd(&zstream->zs);
 }
 
@@ -56,10 +42,11 @@
 			zs->next_out = zstream->outbuf;
 			zs->avail_out = sizeof(zstream->outbuf);
 
-			ret = o_stream_send(zstream->output, zstream->outbuf,
+			ret = o_stream_send(zstream->ostream.parent,
+					    zstream->outbuf,
 					    sizeof(zstream->outbuf));
 			if (ret != (ssize_t)sizeof(zstream->outbuf)) {
-				zstream_copy_error(zstream);
+				o_stream_copy_error_from_parent(&zstream->ostream);
 				return -1;
 			}
 		}
@@ -93,10 +80,10 @@
 			zs->next_out = zstream->outbuf;
 			zs->avail_out = sizeof(zstream->outbuf);
 
-			ret = o_stream_send(zstream->output,
+			ret = o_stream_send(zstream->ostream.parent,
 					    zstream->outbuf, len);
 			if (ret != (int)len) {
-				zstream_copy_error(zstream);
+				o_stream_copy_error_from_parent(&zstream->ostream);
 				return -1;
 			}
 			if (done)
@@ -119,19 +106,6 @@
 	return 0;
 }
 
-static void o_stream_bzlib_cork(struct ostream_private *stream, bool set)
-{
-	struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
-
-	stream->corked = set;
-	if (set)
-		o_stream_cork(zstream->output);
-	else {
-		(void)o_stream_flush(&stream->ostream);
-		o_stream_uncork(zstream->output);
-	}
-}
-
 static int o_stream_bzlib_flush(struct ostream_private *stream)
 {
 	struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
@@ -140,19 +114,12 @@
 	if (o_stream_bzlib_send_flush(zstream) < 0)
 		return -1;
 
-	ret = o_stream_flush(zstream->output);
+	ret = o_stream_flush(stream->parent);
 	if (ret < 0)
-		zstream_copy_error(zstream);
+		o_stream_copy_error_from_parent(stream);
 	return ret;
 }
 
-static void o_stream_bzlib_switch_ioloop(struct ostream_private *stream)
-{
-	struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
-
-	o_stream_switch_ioloop(zstream->output);
-}
-
 static ssize_t
 o_stream_bzlib_sendv(struct ostream_private *stream,
 		    const struct const_iovec *iov, unsigned int iov_count)
@@ -181,12 +148,8 @@
 
 	zstream = i_new(struct bzlib_ostream, 1);
 	zstream->ostream.sendv = o_stream_bzlib_sendv;
-	zstream->ostream.cork = o_stream_bzlib_cork;
 	zstream->ostream.flush = o_stream_bzlib_flush;
-	zstream->ostream.switch_ioloop = o_stream_bzlib_switch_ioloop;
 	zstream->ostream.iostream.close = o_stream_bzlib_close;
-	zstream->output = output;
-	o_stream_ref(output);
 
 	ret = BZ2_bzCompressInit(&zstream->zs, level, 0, 0);
 	switch (ret) {
@@ -205,6 +168,6 @@
 
 	zstream->zs.next_out = zstream->outbuf;
 	zstream->zs.avail_out = sizeof(zstream->outbuf);
-	return o_stream_create(&zstream->ostream);
+	return o_stream_create(&zstream->ostream, output);
 }
 #endif
--- a/src/plugins/zlib/ostream-zlib.c	Wed Sep 21 13:56:13 2011 +0300
+++ b/src/plugins/zlib/ostream-zlib.c	Wed Sep 21 14:40:35 2011 +0300
@@ -19,7 +19,6 @@
 	unsigned char gz_header[10];
 	unsigned char outbuf[CHUNK_SIZE];
 
-	struct ostream *output;
 	uint32_t crc, bytes32;
 
 	unsigned int gz:1;
@@ -27,25 +26,11 @@
 	unsigned int flushed:1;
 };
 
-static void zstream_copy_error(struct zlib_ostream *zstream)
-{
-	struct ostream *src = zstream->output;
-	struct ostream *dest = &zstream->ostream.ostream;
-
-	dest->stream_errno = src->stream_errno;
-	dest->last_failed_errno = src->last_failed_errno;
-	dest->overflow = src->overflow;
-}
-
 static void o_stream_zlib_close(struct iostream_private *stream)
 {
 	struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
 
-	if (zstream->output == NULL)
-		return;
-
 	o_stream_flush(&zstream->ostream.ostream);
-	o_stream_unref(&zstream->output);
 	(void)deflateEnd(&zstream->zs);
 }
 
@@ -53,10 +38,10 @@
 {
 	ssize_t ret;
 
-	ret = o_stream_send(zstream->output, zstream->gz_header,
+	ret = o_stream_send(zstream->ostream.parent, zstream->gz_header,
 			    sizeof(zstream->gz_header));
 	if ((size_t)ret != sizeof(zstream->gz_header)) {
-		zstream_copy_error(zstream);
+		o_stream_copy_error_from_parent(&zstream->ostream);
 		return -1;
 	}
 	zstream->header_sent = TRUE;
@@ -79,12 +64,14 @@
 
 static int o_stream_zlib_send_gz_trailer(struct zlib_ostream *zstream)
 {
+	struct ostream *output = zstream->ostream.parent;
+
 	if (!zstream->gz)
 		return 0;
 
-	if (o_stream_zlib_lsb_uint32(zstream->output, zstream->crc) < 0 ||
-	    o_stream_zlib_lsb_uint32(zstream->output, zstream->bytes32) < 0) {
-		zstream_copy_error(zstream);
+	if (o_stream_zlib_lsb_uint32(output, zstream->crc) < 0 ||
+	    o_stream_zlib_lsb_uint32(output, zstream->bytes32) < 0) {
+		o_stream_copy_error_from_parent(&zstream->ostream);
 		return -1;
 	}
 	return 0;
@@ -111,10 +98,11 @@
 			zs->next_out = zstream->outbuf;
 			zs->avail_out = sizeof(zstream->outbuf);
 
-			ret = o_stream_send(zstream->output, zstream->outbuf,
+			ret = o_stream_send(zstream->ostream.parent,
+					    zstream->outbuf,
 					    sizeof(zstream->outbuf));
 			if (ret != (ssize_t)sizeof(zstream->outbuf)) {
-				zstream_copy_error(zstream);
+				o_stream_copy_error_from_parent(&zstream->ostream);
 				return -1;
 			}
 		}
@@ -154,10 +142,10 @@
 			zs->next_out = zstream->outbuf;
 			zs->avail_out = sizeof(zstream->outbuf);
 
-			ret = o_stream_send(zstream->output,
+			ret = o_stream_send(zstream->ostream.parent,
 					    zstream->outbuf, len);
 			if (ret != (int)len) {
-				zstream_copy_error(zstream);
+				o_stream_copy_error_from_parent(&zstream->ostream);
 				return -1;
 			}
 			if (done)
@@ -182,19 +170,6 @@
 	return 0;
 }
 
-static void o_stream_zlib_cork(struct ostream_private *stream, bool set)
-{
-	struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
-
-	stream->corked = set;
-	if (set)
-		o_stream_cork(zstream->output);
-	else {
-		(void)o_stream_flush(&stream->ostream);
-		o_stream_uncork(zstream->output);
-	}
-}
-
 static int o_stream_zlib_flush(struct ostream_private *stream)
 {
 	struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
@@ -203,19 +178,12 @@
 	if (o_stream_zlib_send_flush(zstream) < 0)
 		return -1;
 
-	ret = o_stream_flush(zstream->output);
+	ret = o_stream_flush(stream->parent);
 	if (ret < 0)
-		zstream_copy_error(zstream);
+		o_stream_copy_error_from_parent(stream);
 	return ret;
 }
 
-static void o_stream_zlib_switch_ioloop(struct ostream_private *stream)
-{
-	struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
-
-	o_stream_switch_ioloop(zstream->output);
-}
-
 static ssize_t
 o_stream_zlib_sendv(struct ostream_private *stream,
 		    const struct const_iovec *iov, unsigned int iov_count)
@@ -265,16 +233,12 @@
 
 	zstream = i_new(struct zlib_ostream, 1);
 	zstream->ostream.sendv = o_stream_zlib_sendv;
-	zstream->ostream.cork = o_stream_zlib_cork;
 	zstream->ostream.flush = o_stream_zlib_flush;
-	zstream->ostream.switch_ioloop = o_stream_zlib_switch_ioloop;
 	zstream->ostream.iostream.close = o_stream_zlib_close;
-	zstream->output = output;
 	zstream->crc = 0;
 	zstream->gz = gz;
 	if (!gz)
 		zstream->header_sent = TRUE;
-	o_stream_ref(output);
 
 	o_stream_zlib_init_gz_header(zstream, level, strategy);
 	ret = deflateInit2(&zstream->zs, level, Z_DEFLATED, -15, 8, strategy);
@@ -293,7 +257,7 @@
 
 	zstream->zs.next_out = zstream->outbuf;
 	zstream->zs.avail_out = sizeof(zstream->outbuf);
-	return o_stream_create(&zstream->ostream);
+	return o_stream_create(&zstream->ostream, output);
 }
 
 struct ostream *o_stream_create_gz(struct ostream *output, int level)