changeset 18543:4c8fc477d68a

lib-fs: Added fs-compress wrapper. Future TODO could include automatically detecting the format of the input file, but this should be optional.
author Timo Sirainen <tss@iki.fi>
date Thu, 07 May 2015 20:25:44 +0300
parents c45fe041ccfb
children 7516211ff7e6
files src/lib-fs/Makefile.am src/lib-fs/fs-api-private.h src/lib-fs/fs-api.c src/lib-fs/fs-compress.c
diffstat 4 files changed, 428 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-fs/Makefile.am	Thu May 07 18:21:35 2015 +0300
+++ b/src/lib-fs/Makefile.am	Thu May 07 20:25:44 2015 +0300
@@ -1,7 +1,12 @@
 noinst_LTLIBRARIES = libfs.la
 
+fs_moduledir = $(moduledir)
+fs_module_LTLIBRARIES = \
+	libfs_compress.la
+
 AM_CPPFLAGS = \
 	-I$(top_srcdir)/src/lib \
+	-I$(top_srcdir)/src/lib-compression \
 	-I$(top_srcdir)/src/lib-ssl-iostream \
 	-DMODULE_DIR=\""$(moduledir)"\"
 
@@ -17,6 +22,11 @@
 	ostream-metawrap.c \
 	ostream-cmp.c
 
+libfs_compress_la_SOURCES = fs-compress.c
+libfs_compress_la_LIBADD = ../lib-compression/libdovecot-compression.la
+libfs_compress_la_DEPS = ../lib-compression/libdovecot-compression.la
+libfs_compress_la_LDFLAGS = -module -avoid-version
+
 headers = \
 	fs-api.h \
 	fs-api-private.h \
--- a/src/lib-fs/fs-api-private.h	Thu May 07 18:21:35 2015 +0300
+++ b/src/lib-fs/fs-api-private.h	Thu May 07 20:25:44 2015 +0300
@@ -134,6 +134,7 @@
 	void *async_context;
 };
 
+extern const struct fs fs_class_compress;
 extern const struct fs fs_class_posix;
 extern const struct fs fs_class_metawrap;
 extern const struct fs fs_class_sis;
