changeset 10642:9f0e01905171 HEAD

zlib plugin: Added support for compressing Maildir mails while saving.
author Timo Sirainen <tss@iki.fi>
date Sat, 06 Feb 2010 01:07:44 +0200
parents 174275bcb1a5
children e0b6f739510d
files src/plugins/zlib/Makefile.am src/plugins/zlib/ostream-bzlib.c src/plugins/zlib/ostream-zlib.c src/plugins/zlib/ostream-zlib.h src/plugins/zlib/zlib-plugin.c
diffstat 5 files changed, 568 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/zlib/Makefile.am	Sat Feb 06 01:06:47 2010 +0200
+++ b/src/plugins/zlib/Makefile.am	Sat Feb 06 01:07:44 2010 +0200
@@ -23,8 +23,11 @@
 lib20_zlib_plugin_la_SOURCES = \
 	istream-bzlib.c \
 	istream-zlib.c \
+	ostream-zlib.c \
+	ostream-bzlib.c \
 	zlib-plugin.c
 
 noinst_HEADERS = \
 	istream-zlib.h \
+	ostream-zlib.h \
 	zlib-plugin.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/zlib/ostream-bzlib.c	Sat Feb 06 01:07:44 2010 +0200
@@ -0,0 +1,188 @@
+/* Copyright (c) 2010 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+
+#ifdef HAVE_BZLIB
+
+#include "ostream-internal.h"
+#include "ostream-zlib.h"
+#include <bzlib.h>
+
+#define CHUNK_SIZE (1024*64)
+
+struct bzlib_ostream {
+	struct ostream_private ostream;
+	bz_stream zs;
+
+	char outbuf[CHUNK_SIZE];
+	struct ostream *output;
+
+	unsigned int flushed:1;
+};
+
+static void zstream_copy_error(struct bzlib_ostream *zstream)
+{
+	struct ostream *src = zstream->output;
+	struct ostream *dest = &zstream->ostream.ostream;
+
+	dest->stream_errno = src->stream_errno;
+	dest->last_failed_errno = src->last_failed_errno;
+	dest->overflow = src->overflow;
+}
+
+static void o_stream_bzlib_close(struct iostream_private *stream)
+{
+	struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
+
+	if (zstream->output == NULL)
+		return;
+
+	o_stream_flush(&zstream->ostream.ostream);
+	o_stream_unref(&zstream->output);
+	(void)BZ2_bzCompressEnd(&zstream->zs);
+}
+
+static ssize_t
+o_stream_bzlib_send_chunk(struct bzlib_ostream *zstream,
+			  const void *data, size_t size)
+{
+	bz_stream *zs = &zstream->zs;
+	ssize_t ret;
+
+	zs->next_in = (void *)data;
+	zs->avail_in = size;
+	while (zs->avail_in > 0) {
+		if (zs->avail_out == 0) {
+			zs->next_out = zstream->outbuf;
+			zs->avail_out = sizeof(zstream->outbuf);
+
+			ret = o_stream_send(zstream->output, zstream->outbuf,
+					    sizeof(zstream->outbuf));
+			if (ret != (ssize_t)sizeof(zstream->outbuf)) {
+				zstream_copy_error(zstream);
+				return -1;
+			}
+		}
+
+		switch (BZ2_bzCompress(zs, BZ_RUN)) {
+		case BZ_RUN_OK:
+			break;
+		default:
+			i_unreached();
+		}
+	}
+	zstream->flushed = FALSE;
+	return 0;
+}
+
+static int o_stream_bzlib_send_flush(struct bzlib_ostream *zstream)
+{
+	bz_stream *zs = &zstream->zs;
+	unsigned int len;
+	bool done = FALSE;
+	int ret;
+
+	i_assert(zs->avail_in == 0);
+
+	if (zstream->flushed)
+		return 0;
+
+	do {
+		len = sizeof(zstream->outbuf) - zs->avail_out;
+		if (len != 0) {
+			zs->next_out = zstream->outbuf;
+			zs->avail_out = sizeof(zstream->outbuf);
+
+			ret = o_stream_send(zstream->output,
+					    zstream->outbuf, len);
+			if (ret != (int)len) {
+				zstream_copy_error(zstream);
+				return -1;
+			}
+			if (done)
+				break;
+		}
+
+		ret = BZ2_bzCompress(zs, BZ_FINISH);
+		switch (ret) {
+		case BZ_STREAM_END:
+			done = TRUE;
+			break;
+		case BZ_FINISH_OK:
+			break;
+		default:
+			i_unreached();
+		}
+	} while (zs->avail_out != sizeof(zstream->outbuf));
+
+	zstream->flushed = TRUE;
+	return 0;
+}
+
+static int o_stream_bzlib_flush(struct ostream_private *stream)
+{
+	struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
+
+	if (o_stream_bzlib_send_flush(zstream) < 0)
+		return -1;
+
+	if (o_stream_flush(zstream->output) < 0) {
+		zstream_copy_error(zstream);
+		return -1;
+	}
+	return 0;
+}
+
+static ssize_t
+o_stream_bzlib_sendv(struct ostream_private *stream,
+		    const struct const_iovec *iov, unsigned int iov_count)
+{
+	struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
+	ssize_t bytes = 0;
+	unsigned int i;
+
+	for (i = 0; i < iov_count; i++) {
+		if (o_stream_bzlib_send_chunk(zstream, iov[i].iov_base,
+					      iov[i].iov_len) < 0)
+			return -1;
+		bytes += iov[i].iov_len;
+	}
+
+	stream->ostream.offset += bytes;
+	return bytes;
+}
+
+struct ostream *o_stream_create_bz2(struct ostream *output, int level)
+{
+	struct bzlib_ostream *zstream;
+	int ret;
+
+	i_assert(level >= 1 && level <= 9);
+
+	zstream = i_new(struct bzlib_ostream, 1);
+	zstream->ostream.sendv = o_stream_bzlib_sendv;
+	zstream->ostream.flush = o_stream_bzlib_flush;
+	zstream->ostream.iostream.close = o_stream_bzlib_close;
+	zstream->output = output;
+	o_stream_ref(output);
+
+	ret = BZ2_bzCompressInit(&zstream->zs, level, 0, 0);
+	switch (ret) {
+	case BZ_OK:
+		break;
+	case BZ_MEM_ERROR:
+		i_fatal_status(FATAL_OUTOFMEM,
+			       "bzlib: Out of memory");
+	case BZ_CONFIG_ERROR:
+		i_fatal("Wrong bzlib library version (broken compilation)");
+	case BZ_PARAM_ERROR:
+		i_fatal("bzlib: Invalid parameters");
+	default:
+		i_fatal("BZ2_bzCompressInit() failed with %d", ret);
+	}
+
+	zstream->zs.next_out = zstream->outbuf;
+	zstream->zs.avail_out = sizeof(zstream->outbuf);
+	return o_stream_create(&zstream->ostream);
+}
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/zlib/ostream-zlib.c	Sat Feb 06 01:07:44 2010 +0200
@@ -0,0 +1,278 @@
+/* Copyright (c) 2010 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+
+#ifdef HAVE_ZLIB
+
+#include "crc32.h"
+#include "ostream-internal.h"
+#include "ostream-zlib.h"
+#include <zlib.h>
+
+#define CHUNK_SIZE (1024*32)
+#define ZLIB_OS_CODE 0x03  /* Unix */
+
+struct zlib_ostream {
+	struct ostream_private ostream;
+	z_stream zs;
+
+	unsigned char gz_header[10];
+	unsigned char outbuf[CHUNK_SIZE];
+
+	struct ostream *output;
+	uint32_t crc, bytes32;
+
+	unsigned int gz:1;
+	unsigned int header_sent:1;
+	unsigned int flushed:1;
+};
+
+static void zstream_copy_error(struct zlib_ostream *zstream)
+{
+	struct ostream *src = zstream->output;
+	struct ostream *dest = &zstream->ostream.ostream;
+
+	dest->stream_errno = src->stream_errno;
+	dest->last_failed_errno = src->last_failed_errno;
+	dest->overflow = src->overflow;
+}
+
+static void o_stream_zlib_close(struct iostream_private *stream)
+{
+	struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
+
+	if (zstream->output == NULL)
+		return;
+
+	o_stream_flush(&zstream->ostream.ostream);
+	o_stream_unref(&zstream->output);
+	(void)deflateEnd(&zstream->zs);
+}
+
+static int o_stream_zlib_send_gz_header(struct zlib_ostream *zstream)
+{
+	ssize_t ret;
+
+	ret = o_stream_send(zstream->output, zstream->gz_header,
+			    sizeof(zstream->gz_header));
+	if ((size_t)ret != sizeof(zstream->gz_header)) {
+		zstream_copy_error(zstream);
+		return -1;
+	}
+	zstream->header_sent = TRUE;
+	return 0;
+}
+
+static int o_stream_zlib_lsb_uint32(struct ostream *output, uint32_t num)
+{
+	unsigned char buf[sizeof(uint32_t)];
+	unsigned int i;
+
+	for (i = 0; i < sizeof(buf); i++) {
+		buf[i] = num & 0xff;
+		num >>= 8;
+	}
+	if (o_stream_send(output, buf, sizeof(buf)) != sizeof(buf))
+		return -1;
+	return 0;
+}
+
+static int o_stream_zlib_send_gz_trailer(struct zlib_ostream *zstream)
+{
+	if (!zstream->gz)
+		return 0;
+
+	if (o_stream_zlib_lsb_uint32(zstream->output, zstream->crc) < 0 ||
+	    o_stream_zlib_lsb_uint32(zstream->output, zstream->bytes32) < 0) {
+		zstream_copy_error(zstream);
+		return -1;
+	}
+	return 0;
+}
+
+static int
+o_stream_zlib_send_chunk(struct zlib_ostream *zstream,
+			 const void *data, size_t size)
+{
+	z_stream *zs = &zstream->zs;
+	ssize_t ret;
+
+	if (!zstream->header_sent)
+		o_stream_zlib_send_gz_header(zstream);
+
+	zs->next_in = (void *)data;
+	zs->avail_in = size;
+	while (zs->avail_in > 0) {
+		if (zs->avail_out == 0) {
+			zs->next_out = zstream->outbuf;
+			zs->avail_out = sizeof(zstream->outbuf);
+
+			ret = o_stream_send(zstream->output, zstream->outbuf,
+					    sizeof(zstream->outbuf));
+			if (ret != (ssize_t)sizeof(zstream->outbuf)) {
+				zstream_copy_error(zstream);
+				return -1;
+			}
+		}
+
+		switch (deflate(zs, Z_NO_FLUSH)) {
+		case Z_OK:
+		case Z_BUF_ERROR:
+			break;
+		default:
+			i_unreached();
+		}
+	}
+	zstream->crc = crc32_data_more(zstream->crc, data, size);
+	zstream->bytes32 += size;
+	zstream->flushed = FALSE;
+	return 0;
+}
+
+static int o_stream_zlib_send_flush(struct zlib_ostream *zstream)
+{
+	z_stream *zs = &zstream->zs;
+	unsigned int len;
+	bool done = FALSE;
+	int ret;
+
+	i_assert(zs->avail_in == 0);
+
+	if (zstream->flushed)
+		return 0;
+	if (!zstream->header_sent)
+		o_stream_zlib_send_gz_header(zstream);
+
+	do {
+		len = sizeof(zstream->outbuf) - zs->avail_out;
+		if (len != 0) {
+			zs->next_out = zstream->outbuf;
+			zs->avail_out = sizeof(zstream->outbuf);
+
+			ret = o_stream_send(zstream->output,
+					    zstream->outbuf, len);
+			if (ret != (int)len) {
+				zstream_copy_error(zstream);
+				return -1;
+			}
+			if (done)
+				break;
+		}
+
+		switch (deflate(zs, zstream->gz ? Z_FINISH : Z_SYNC_FLUSH)) {
+		case Z_OK:
+		case Z_BUF_ERROR:
+			break;
+		case Z_STREAM_END:
+			done = TRUE;
+			break;
+		default:
+			i_unreached();
+		}
+	} while (zs->avail_out != sizeof(zstream->outbuf));
+
+	if (o_stream_zlib_send_gz_trailer(zstream) < 0)
+		return -1;
+	zstream->flushed = TRUE;
+	return 0;
+}
+
+static int o_stream_zlib_flush(struct ostream_private *stream)
+{
+	struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
+
+	if (o_stream_zlib_send_flush(zstream) < 0)
+		return -1;
+
+	if (o_stream_flush(zstream->output) < 0) {
+		zstream_copy_error(zstream);
+		return -1;
+	}
+	return 0;
+}
+
+static ssize_t
+o_stream_zlib_sendv(struct ostream_private *stream,
+		    const struct const_iovec *iov, unsigned int iov_count)
+{
+	struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
+	ssize_t bytes = 0;
+	unsigned int i;
+
+	for (i = 0; i < iov_count; i++) {
+		if (o_stream_zlib_send_chunk(zstream, iov[i].iov_base,
+					     iov[i].iov_len) < 0)
+			return -1;
+		bytes += iov[i].iov_len;
+	}
+
+	stream->ostream.offset += bytes;
+	return bytes;
+}
+
+static void o_stream_zlib_init_gz_header(struct zlib_ostream *zstream,
+					 int level, int strategy)
+{
+	unsigned char *hdr = zstream->gz_header;
+
+	hdr[0] = 0x1f;
+	hdr[1] = 0x8b;
+	hdr[2] = Z_DEFLATED;
+	hdr[8] = level == 9 ? 2 :
+		(strategy >= Z_HUFFMAN_ONLY ||
+		 (level != Z_DEFAULT_COMPRESSION && level < 2) ? 4 : 0);
+	hdr[9] = ZLIB_OS_CODE;
+	i_assert(sizeof(zstream->gz_header) == 10);
+}
+
+static struct ostream *
+o_stream_create_zlib(struct ostream *output, int level, bool gz)
+{
+	const int strategy = Z_DEFAULT_STRATEGY;
+	struct zlib_ostream *zstream;
+	int ret;
+
+	i_assert(level >= 1 && level <= 9);
+
+	zstream = i_new(struct zlib_ostream, 1);
+	zstream->ostream.sendv = o_stream_zlib_sendv;
+	zstream->ostream.flush = o_stream_zlib_flush;
+	zstream->ostream.iostream.close = o_stream_zlib_close;
+	zstream->output = output;
+	zstream->crc = 0;
+	zstream->gz = gz;
+	if (!gz)
+		zstream->header_sent = TRUE;
+	o_stream_ref(output);
+
+	o_stream_zlib_init_gz_header(zstream, level, strategy);
+	ret = deflateInit2(&zstream->zs, level, Z_DEFLATED,
+			   gz ? -15 : 15, 8, strategy);
+	switch (ret) {
+	case Z_OK:
+		break;
+	case Z_MEM_ERROR:
+		i_fatal_status(FATAL_OUTOFMEM, "deflateInit(): Out of memory");
+	case Z_VERSION_ERROR:
+		i_fatal("Wrong zlib library version (broken compilation)");
+	case Z_STREAM_ERROR:
+		i_fatal("Invalid compression level %d", level);
+	default:
+		i_fatal("deflateInit() failed with %d", ret);
+	}
+
+	zstream->zs.next_out = zstream->outbuf;
+	zstream->zs.avail_out = sizeof(zstream->outbuf);
+	return o_stream_create(&zstream->ostream);
+}
+
+struct ostream *o_stream_create_gz(struct ostream *output, int level)
+{
+	return o_stream_create_zlib(output, level, TRUE);
+}
+
+struct ostream *o_stream_create_deflate(struct ostream *output, int level)
+{
+	return o_stream_create_zlib(output, level, FALSE);
+}
+#endif
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/zlib/ostream-zlib.h	Sat Feb 06 01:07:44 2010 +0200
@@ -0,0 +1,8 @@
+#ifndef OSTREAM_ZLIB_H
+#define OSTREAM_ZLIB_H
+
+struct ostream *o_stream_create_gz(struct ostream *output, int level);
+struct ostream *o_stream_create_deflate(struct ostream *output, int level);
+struct ostream *o_stream_create_bz2(struct ostream *output, int level);
+
+#endif
--- a/src/plugins/zlib/zlib-plugin.c	Sat Feb 06 01:06:47 2010 +0200
+++ b/src/plugins/zlib/zlib-plugin.c	Sat Feb 06 01:07:44 2010 +0200
@@ -2,31 +2,42 @@
 
 #include "lib.h"
 #include "array.h"
