view src/lib/istream-callback.c @ 23007:36e01285b5b8

lib: buffer - Improve header comment for buffer_insert() and buffer_delete().
author Stephan Bosch <stephan.bosch@dovecot.fi>
date Mon, 18 Mar 2019 00:52:37 +0100
parents cb108f786fb4
children
line wrap: on
line source

/* Copyright (c) 2014-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "buffer.h"
#include "istream-private.h"
#include "istream-callback.h"

struct callback_istream {
	struct istream_private istream;
	istream_callback_read_t *callback;
	void *context;

	buffer_t *buf;
	size_t prev_pos;
};

static void i_stream_callback_destroy(struct iostream_private *stream)
{
	struct callback_istream *cstream = (struct callback_istream *)stream;

	buffer_free(&cstream->buf);
}

static ssize_t i_stream_callback_read(struct istream_private *stream)
{
	struct callback_istream *cstream = (struct callback_istream *)stream;
	size_t pos;

	if (cstream->callback == NULL) {
		/* already returned EOF / error */
		stream->istream.eof = TRUE;
		return -1;
	}

	if (stream->skip > 0) {
		buffer_delete(cstream->buf, 0, stream->skip);
		stream->pos -= stream->skip;
		cstream->prev_pos -= stream->skip;
		stream->skip = 0;
	}
	i_assert(cstream->buf->used >= cstream->prev_pos);
	pos = cstream->prev_pos;
	if (cstream->buf->used > pos) {
		/* data was added outside the callback */
	} else if (!cstream->callback(cstream->buf, cstream->context)) {
		/* EOF / error */
		stream->istream.eof = TRUE;
		cstream->callback = NULL;
		if (cstream->buf->used == pos ||
		    stream->istream.stream_errno != 0)
			return -1;
		/* EOF was returned with some data still added to the buffer.
		   return the buffer first and EOF only on the next call. */
	} else if (cstream->buf->used == pos) {
		/* buffer full */
		i_assert(cstream->buf->used > 0);
		return -2;
	}
	i_assert(cstream->buf->used > pos);
	stream->buffer = cstream->buf->data;
	cstream->prev_pos = stream->pos = cstream->buf->used;
	return cstream->buf->used - pos;
}

#undef i_stream_create_callback
struct istream *
i_stream_create_callback(istream_callback_read_t *callback, void *context)
{
	struct callback_istream *cstream;
	struct istream *istream;

	i_assert(callback != NULL);

	cstream = i_new(struct callback_istream, 1);
	cstream->callback = callback;
	cstream->context = context;
	cstream->buf = buffer_create_dynamic(default_pool, 1024);

	cstream->istream.iostream.destroy = i_stream_callback_destroy;
	cstream->istream.read = i_stream_callback_read;

	istream = i_stream_create(&cstream->istream, NULL, -1);
	istream->blocking = TRUE;
	return istream;
}

void i_stream_callback_append(struct istream *input,
			      const void *data, size_t size)
{
	struct callback_istream *cstream =
		(struct callback_istream *)input->real_stream;

	buffer_append(cstream->buf, data, size);
}

void i_stream_callback_append_str(struct istream *input, const char *str)
{
	i_stream_callback_append(input, str, strlen(str));
}

buffer_t *i_stream_callback_get_buffer(struct istream *input)
{
	struct callback_istream *cstream =
		(struct callback_istream *)input->real_stream;

	return cstream->buf;
}

void i_stream_callback_set_error(struct istream *input, int stream_errno,
				 const char *error)
{
	input->stream_errno = stream_errno;
	io_stream_set_error(&input->real_stream->iostream, "%s", error);
}