view src/lib-compression/istream-lz4.c @ 22616:629f44740f50

cassandra: Include "prepared" when logging about prepared statement queries Mainly useful for debugging/testing.
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Mon, 16 Oct 2017 15:12:12 +0300
parents bf1220873408
children cb108f786fb4
line wrap: on
line source

/* Copyright (c) 2013-2017 Dovecot authors, see the included COPYING file */

#include "lib.h"

#ifdef HAVE_LZ4

#include "buffer.h"
#include "istream-private.h"
#include "istream-zlib.h"
#include "iostream-lz4.h"
#include <lz4.h>

/* State of one lz4-decompressing input stream. */
struct lz4_istream {
	/* base stream; kept as the first member because the handlers cast
	   the base-stream pointers they receive back to this struct */
	struct istream_private istream;

	/* decompressed size of the whole stream, (uoff_t)-1 until the end
	   of the parent stream has been reached once */
	uoff_t stream_size;
	/* parent stat result remembered by sync() for change detection */
	struct stat last_parent_statbuf;

	/* compressed bytes of the chunk currently being collected */
	buffer_t *chunk_buf;
	/* compressed size of the current chunk, from its length prefix */
	uint32_t chunk_size;
	/* compressed bytes of the current chunk not yet read from parent */
	uint32_t chunk_left;
	/* largest decompressed chunk size, as declared by the file header */
	uint32_t max_uncompressed_chunk_size;

	/* also log read errors with i_error() */
	unsigned int log_errors:1;
	/* set by seek() when it is called with mark=TRUE */
	unsigned int marked:1;
	/* file header has been parsed since the last reset */
	unsigned int header_read:1;
};

/* iostream close handler: releases the compressed-chunk buffer and, when
   close_parent is set, closes the parent stream too. */
static void i_stream_lz4_close(struct iostream_private *stream,
			       bool close_parent)
{
	struct lz4_istream *lz4 = (struct lz4_istream *)stream;

	if (lz4->chunk_buf != NULL)
		buffer_free(&lz4->chunk_buf);
	if (!close_parent)
		return;
	i_stream_close(lz4->istream.parent);
}

static void lz4_read_error(struct lz4_istream *zstream, const char *error)
{
	io_stream_set_error(&zstream->istream.iostream,
			    "lz4.read(%s): %s at %"PRIuUOFF_T,
			    i_stream_get_name(&zstream->istream.istream), error,
			    zstream->istream.abs_start_offset +
			    zstream->istream.istream.v_offset);
	if (zstream->log_errors)
		i_error("%s", zstream->istream.iostream.error);
}

static int i_stream_lz4_read_header(struct lz4_istream *zstream)
{
	const struct iostream_lz4_header *hdr;
	const unsigned char *data;
	size_t size;
	int ret;

	ret = i_stream_read_data(zstream->istream.parent, &data, &size,
				 sizeof(*hdr)-1);
	if (ret < 0) {
		zstream->istream.istream.stream_errno =
			zstream->istream.parent->stream_errno;
		return ret;
	}
	if (ret == 0 && !zstream->istream.istream.eof)
		return 0;
	hdr = (const void *)data;
	if (ret == 0 || memcmp(hdr->magic, IOSTREAM_LZ4_MAGIC,
			       IOSTREAM_LZ4_MAGIC_LEN) != 0) {
		lz4_read_error(zstream, "wrong magic in header (not lz4 file?)");
		zstream->istream.istream.stream_errno = EINVAL;
		return -1;
	}
	zstream->max_uncompressed_chunk_size =
		be32_to_cpu_unaligned(hdr->max_uncompressed_chunk_size);
	if (zstream->max_uncompressed_chunk_size > ISTREAM_LZ4_CHUNK_SIZE) {
		lz4_read_error(zstream, t_strdup_printf(
			"lz4 max chunk size too large (%u > %u)",
			zstream->max_uncompressed_chunk_size,
			ISTREAM_LZ4_CHUNK_SIZE));
		zstream->istream.istream.stream_errno = EINVAL;
		return -1;
	}
	i_stream_skip(zstream->istream.parent, sizeof(*hdr));
	return 1;
}

/* Read handler: decompress the next lz4 chunk and append it to the stream
   buffer.

   Each chunk in the parent stream is a big-endian 32bit compressed length
   followed by that many bytes of lz4 data. The whole compressed chunk is
   collected into chunk_buf (possibly over several calls) before it is
   decompressed in one go.

   Returns the number of decompressed bytes added (>0), 0 when the parent
   has no more data available right now, -2 when the buffer already holds
   max_buffer_size bytes, and -1 at EOF (eof set) or on error (stream_errno
   set). */
