view src/plugins/fts/fts-parser-tika.c @ 19552:0f22db71df7a

global: freshen copyright git ls-files | xargs perl -p -i -e 's/(\d+)-201[0-5]/$1-2016/g;s/ (201[0-5]) Dovecot/ $1-2016 Dovecot/'
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Wed, 13 Jan 2016 12:24:03 +0200
parents 35d3777cc4d2
children 5ea46ed8c156
line wrap: on
line source

/* Copyright (c) 2014-2016 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "ioloop.h"
#include "istream.h"
#include "module-context.h"
#include "http-url.h"
#include "http-client.h"
#include "message-parser.h"
#include "mail-user.h"
#include "fts-parser.h"

/* Look up the Tika per-user context (struct fts_parser_tika_user) attached
   to obj via fts_parser_tika_user_module. The result is compared against
   NULL by the caller to detect that no context has been attached yet. */
#define TIKA_USER_CONTEXT(obj) \
	MODULE_CONTEXT(obj, fts_parser_tika_user_module)

/* Per-user Tika state. Allocated from the user's pool and attached to the
   mail_user with MODULE_CONTEXT_SET() in tika_get_http_client_url(). */
struct fts_parser_tika_user {
	/* module context header used by the MODULE_CONTEXT*() macros */
	union mail_user_module_context module_ctx;
	/* parsed value of the "fts_tika" setting. Lookups through an already
	   attached context fail when this is NULL. */
	struct http_url *http_url;
};

/* State of a single Tika conversion request. */
struct tika_fts_parser {
	/* generic parser header. Must stay the first member: the parser
	   callbacks cast struct fts_parser * back to this struct. */
	struct fts_parser parser;
	/* user whose "fts_tika" setting and mail_debug flag are consulted */
	struct mail_user *user;
	/* the PUT request created in try_init(). The response callback sets
	   this to NULL once it has handled a final response. */
	struct http_client_request *http_req;

	/* private ioloop, created on demand in more() while waiting for
	   response payload; destroyed in deinit() */
	struct ioloop *ioloop;
	/* io watching the payload istream; its callback is io_loop_stop() */
	struct io *io;
	/* response body from Tika, or an empty stream for responses that are
	   treated as "no text". NULL until the response callback has run. */
	struct istream *payload;

	/* set to TRUE when sending the request, the response status or
	   reading the payload failed; makes deinit() return -1 */
	bool failed;
};

/* HTTP client shared by all Tika requests. Created lazily by
   tika_get_http_client_url() and released by fts_parser_tika_unload(). */
static struct http_client *tika_http_client = NULL;
/* Module context identifier used to attach struct fts_parser_tika_user to
   a mail_user. */
static MODULE_CONTEXT_DEFINE_INIT(fts_parser_tika_user_module,
				  &mail_user_module_register);

static int
tika_get_http_client_url(struct mail_user *user, struct http_url **http_url_r)
{
	struct fts_parser_tika_user *tuser = TIKA_USER_CONTEXT(user);
	struct http_client_settings http_set;
	const char *url, *error;

	url = mail_user_plugin_getenv(user, "fts_tika");
	if (url == NULL) {
		/* fts_tika disabled */
		return -1;
	}

	if (tuser != NULL) {
		*http_url_r = tuser->http_url;
		return *http_url_r == NULL ? -1 : 0;
	}

	tuser = p_new(user->pool, struct fts_parser_tika_user, 1);
	MODULE_CONTEXT_SET(user, fts_parser_tika_user_module, tuser);

	if (http_url_parse(url, NULL, 0, user->pool,
			   &tuser->http_url, &error) < 0) {
		i_error("fts_tika: Failed to parse HTTP url %s: %s", url, error);
		return -1;
	}

	if (tika_http_client == NULL) {
		memset(&http_set, 0, sizeof(http_set));
		http_set.max_idle_time_msecs = 100;
		http_set.max_parallel_connections = 1;
		http_set.max_pipelined_requests = 1;
		http_set.max_redirects = 1;
		http_set.max_attempts = 3;
		http_set.connect_timeout_msecs = 5*1000;
		http_set.request_timeout_msecs = 60*1000;
		http_set.debug = user->mail_debug;
		tika_http_client = http_client_init(&http_set);
	}
	*http_url_r = tuser->http_url;
	return 0;
}