--- a/src/lib-fs/fs-api.c	Thu May 07 18:21:35 2015 +0300
+++ b/src/lib-fs/fs-api.c	Thu May 07 20:25:44 2015 +0300
@@ -63,6 +63,7 @@
 static void fs_classes_init(void)
 {
 	i_array_init(&fs_classes, 8);
+	fs_class_register(&fs_class_compress);
 	fs_class_register(&fs_class_posix);
 	fs_class_register(&fs_class_metawrap);
 	fs_class_register(&fs_class_sis);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib-fs/fs-compress.c	Thu May 07 20:25:44 2015 +0300
@@ -0,0 +1,416 @@
+/* Copyright (c) 2015 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "istream.h"
+#include "ostream.h"
+#include "iostream-temp.h"
+#include "compression.h"
+#include "fs-api-private.h"
+
+struct compress_fs {
+	struct fs fs;
+	const struct compression_handler *handler;
+	unsigned int compress_level;
+};
+
+struct compress_fs_file {
+	struct fs_file file;
+	struct compress_fs *fs;
+	struct fs_file *super, *super_read;
+	enum fs_open_mode open_mode;
+	struct istream *input;
+
+	struct ostream *super_output;
+	struct ostream *temp_output;
+};
+
+struct compress_fs_iter {
+	struct fs_iter iter;
+	struct fs_iter *super;
+};
+
+static struct fs *fs_compress_alloc(void)
+{
+	struct compress_fs *fs;
+
+	fs = i_new(struct compress_fs, 1);
+	fs->fs = fs_class_compress;
+	return &fs->fs;
+}
+
+static int
+fs_compress_init(struct fs *_fs, const char *args, const
+		 struct fs_settings *set)
+{
+	struct compress_fs *fs = (struct compress_fs *)_fs;
+	const char *p, *compression_name, *level_str, *error;
+	const char *parent_name, *parent_args;
+
+	/* get compression handler name */
+	p = strchr(args, ':');
+	if (p == NULL) {
+		fs_set_error(_fs, "Compression method not given as parameter");
+		return -1;
+	}
+	compression_name = t_strdup_until(args, p++);
+	args = p;
+
+	/* get compression level */
+	p = strchr(args, ':');
+	if (p == NULL || p[1] == '\0') {
+		fs_set_error(_fs, "Parent filesystem not given as parameter");
+		return -1;
+	}
+
+	level_str = t_strdup_until(args, p++);
+	if (str_to_uint(level_str, &fs->compress_level) < 0 ||
+	    fs->compress_level < 1 || fs->compress_level > 9) {
+		fs_set_error(_fs, "Invalid compression level parameter '%s'", level_str);
+		return -1;
+	}
+	args = p;
+
+	fs->handler = compression_lookup_handler(compression_name);
+	if (fs->handler == NULL) {
+		fs_set_error(_fs, "Compression method '%s' not support", compression_name);
+		return -1;
+	}
+
+	parent_args = strchr(args, ':');
+	if (parent_args == NULL) {
+		parent_name = args;
+		parent_args = "";
+	} else {
+		parent_name = t_strdup_until(args, parent_args);
+		parent_args++;
+	}
+	if (fs_init(parent_name, parent_args, set, &_fs->parent, &error) < 0) {
+		fs_set_error(_fs, "%s: %s", parent_name, error);
+		return -1;
+	}
+	return 0;
+}
+
+static void fs_compress_deinit(struct fs *_fs)
+{
+	struct compress_fs *fs = (struct compress_fs *)_fs;
+
+	if (_fs->parent != NULL)
+		fs_deinit(&_fs->parent);
+	i_free(fs);
+}
+
+static enum fs_properties fs_compress_get_properties(struct fs *_fs)
+{
+	return fs_get_properties(_fs->parent);
+}
+
+static struct fs_file *
+fs_compress_file_init(struct fs *_fs, const char *path,
+		      enum fs_open_mode mode, enum fs_open_flags flags)
+{
+	struct compress_fs *fs = (struct compress_fs *)_fs;
+	struct compress_fs_file *file;
+
+	file = i_new(struct compress_fs_file, 1);
+	file->file.fs = _fs;
+	file->file.path = i_strdup(path);
+	file->fs = fs;
+	file->open_mode = mode;
+
+	/* avoid unnecessarily creating two seekable streams */
+	flags &= ~FS_OPEN_FLAG_SEEKABLE;
+
+	file->super = fs_file_init(_fs->parent, path, mode | flags);
+	if (mode == FS_OPEN_MODE_READONLY &&
+	    (flags & FS_OPEN_FLAG_ASYNC) == 0) {
+		/* use async stream for super, so fs_read_stream() won't create
+		   another seekable stream unneededly */
+		file->super_read = fs_file_init(_fs->parent, path, mode | flags |
+						FS_OPEN_FLAG_ASYNC);
+	} else {
+		file->super_read = file->super;
+	}
+	return &file->file;
+}
+
+static void fs_compress_file_deinit(struct fs_file *_file)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	if (file->super_read != file->super && file->super_read != NULL)
+		fs_file_deinit(&file->super_read);
+	fs_file_deinit(&file->super);
+	i_free(file->file.path);
+	i_free(file);
+}
+
+static void fs_compress_file_close(struct fs_file *_file)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	if (file->input != NULL)
+		i_stream_unref(&file->input);
+	if (file->super_read != NULL)
+		fs_file_close(file->super_read);
+	if (file->super != NULL)
+		fs_file_close(file->super);
+}
+
+static const char *fs_compress_file_get_path(struct fs_file *_file)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	return fs_file_path(file->super);
+}
+
+static void
+fs_compress_set_async_callback(struct fs_file *_file,
+			       fs_file_async_callback_t *callback,
+			       void *context)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	fs_file_set_async_callback(file->super, callback, context);
+}
+
+static int fs_compress_wait_async(struct fs *_fs)
+{
+	return fs_wait_async(_fs->parent);
+}
+
+static void
+fs_compress_set_metadata(struct fs_file *_file, const char *key,
+			 const char *value)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	fs_set_metadata(file->super, key, value);
+}
+
+static int
+fs_compress_get_metadata(struct fs_file *_file,
+			 const ARRAY_TYPE(fs_metadata) **metadata_r)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	return fs_get_metadata(file->super, metadata_r);
+}
+
+static bool fs_compress_prefetch(struct fs_file *_file, uoff_t length)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	return fs_prefetch(file->super, length);
+}
+
+static struct istream *
+fs_compress_read_stream(struct fs_file *_file, size_t max_buffer_size)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+	struct istream *input;
+
+	if (file->input != NULL) {
+		i_stream_ref(file->input);
+		i_stream_seek(file->input, 0);
+		return file->input;
+	}
+
+	input = fs_read_stream(file->super_read, max_buffer_size);
+	file->input = file->fs->handler->create_istream(input, FALSE);
+	i_stream_unref(&input);
+	i_stream_ref(file->input);
+	return file->input;
+}
+
+static void fs_compress_write_stream(struct fs_file *_file)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	i_assert(_file->output == NULL);
+
+	file->temp_output =
+		iostream_temp_create_named(_file->fs->temp_path_prefix,
+					   IOSTREAM_TEMP_FLAG_TRY_FD_DUP,
+					   fs_file_path(_file));
+	_file->output = file->fs->handler->
+		create_ostream(file->temp_output, file->fs->compress_level);
+}
+
+static int fs_compress_write_stream_finish(struct fs_file *_file, bool success)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+	struct istream *input;
+	int ret;
+
+	if (_file->output != NULL) {
+		if (_file->output->closed)
+			success = FALSE;
+		if (_file->output == file->super_output)
+			_file->output = NULL;
+		else
+			o_stream_unref(&_file->output);
+	}
+	if (!success) {
+		if (file->temp_output != NULL)
+			o_stream_destroy(&file->temp_output);
+		if (file->super_output != NULL)
+			fs_write_stream_abort(file->super, &file->super_output);
+		return -1;
+	}
+
+	if (file->super_output != NULL) {
+		i_assert(file->temp_output == NULL);
+		return fs_write_stream_finish(file->super, &file->super_output);
+	}
+	if (file->temp_output == NULL) {
+		/* finishing up */
+		i_assert(file->super_output == NULL);
+		return fs_write_stream_finish(file->super, &file->temp_output);
+	}
+	/* finish writing the temporary file */
+	input = iostream_temp_finish(&file->temp_output, IO_BLOCK_SIZE);
+	file->super_output = fs_write_stream(file->super);
+	if (o_stream_send_istream(file->super_output, input) >= 0)
+		ret = fs_write_stream_finish(file->super, &file->super_output);
+	else if (input->stream_errno != 0) {
+		fs_set_error(_file->fs, "read(%s) failed: %s",
+			     i_stream_get_name(input),
+			     i_stream_get_error(input));
+		fs_write_stream_abort(file->super, &file->super_output);
+		ret = -1;
+	} else {
+		i_assert(file->super_output->stream_errno != 0);
+		fs_set_error(_file->fs, "write(%s) failed: %s",
+			     o_stream_get_name(file->super_output),
+			     o_stream_get_error(file->super_output));
+		fs_write_stream_abort(file->super, &file->super_output);
+		ret = -1;
+	}
+	i_stream_unref(&input);
+	return ret;
+}
+
+static int
+fs_compress_lock(struct fs_file *_file, unsigned int secs, struct fs_lock **lock_r)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	return fs_lock(file->super, secs, lock_r);
+}
+
+static void fs_compress_unlock(struct fs_lock *_lock ATTR_UNUSED)
+{
+	i_unreached();
+}
+
+static int fs_compress_exists(struct fs_file *_file)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	return fs_exists(file->super);
+}
+
+static int fs_compress_stat(struct fs_file *_file, struct stat *st_r)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	return fs_stat(file->super, st_r);
+}
+
+static int fs_compress_copy(struct fs_file *_src, struct fs_file *_dest)
+{
+	struct compress_fs_file *src = (struct compress_fs_file *)_src;
+	struct compress_fs_file *dest = (struct compress_fs_file *)_dest;
+
+	if (_src != NULL)
+		return fs_copy(src->super, dest->super);
+	else
+		return fs_copy_finish_async(dest->super);
+}
+
+static int fs_compress_rename(struct fs_file *_src, struct fs_file *_dest)
+{
+	struct compress_fs_file *src = (struct compress_fs_file *)_src;
+	struct compress_fs_file *dest = (struct compress_fs_file *)_dest;
+
+	return fs_rename(src->super, dest->super);
+}
+
+static int fs_compress_delete(struct fs_file *_file)
+{
+	struct compress_fs_file *file = (struct compress_fs_file *)_file;
+
+	return fs_delete(file->super);
+}
+
+static struct fs_iter *
+fs_compress_iter_init(struct fs *_fs, const char *path,
+		      enum fs_iter_flags flags)
+{
+	struct compress_fs_iter *iter;
+
+	iter = i_new(struct compress_fs_iter, 1);
+	iter->iter.fs = _fs;
+	iter->iter.flags = flags;
+	iter->super = fs_iter_init(_fs->parent, path, flags);
+	return &iter->iter;
+}
+
+static const char *fs_compress_iter_next(struct fs_iter *_iter)
+{
+	struct compress_fs_iter *iter = (struct compress_fs_iter *)_iter;
+	const char *fname;
+
+	iter->super->async_callback = _iter->async_callback;
+	iter->super->async_context = _iter->async_context;
+
+	fname = fs_iter_next(iter->super);
+	_iter->async_have_more = iter->super->async_have_more;
+	return fname;
+}
+
+static int fs_compress_iter_deinit(struct fs_iter *_iter)
+{
+	struct compress_fs_iter *iter = (struct compress_fs_iter *)_iter;
+	int ret;
+
+	ret = fs_iter_deinit(&iter->super);
+	i_free(iter);
+	return ret;
+}
+
+const struct fs fs_class_compress = {
+	.name = "compress",
+	.v = {
+		fs_compress_alloc,
+		fs_compress_init,
+		fs_compress_deinit,
+		fs_compress_get_properties,
+		fs_compress_file_init,
+		fs_compress_file_deinit,
+		fs_compress_file_close,
+		fs_compress_file_get_path,
+		fs_compress_set_async_callback,
+		fs_compress_wait_async,
+		fs_compress_set_metadata,
+		fs_compress_get_metadata,
+		fs_compress_prefetch,
+		fs_read_via_stream,
+		fs_compress_read_stream,
+		fs_write_via_stream,
+		fs_compress_write_stream,
+		fs_compress_write_stream_finish,
+		fs_compress_lock,
+		fs_compress_unlock,
+		fs_compress_exists,
+		fs_compress_stat,
+		fs_compress_copy,
+		fs_compress_rename,
+		fs_compress_delete,
+		fs_compress_iter_init,
+		fs_compress_iter_next,
+		fs_compress_iter_deinit
+	}
+};