-#include "istream-zlib.h"
 #include "istream.h"
+#include "ostream.h"
 #include "maildir/maildir-storage.h"
 #include "maildir/maildir-uidlist.h"
 #include "index-mail.h"
+#include "istream-zlib.h"
+#include "ostream-zlib.h"
 #include "zlib-plugin.h"
 
+#include <stdlib.h>
 #include <fcntl.h>
 
+#define ZLIB_PLUGIN_DEFAULT_LEVEL 6
+
 #define ZLIB_CONTEXT(obj) \
 	MODULE_CONTEXT(obj, zlib_storage_module)
 #define ZLIB_MAIL_CONTEXT(obj) \
 	MODULE_CONTEXT(obj, zlib_mail_module)
+#define ZLIB_USER_CONTEXT(obj) \
+	MODULE_CONTEXT(obj, zlib_user_module)
 
 #ifndef HAVE_ZLIB
 #  define i_stream_create_zlib NULL
+#  define o_stream_create_zlib NULL
 #endif
 #ifndef HAVE_BZLIB
 #  define i_stream_create_bzlib NULL
+#  define o_stream_create_bzlib NULL
 #endif
 
 struct zlib_handler {
+	const char *name;
 	const char *ext;
 	bool (*is_compressed)(struct istream *input);
 	struct istream *(*create_istream)(int fd);
+	struct ostream *(*create_ostream)(struct ostream *output, int level);
 };
 
 struct zlib_transaction_context {
@@ -35,8 +46,17 @@
 	struct mail *tmp_mail;
 };
 