static ssize_t i_stream_lz4_read(struct istream_private *stream)
{
	struct lz4_istream *zstream = (struct lz4_istream *)stream;
	const unsigned char *data;
	size_t size, max_size;
	int ret;

	if (!zstream->header_read) {
		if ((ret = i_stream_lz4_read_header(zstream)) <= 0)
			return ret;
		zstream->header_read = TRUE;
	}

	/* if we already have max_buffer_size amount of data, fail here.
	   this has to be checked before anything is consumed from the
	   parent: once a chunk has been fully collected chunk_left is 0, so
	   bailing out after that point would make the next call start a new
	   chunk and throw away the one still waiting in chunk_buf. */
	i_stream_compress(stream);
	if (stream->pos >= i_stream_get_max_buffer_size(&stream->istream))
		return -2;

	if (zstream->chunk_left == 0) {
		/* start of a new chunk: read its length prefix */
		ret = i_stream_read_data(stream->parent, &data, &size,
					 IOSTREAM_LZ4_CHUNK_PREFIX_LEN);
		if (ret < 0) {
			stream->istream.stream_errno =
				stream->parent->stream_errno;
			if (stream->istream.stream_errno == 0) {
				/* parent ended at a chunk boundary: this is
				   our EOF and the total size is now known */
				stream->istream.eof = TRUE;
				zstream->stream_size = stream->istream.v_offset +
					stream->pos - stream->skip;
			}
			return ret;
		}
		if (ret == 0 && !stream->istream.eof)
			return 0;
		zstream->chunk_size = zstream->chunk_left =
			be32_to_cpu_unaligned(data);
		if (zstream->chunk_size == 0 ||
		    zstream->chunk_size > ISTREAM_LZ4_CHUNK_SIZE) {
			lz4_read_error(zstream, t_strdup_printf(
				"invalid lz4 chunk size: %u", zstream->chunk_size));
			stream->istream.stream_errno = EINVAL;
			return -1;
		}
		i_stream_skip(stream->parent, IOSTREAM_LZ4_CHUNK_PREFIX_LEN);
		buffer_set_used_size(zstream->chunk_buf, 0);
	}

	/* read the whole compressed chunk into memory. chunk_left is always
	   non-zero when we get here, so ret is assigned by the first
	   iteration's read. */
	while (zstream->chunk_left > 0 &&
	       (ret = i_stream_read_data(zstream->istream.parent,
					 &data, &size, 0)) > 0) {
		if (size > zstream->chunk_left)
			size = zstream->chunk_left;
		buffer_append(zstream->chunk_buf, data, size);
		i_stream_skip(zstream->istream.parent, size);
		zstream->chunk_left -= size;
	}
	if (zstream->chunk_left > 0) {
		if (ret == -1 && zstream->istream.parent->stream_errno == 0) {
			/* parent hit EOF in the middle of a chunk */
			lz4_read_error(zstream, "truncated lz4 chunk");
			stream->istream.stream_errno = EINVAL;
			return -1;
		}
		zstream->istream.istream.stream_errno =
			zstream->istream.parent->stream_errno;
		return ret;
	}

	/* allocate enough space for the old data and the new
	   decompressed chunk. we don't know the chunk's uncompressed size,
	   so just allocate the max amount of memory. */
	max_size = stream->pos + zstream->max_uncompressed_chunk_size;
	if (stream->buffer_size < max_size) {
		stream->w_buffer = i_realloc(stream->w_buffer,
					     stream->buffer_size, max_size);
		stream->buffer_size = max_size;
		stream->buffer = stream->w_buffer;
	}
	/* limit the output to what the header declared, not to whatever
	   happens to be free in the buffer. otherwise a chunk that expands
	   beyond the declared maximum would trip the assert below instead of
	   being rejected as corrupted. */
	ret = LZ4_decompress_safe(zstream->chunk_buf->data,
				  (void *)(stream->w_buffer + stream->pos),
				  zstream->chunk_buf->used,
				  (int)zstream->max_uncompressed_chunk_size);
	if (ret <= 0) {
		/* <0 is a decoding error. 0 means the chunk decoded to
		   nothing; the input is untrusted, so report that as
		   corruption rather than asserting on it. */
		lz4_read_error(zstream, "corrupted lz4 chunk");
		stream->istream.stream_errno = EINVAL;
		return -1;
	}
	i_assert(ret <= (int)zstream->max_uncompressed_chunk_size);
	stream->pos += ret;
	i_assert(stream->pos <= stream->buffer_size);
	return ret;
}

/* Rewind to the beginning of the compressed data: seek the parent back to
   its start offset and forget all decoding state and buffered output, so
   that the next read starts again from the file header. */
static void i_stream_lz4_reset(struct lz4_istream *zstream)
{
	struct istream_private *priv = &zstream->istream;

	i_stream_seek(priv->parent, priv->parent_start_offset);
	priv->parent_expected_offset = priv->parent_start_offset;

	/* nothing decoded is buffered anymore */
	priv->istream.v_offset = 0;
	priv->pos = 0;
	priv->skip = 0;

	/* the header and the first chunk prefix must be read again */
	zstream->chunk_left = 0;
	zstream->chunk_size = 0;
	zstream->header_read = FALSE;
}

/* Seek handler. The compressed data can only be decoded front to back, so:
   - a target before the buffered data restarts decoding from the beginning,
   - a target inside the buffered data only moves the read position,
   - a target after the buffered data is reached by reading and discarding.
   If the target can't be reached because of a read error, the error is
   logged and the stream is closed. Hitting EOF before the target is
   tolerated. */