static void
fts_tika_parser_response(const struct http_response *response,
			 struct tika_fts_parser *parser)
{
	i_assert(parser->payload == NULL);

	switch (response->status) {
	case 200:
		/* read response */
		if (response->payload == NULL)
			parser->payload = i_stream_create_from_data("", 0);
		else {
			i_stream_ref(response->payload);
			parser->payload = response->payload;
		}
		break;
	case 204: /* empty response */
	case 415: /* Unsupported Media Type */
	case 422: /* Unprocessable Entity */
		if (parser->user->mail_debug) {
			i_debug("fts_tika: PUT %s failed: %u %s",
				mail_user_plugin_getenv(parser->user, "fts_tika"),
				response->status, response->reason);
		}
		parser->payload = i_stream_create_from_data("", 0);
		break;
	case 500:
		/* Server Error - the problem could be anything (in Tika or
		   HTTP server or proxy) and might be retriable, but Tika has
		   trouble processing some documents and throws up this error
		   every time for those documents. So we try retrying this a
		   couple of times, but if that doesn't work we'll just ignore
		   it. */
		if (http_client_request_try_retry(parser->http_req))
			return;
		i_info("fts_tika: PUT %s failed: %u %s - ignoring",
		       mail_user_plugin_getenv(parser->user, "fts_tika"),
		       response->status, response->reason);
		parser->payload = i_stream_create_from_data("", 0);
		break;

	default:
		i_error("fts_tika: PUT %s failed: %u %s",
			mail_user_plugin_getenv(parser->user, "fts_tika"),
			response->status, response->reason);
		parser->failed = TRUE;
		break;
	}
	parser->http_req = NULL;
	io_loop_stop(current_ioloop);
}

/* Try to set up a Tika conversion for one piece of content.

   Returns NULL when Tika can't be used for this user, i.e. when
   tika_get_http_client_url() fails (setting missing or URL unparseable).
   Otherwise allocates a parser and creates a PUT request to the configured
   Tika URL with fts_tika_parser_response() as its callback. The request body
   is supplied later through fts_parser_tika_more().

   content_type and content_disposition are forwarded as the Content-Type
   and Content-Disposition request headers. Either may be NULL (e.g. the
   content has no such header), in which case that header is left out
   instead of handing a NULL value to the HTTP client. */
static struct fts_parser *
fts_parser_tika_try_init(struct mail_user *user, const char *content_type,
			 const char *content_disposition)
{
	struct tika_fts_parser *parser;
	struct http_url *http_url;
	struct http_client_request *http_req;

	if (tika_get_http_client_url(user, &http_url) < 0)
		return NULL;

	parser = i_new(struct tika_fts_parser, 1);
	parser->parser.v = fts_parser_tika;
	parser->user = user;

	http_req = http_client_request(tika_http_client, "PUT",
			http_url->host_name,
			t_strconcat(http_url->path, http_url->enc_query, NULL),
			fts_tika_parser_response, parser);
	http_client_request_set_port(http_req, http_url->port);
	http_client_request_set_ssl(http_req, http_url->have_ssl);
	/* only send the headers we actually have a value for */
	if (content_type != NULL) {
		http_client_request_add_header(http_req, "Content-Type",
					       content_type);
	}
	if (content_disposition != NULL) {
		http_client_request_add_header(http_req, "Content-Disposition",
					       content_disposition);
	}
	/* ask Tika for the extracted text only */
	http_client_request_add_header(http_req, "Accept", "text/plain");

	parser->http_req = http_req;
	return &parser->parser;
}

/* Feed input to Tika or fetch output from it, depending on block->size.

   - block->size > 0: the block is request body data. It is sent to Tika
     (unless the parser has already failed), block->size is reset to 0 and
     the function returns without producing output.
   - block->size == 0: input is finished. On the first such call the request
     payload is finished and the response is waited for. Then the next chunk
     of the response payload is returned in block->data / block->size.
     block->size stays 0 at end of payload or after a failure.

   Failures are recorded in parser->failed and reported by deinit(). */
static void fts_parser_tika_more(struct fts_parser *_parser,
				 struct message_block *block)
{
	struct tika_fts_parser *parser = (struct tika_fts_parser *)_parser;
	/* remembered so that the caller's ioloop can be restored after we have
	   possibly switched to the private one */
	struct ioloop *prev_ioloop = current_ioloop;
	const unsigned char *data;
	size_t size;
	ssize_t ret;

