view src/lib/ostream.c @ 14681:ca37d1577291

Added o_stream_nsend*() and related functions to make delayed error handling safer. Once o_stream_nsend*() is called, o_stream_nfinish() must be called before stream is destroyed to finish checking if there were any errors. If something failed and the stream is just wanted to be closed, o_stream_ignore_last_errors() can be called. For streams where errors don't really make any difference (network sockets) you can call o_stream_set_no_error_handling() immediately after creating the stream.
author Timo Sirainen <tss@iki.fi>
date Mon, 25 Jun 2012 00:01:59 +0300
parents c93ca5e46a8a
children 096e4c4d62bb
line wrap: on
line source

/* Copyright (c) 2002-2012 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "istream.h"
#include "ostream-private.h"

void o_stream_set_name(struct ostream *stream, const char *name)
{
	i_free(stream->real_stream->iostream.name);
	stream->real_stream->iostream.name = i_strdup(name);
}

const char *o_stream_get_name(struct ostream *stream)
{
	while (stream->real_stream->iostream.name == NULL) {
		stream = stream->real_stream->parent;
		if (stream == NULL)
			return "";
	}
	return stream->real_stream->iostream.name;
}

void o_stream_destroy(struct ostream **stream)
{
	o_stream_close(*stream);
	o_stream_unref(stream);
}

void o_stream_ref(struct ostream *stream)
{
	io_stream_ref(&stream->real_stream->iostream);
}

void o_stream_unref(struct ostream **_stream)
{
	struct ostream *stream = *_stream;

	if (!stream->real_stream->last_errors_not_checked &&
	    !stream->real_stream->error_handling_disabled &&
	    stream->real_stream->iostream.refcount == 1) {
		i_panic("output stream %s is missing error handling",
			o_stream_get_name(stream));
	}

	io_stream_unref(&stream->real_stream->iostream);
	*_stream = NULL;
}

void o_stream_close(struct ostream *stream)
{
	io_stream_close(&stream->real_stream->iostream);
	stream->closed = TRUE;
}

#undef o_stream_set_flush_callback
void o_stream_set_flush_callback(struct ostream *stream,
				 stream_flush_callback_t *callback,
				 void *context)
{
	struct ostream_private *_stream = stream->real_stream;

	_stream->set_flush_callback(_stream, callback, context);
}

void o_stream_unset_flush_callback(struct ostream *stream)
{
	struct ostream_private *_stream = stream->real_stream;

	_stream->set_flush_callback(_stream, NULL, NULL);
}

void o_stream_set_max_buffer_size(struct ostream *stream, size_t max_size)
{
	io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
}

void o_stream_cork(struct ostream *stream)
{
	struct ostream_private *_stream = stream->real_stream;

	if (unlikely(stream->closed))
		return;

	_stream->cork(_stream, TRUE);
}

void o_stream_uncork(struct ostream *stream)
{
	struct ostream_private *_stream = stream->real_stream;

	if (unlikely(stream->closed))
		return;

	_stream->cork(_stream, FALSE);
}

int o_stream_flush(struct ostream *stream)
{
	struct ostream_private *_stream = stream->real_stream;
	int ret = 1;

	if (unlikely(stream->closed)) {
		errno = stream->stream_errno;
		return -1;
	}

	stream->stream_errno = 0;
	if (unlikely((ret = _stream->flush(_stream)) < 0)) {
		i_assert(stream->stream_errno != 0);
		stream->last_failed_errno = stream->stream_errno;
		errno = stream->stream_errno;
	}
	return ret;
}

void o_stream_set_flush_pending(struct ostream *stream, bool set)
{
	struct ostream_private *_stream = stream->real_stream;

	if (unlikely(stream->closed))
		return;

	_stream->flush_pending(_stream, set);
}

size_t o_stream_get_buffer_used_size(const struct ostream *stream)
{
	const struct ostream_private *_stream = stream->real_stream;

	return _stream->get_used_size(_stream);
}

size_t o_stream_get_buffer_avail_size(const struct ostream *stream)
{
	size_t used = o_stream_get_buffer_used_size(stream);

	return stream->real_stream->max_buffer_size <= used ? 0 :
		stream->real_stream->max_buffer_size - used;
}

int o_stream_seek(struct ostream *stream, uoff_t offset)
{
	struct ostream_private *_stream = stream->real_stream;

	if (unlikely(stream->closed)) {
		errno = stream->stream_errno;
		return -1;
	}

	stream->stream_errno = 0;
	if (unlikely(_stream->seek(_stream, offset) < 0)) {
		i_assert(stream->stream_errno != 0);
		stream->last_failed_errno = stream->stream_errno;
		errno = stream->stream_errno;
		return -1;
	}
	return 1;
}

ssize_t o_stream_send(struct ostream *stream, const void *data, size_t size)
{
	struct const_iovec iov;

	memset(&iov, 0, sizeof(iov));
	iov.iov_base = data;
	iov.iov_len = size;

 	return o_stream_sendv(stream, &iov, 1);
}

ssize_t o_stream_sendv(struct ostream *stream, const struct const_iovec *iov,
		       unsigned int iov_count)
{
	struct ostream_private *_stream = stream->real_stream;
	unsigned int i;
	size_t total_size;
	ssize_t ret;

	if (unlikely(stream->closed)) {
		errno = stream->stream_errno;
		return -1;
	}

	stream->stream_errno = 0;
	for (i = 0, total_size = 0; i < iov_count; i++)
		total_size += iov[i].iov_len;
	if (total_size == 0)
		return 0;

	ret = _stream->sendv(_stream, iov, iov_count);
	if (unlikely(ret != (ssize_t)total_size)) {
		if (ret < 0) {
			i_assert(stream->stream_errno != 0);
			stream->last_failed_errno = stream->stream_errno;
			errno = stream->stream_errno;
		} else {
			stream->overflow = TRUE;
		}
	}
	return ret;
}

ssize_t o_stream_send_str(struct ostream *stream, const char *str)
{
	return o_stream_send(stream, str, strlen(str));
}

void o_stream_nsend(struct ostream *stream, const void *data, size_t size)
{
	struct const_iovec iov;

	memset(&iov, 0, sizeof(iov));
	iov.iov_base = data;
	iov.iov_len = size;

	(void)o_stream_nsendv(stream, &iov, 1);
}

void o_stream_nsendv(struct ostream *stream, const struct const_iovec *iov,
		     unsigned int iov_count)
{
	if (unlikely(stream->closed))
		return;
	(void)o_stream_sendv(stream, iov, iov_count);
	stream->real_stream->last_errors_not_checked = TRUE;
}

void o_stream_nsend_str(struct ostream *stream, const char *str)
{
	o_stream_nsend(stream, str, strlen(str));
}

void o_stream_nflush(struct ostream *stream)
{
	if (unlikely(stream->closed))
		return;
	(void)o_stream_flush(stream);
	stream->real_stream->last_errors_not_checked = TRUE;
}

int o_stream_nfinish(struct ostream *stream)
{
	o_stream_nflush(stream);
	stream->real_stream->last_errors_not_checked = FALSE;
	errno = stream->last_failed_errno;
	return stream->last_failed_errno != 0 ? -1 : 0;
}

void o_stream_ignore_last_errors(struct ostream *stream)
{
	stream->real_stream->last_errors_not_checked = FALSE;
}

void o_stream_set_no_error_handling(struct ostream *stream, bool set)
{
	stream->real_stream->error_handling_disabled = set;
}

off_t o_stream_send_istream(struct ostream *outstream,
			    struct istream *instream)
{
	struct ostream_private *_outstream = outstream->real_stream;
	off_t ret;

	if (unlikely(outstream->closed || instream->closed)) {
		errno = outstream->stream_errno;
		return -1;
	}

	outstream->stream_errno = 0;
	ret = _outstream->send_istream(_outstream, instream);
	if (unlikely(ret < 0)) {
		i_assert(outstream->stream_errno != 0);
		outstream->last_failed_errno = outstream->stream_errno;
		errno = outstream->stream_errno;
	}
	return ret;
}

int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
		    uoff_t offset)
{
	int ret;

	if (unlikely(stream->closed)) {
		errno = stream->stream_errno;
		return -1;
	}

	ret = stream->real_stream->write_at(stream->real_stream,
					    data, size, offset);
	if (unlikely(ret < 0)) {
		i_assert(stream->stream_errno != 0);
		stream->last_failed_errno = stream->stream_errno;
		errno = stream->stream_errno;
	}
	return ret;
}

off_t io_stream_copy(struct ostream *outstream, struct istream *instream,
		     size_t block_size)
{
	uoff_t start_offset;
	struct const_iovec iov;
	const unsigned char *data;
	ssize_t ret;

	start_offset = instream->v_offset;
	for (;;) {
		(void)i_stream_read_data(instream, &data, &iov.iov_len,
					 block_size-1);
		if (iov.iov_len == 0) {
			/* all sent */
			break;
		}

		iov.iov_base = data;
		ret = o_stream_sendv(outstream, &iov, 1);
		if (ret <= 0) {
			if (ret == 0)
				break;
			return -1;
		}
		i_stream_skip(instream, ret);

		if ((size_t)ret != iov.iov_len)
			break;
	}

	return (off_t)(instream->v_offset - start_offset);
}