+struct zlib_user {
+	union mail_user_module_context module_ctx;
+
+	struct zlib_handler *save_handler;
+	int save_level;
+};
+
 const char *zlib_plugin_version = PACKAGE_VERSION;
 
+static MODULE_CONTEXT_DEFINE_INIT(zlib_user_module,
+				  &mail_user_module_register);
 static MODULE_CONTEXT_DEFINE_INIT(zlib_storage_module,
 				  &mail_storage_module_register);
 static MODULE_CONTEXT_DEFINE_INIT(zlib_mail_module, &mail_module_register);
@@ -74,10 +94,23 @@
 }
 
 static struct zlib_handler zlib_handlers[] = {
-	{ ".gz", is_compressed_zlib, i_stream_create_zlib },
-	{ ".bz2", is_compressed_bzlib, i_stream_create_bzlib }
+	{ "gz", ".gz", is_compressed_zlib,
+	  i_stream_create_zlib, o_stream_create_gz },
+	{ "bz2", ".bz2", is_compressed_bzlib,
+	  i_stream_create_bzlib, o_stream_create_bz2 }
 };
 
+static struct zlib_handler *zlib_find_zlib_handler(const char *name)
+{
+	unsigned int i;
+
+	for (i = 0; i < N_ELEMENTS(zlib_handlers); i++) {
+		if (strcmp(name, zlib_handlers[i].name) == 0)
+			return &zlib_handlers[i];
+	}
+	return NULL;
+}
+
 static struct zlib_handler *zlib_get_zlib_handler(struct istream *input)
 {
 	unsigned int i;
@@ -257,8 +290,29 @@
 	return 0;
 }
 