	if (block->size > 0) {
		/* first we'll send everything to Tika */
		if (!parser->failed &&
		    http_client_request_send_payload(&parser->http_req,
						     block->data,
						     block->size) < 0)
			parser->failed = TRUE;
		/* the input never produces output directly */
		block->size = 0;
		return;
	}

	if (parser->payload == NULL) {
		/* read the result from Tika */
		if (!parser->failed &&
		    http_client_request_finish_payload(&parser->http_req) < 0)
			parser->failed = TRUE;
		/* the response callback sets either payload or failed. if
		   neither has happened yet, wait for it. */
		if (!parser->failed && parser->payload == NULL)
			http_client_wait(tika_http_client);
		if (parser->failed)
			return;
		i_assert(parser->payload != NULL);
	}
	/* continue returning data from Tika. we'll create a new ioloop just
	   for reading this one payload. */
	while ((ret = i_stream_read_data(parser->payload, &data, &size, 0)) == 0) {
		if (parser->failed)
			break;
		/* wait for more input from Tika */
		if (parser->ioloop == NULL) {
			/* NOTE(review): this relies on io_loop_create() making
			   the new loop the current one, so that the io and its
			   io_loop_stop() context refer to parser->ioloop -
			   confirm against ioloop.h */
			parser->ioloop = io_loop_create();
			parser->io = io_add_istream(parser->payload, io_loop_stop,
						    current_ioloop);
		} else {
			/* reuse the private ioloop created by an earlier call */
			io_loop_set_current(parser->ioloop);
		}
		/* runs until the payload io triggers io_loop_stop() */
		io_loop_run(current_ioloop);
	}
	/* switch back to original ioloop. */
	io_loop_set_current(prev_ioloop);

	if (parser->failed)
		; /* nothing to return; block->size is still 0 */
	else if (size > 0) {
		/* hand out everything that is buffered and mark it consumed */
		i_assert(ret > 0);
		block->data = data;
		block->size = size;
		i_stream_skip(parser->payload, size);
	} else {
		/* finished */
		i_assert(ret == -1);
		/* a read that ended with stream_errno set is a failure rather
		   than a clean end of payload */
		if (parser->payload->stream_errno != 0) {
			i_error("read(%s) failed: %s",
				i_stream_get_name(parser->payload),
				i_stream_get_error(parser->payload));
			parser->failed = TRUE;
		}
	}
}

/* Release everything owned by the parser and free it.

   Returns -1 if the parser was marked as failed at any point, 0 otherwise.
   A request that is still pending is aborted. */
static int fts_parser_tika_deinit(struct fts_parser *_parser)
{
	struct tika_fts_parser *parser = (struct tika_fts_parser *)_parser;
	/* read the result before the parser is freed below */
	int ret = parser->failed ? -1 : 0;

	/* remove io before unrefing payload - otherwise lib-http adds another
	   timeout to ioloop unnecessarily */
	/* NOTE(review): the statements below do the opposite of what the
	   comment above says - the payload is unreferenced first and the io
	   is removed afterwards. Confirm which order is intended before
	   touching this. */
	if (parser->payload != NULL)
		i_stream_unref(&parser->payload);
	if (parser->io != NULL)
		io_remove(&parser->io);
	/* http_req is non-NULL only if no final response was handled */
	if (parser->http_req != NULL)
		http_client_request_abort(&parser->http_req);
	if (parser->ioloop != NULL) {
		/* the private ioloop is made current before it's destroyed */
		io_loop_set_current(parser->ioloop);
		io_loop_destroy(&parser->ioloop);
	}
	i_free(parser);
	return ret;
}

/* Release the shared Tika HTTP client, if one was ever created. */
static void fts_parser_tika_unload(void)
{
	if (tika_http_client == NULL) {
		/* no Tika request was ever set up - nothing to release */
		return;
	}
	http_client_deinit(&tika_http_client);
}

/* Virtual function table of the Tika parser; assigned to parser.v in
   fts_parser_tika_try_init(). The initializer is positional, so the entries
   must follow the member order of struct fts_parser_vfuncs (declared
   outside this file). */
struct fts_parser_vfuncs fts_parser_tika = {
	fts_parser_tika_try_init,
	fts_parser_tika_more,
	fts_parser_tika_deinit,
	fts_parser_tika_unload
};