void o_stream_switch_ioloop(struct ostream *stream)
{
	struct ostream_private *_stream = stream->real_stream;

	_stream->switch_ioloop(_stream);
}

static void o_stream_default_close(struct iostream_private *stream)
{
	struct ostream_private *_stream = (struct ostream_private *)stream;

	(void)o_stream_flush(&_stream->ostream);
}

static void o_stream_default_destroy(struct iostream_private *stream)
{
	struct ostream_private *_stream = (struct ostream_private *)stream;

	if (_stream->parent != NULL)
		o_stream_unref(&_stream->parent);
}

static void
o_stream_default_set_max_buffer_size(struct iostream_private *stream,
				     size_t max_size)
{
	struct ostream_private *_stream = (struct ostream_private *)stream;

	if (_stream->parent != NULL)
		o_stream_set_max_buffer_size(_stream->parent, max_size);
	_stream->max_buffer_size = max_size;
}

static void o_stream_default_cork(struct ostream_private *_stream, bool set)
{
	_stream->corked = set;
	if (set) {
		if (_stream->parent != NULL)
			o_stream_cork(_stream->parent);
	} else {
		(void)o_stream_flush(&_stream->ostream);
		if (_stream->parent != NULL)
			o_stream_uncork(_stream->parent);
	}
}