+static int
+zlib_mail_save_compress_begin(struct mail_save_context *ctx,
+			      struct istream *input)
+{
+	struct mailbox *box = ctx->transaction->box;
+	struct zlib_user *zuser = ZLIB_USER_CONTEXT(box->storage->user);
+	union mailbox_module_context *zbox = ZLIB_CONTEXT(box);
+	struct ostream *output;
+
+	if (zbox->super.save_begin(ctx, input) < 0)
+		return -1;
+
+	output = zuser->save_handler->create_ostream(ctx->output,
+						     zuser->save_level);
+	o_stream_unref(&ctx->output);
+	ctx->output = output;
+	o_stream_cork(ctx->output);
+	return 0;
+}
+
 static void zlib_maildir_open_init(struct mailbox *box)
 {
+	struct zlib_user *zuser = ZLIB_USER_CONTEXT(box->storage->user);
 	union mailbox_module_context *zbox;
 
 	zbox = p_new(box->pool, union mailbox_module_context, 1);
@@ -267,8 +321,12 @@
 	box->v.transaction_begin = zlib_mailbox_transaction_begin;
 	box->v.transaction_rollback = zlib_mailbox_transaction_rollback;
 	box->v.transaction_commit = zlib_mailbox_transaction_commit;
-	box->v.save_begin = zlib_mail_save_begin;
-	box->v.save_finish = zlib_mail_save_finish;
+	if (zuser->save_handler == NULL) {
+		box->v.save_begin = zlib_mail_save_begin;
+		box->v.save_finish = zlib_mail_save_finish;
+	} else {
+		box->v.save_begin = zlib_mail_save_compress_begin;
+	}
 
 	MODULE_CONTEXT_SET_SELF(box, zlib_storage_module, zbox);
 }