static void
i_stream_lz4_seek(struct istream_private *stream, uoff_t v_offset, bool mark)
{
	struct lz4_istream *zstream = (struct lz4_istream *) stream;
	/* virtual offset of the first byte held in the buffer */
	uoff_t start_offset = stream->istream.v_offset - stream->skip;

	if (v_offset < start_offset) {
		/* have to seek backwards */
		i_stream_lz4_reset(zstream);
		start_offset = 0;
	}

	if (v_offset <= start_offset + stream->pos) {
		/* seeking within what's already cached. only the read
		   position moves: pos must stay where it is, because the
		   parent has already been read past the cached bytes and
		   dropping them would make the next read return later data
		   at this offset. */
		stream->skip = v_offset - start_offset;
		stream->istream.v_offset = v_offset;
	} else {
		/* read and cache forward */
		ssize_t ret;

		do {
			size_t avail = stream->pos - stream->skip;

			if (stream->istream.v_offset + avail >= v_offset) {
				/* target is within the buffered data */
				i_stream_skip(&stream->istream,
					      v_offset -
					      stream->istream.v_offset);
				ret = -1;
				break;
			}

			i_stream_skip(&stream->istream, avail);
		} while ((ret = i_stream_read(&stream->istream)) > 0);
		i_assert(ret == -1);

		if (stream->istream.v_offset != v_offset) {
			/* some failure, we've broken it */
			if (stream->istream.stream_errno != 0) {
				i_error("lz4_istream.seek(%s) failed: %s",
					i_stream_get_name(&stream->istream),
					strerror(stream->istream.stream_errno));
				i_stream_close(&stream->istream);
			} else {
				/* unexpected EOF. allow it since we may just
				   want to check if there's anything.. */
				i_assert(stream->istream.eof);
			}
		}
	}

	if (mark)
		zstream->marked = TRUE;
}

static int
i_stream_lz4_stat(struct istream_private *stream, bool exact)
{
	struct lz4_istream *zstream = (struct lz4_istream *) stream;
	const struct stat *st;
	size_t size;

	if (i_stream_stat(stream->parent, exact, &st) < 0) {
		stream->istream.stream_errno = stream->parent->stream_errno;
		return -1;
	}
	stream->statbuf = *st;

	/* when exact=FALSE always return the parent stat's size, even if we
	   know the exact value. this is necessary because otherwise e.g. mbox
	   code can see two different values and think that a compressed mbox
	   file keeps changing. */
	if (!exact)
		return 0;

	if (zstream->stream_size == (uoff_t)-1) {
		uoff_t old_offset = stream->istream.v_offset;
		ssize_t ret;

		do {
			size = i_stream_get_data_size(&stream->istream);
			i_stream_skip(&stream->istream, size);
		} while ((ret = i_stream_read(&stream->istream)) > 0);
		i_assert(ret == -1);

		i_stream_seek(&stream->istream, old_offset);
		if (zstream->stream_size == (uoff_t)-1)
			return -1;
	}
	stream->statbuf.st_size = zstream->stream_size;
	return 0;
}

/* Sync handler: drop the decoded state so the data is read again from the
   parent, unless the parent's stat result is identical to the one seen on
   the previous sync. */
static void i_stream_lz4_sync(struct istream_private *stream)
{
	struct lz4_istream *zstream = (struct lz4_istream *) stream;
	const struct stat *st;

	/* st is only usable when the stat succeeded. on failure fall through
	   and reset, since we can't tell whether the parent changed. */
	if (i_stream_stat(stream->parent, FALSE, &st) == 0) {
		if (memcmp(&zstream->last_parent_statbuf,
			   st, sizeof(*st)) == 0) {
			/* a compressed file doesn't change unexpectedly,
			   don't clear our caches unnecessarily */
			return;
		}
		zstream->last_parent_statbuf = *st;
	}
	i_stream_lz4_reset(zstream);
}

struct istream *i_stream_create_lz4(struct istream *input, bool log_errors)
{
	struct lz4_istream *zstream;

	zstream = i_new(struct lz4_istream, 1);
	zstream->stream_size = (uoff_t)-1;
	zstream->log_errors = log_errors;

	zstream->istream.iostream.close = i_stream_lz4_close;
	zstream->istream.max_buffer_size = input->real_stream->max_buffer_size;
	zstream->istream.read = i_stream_lz4_read;
	zstream->istream.seek = i_stream_lz4_seek;
	zstream->istream.stat = i_stream_lz4_stat;
	zstream->istream.sync = i_stream_lz4_sync;

	zstream->istream.istream.readable_fd = FALSE;
	zstream->istream.istream.blocking = input->blocking;
	zstream->istream.istream.seekable = input->seekable;
	zstream->chunk_buf = buffer_create_dynamic(default_pool, 1024);

	return i_stream_create(&zstream->istream, input,
			       i_stream_get_fd(input));
}
#endif