void o_stream_copy_error_from_parent(struct ostream_private *_stream)
{
	struct ostream *src = _stream->parent;
	struct ostream *dest = &_stream->ostream;

	dest->stream_errno = src->stream_errno;
	dest->last_failed_errno = src->last_failed_errno;
	dest->overflow = src->overflow;
}

static int o_stream_default_flush(struct ostream_private *_stream)
{
	int ret;

	if (_stream->parent == NULL)
		return 1;

	if ((ret = o_stream_flush(_stream->parent)) < 0)
		o_stream_copy_error_from_parent(_stream);
	return ret;
}

static void
o_stream_default_set_flush_callback(struct ostream_private *_stream,
				    stream_flush_callback_t *callback,
				    void *context)
{
	if (_stream->parent != NULL)
		o_stream_set_flush_callback(_stream->parent, callback, context);

	_stream->callback = callback;
	_stream->context = context;
}

static void
o_stream_default_set_flush_pending(struct ostream_private *_stream, bool set)
{
	if (_stream->parent != NULL)
		o_stream_set_flush_pending(_stream->parent, set);
}

static size_t
o_stream_default_get_used_size(const struct ostream_private *_stream)
{
	if (_stream->parent == NULL)
		return 0;
	else
		return o_stream_get_buffer_used_size(_stream->parent);
}

static int
o_stream_default_seek(struct ostream_private *_stream,
		      uoff_t offset ATTR_UNUSED)
{
	_stream->ostream.stream_errno = EPIPE;
	return -1;
}

static int
o_stream_default_write_at(struct ostream_private *_stream,
			  const void *data ATTR_UNUSED,
			  size_t size ATTR_UNUSED, uoff_t offset ATTR_UNUSED)
{
	_stream->ostream.stream_errno = EPIPE;
	return -1;
}

static off_t o_stream_default_send_istream(struct ostream_private *outstream,
					   struct istream *instream)
{
	return io_stream_copy(&outstream->ostream, instream, IO_BLOCK_SIZE);
}

static void o_stream_default_switch_ioloop(struct ostream_private *_stream)
{
	if (_stream->parent != NULL)
		o_stream_switch_ioloop(_stream->parent);
}

struct ostream *
o_stream_create(struct ostream_private *_stream, struct ostream *parent)
{
	_stream->ostream.real_stream = _stream;
	if (parent != NULL) {
		_stream->parent = parent;
		o_stream_ref(parent);

		_stream->callback = parent->real_stream->callback;
		_stream->context = parent->real_stream->context;
		_stream->max_buffer_size = parent->real_stream->max_buffer_size;
		_stream->error_handling_disabled =
			parent->real_stream->error_handling_disabled;
	}

	if (_stream->iostream.close == NULL)
		_stream->iostream.close = o_stream_default_close;
	if (_stream->iostream.destroy == NULL)
		_stream->iostream.destroy = o_stream_default_destroy;
	if (_stream->iostream.set_max_buffer_size == NULL) {
		_stream->iostream.set_max_buffer_size =
			o_stream_default_set_max_buffer_size;
	}

	if (_stream->cork == NULL)
		_stream->cork = o_stream_default_cork;
	if (_stream->flush == NULL)
		_stream->flush = o_stream_default_flush;
	if (_stream->set_flush_callback == NULL) {
		_stream->set_flush_callback =
			o_stream_default_set_flush_callback;
	}
	if (_stream->flush_pending == NULL)
		_stream->flush_pending = o_stream_default_set_flush_pending;
	if (_stream->get_used_size == NULL)
		_stream->get_used_size = o_stream_default_get_used_size;
	if (_stream->seek == NULL)
		_stream->seek = o_stream_default_seek;
	if (_stream->write_at == NULL)
		_stream->write_at = o_stream_default_write_at;
	if (_stream->send_istream == NULL)
		_stream->send_istream = o_stream_default_send_istream;
	if (_stream->switch_ioloop == NULL)
		_stream->switch_ioloop = o_stream_default_switch_ioloop;

	io_stream_init(&_stream->iostream);
	return &_stream->ostream;
}