@@ -332,7 +390,35 @@
 	MODULE_CONTEXT_SET_SELF(storage, zlib_storage_module, qstorage);
 }
 
+static void zlib_mail_user_created(struct mail_user *user)
+{
+	struct zlib_user *zuser;
+	const char *name;
+
+	zuser = p_new(user->pool, struct zlib_user, 1);
+	zuser->module_ctx.super = user->v;
+
+	name = mail_user_plugin_getenv(user, "zlib_save");
+	if (name != NULL && *name != '\0') {
+		zuser->save_handler = zlib_find_zlib_handler(name);
+		if (zuser->save_handler == NULL)
+			i_error("zlib_save: Unknown handler: %s", name);
+	}
+	name = mail_user_plugin_getenv(user, "zlib_save_level");
+	if (name != NULL) {
+		zuser->save_level = atoi(name);
+		if (zuser->save_level < 1 || zuser->save_level > 9) {
+			i_error("zlib_save_level: Level must be between 1..9");
+			zuser->save_level = 0;
+		}
+	}
+	if (zuser->save_level == 0)
+		zuser->save_level = ZLIB_PLUGIN_DEFAULT_LEVEL;
+	MODULE_CONTEXT_SET(user, zlib_user_module, zuser);
+}
+
 static struct mail_storage_hooks zlib_mail_storage_hooks = {
+	.mail_user_created = zlib_mail_user_created,
 	.mail_storage_created = zlib_mail_storage_created
 };