view src/plugins/fts-solr/solr-connection.c @ 7999:1ceb49f2eb50 HEAD

fts-solr: Error handling fixes.
author Timo Sirainen <tss@iki.fi>
date Sun, 13 Jul 2008 14:40:03 +0300
parents 662172573fe1
children 3fbfdda3e5d3
line wrap: on
line source

/* Copyright (c) 2006-2008 Dovecot authors, see the included COPYING file */

/* curl: 7.16.0 curl_multi_timeout */

#include "lib.h"
#include "str.h"
#include "strescape.h"
#include "solr-connection.h"

#include <curl/curl.h>
#include <expat.h>

enum solr_xml_response_state {
	SOLR_XML_RESPONSE_STATE_ROOT,
	SOLR_XML_RESPONSE_STATE_RESPONSE,
	SOLR_XML_RESPONSE_STATE_RESULT,
	SOLR_XML_RESPONSE_STATE_DOC,
	SOLR_XML_RESPONSE_STATE_CONTENT
};

enum solr_xml_content_state {
	SOLR_XML_CONTENT_STATE_NONE = 0,
	SOLR_XML_CONTENT_STATE_UID,
	SOLR_XML_CONTENT_STATE_SCORE
};

struct solr_lookup_xml_context {
	enum solr_xml_response_state state;
	enum solr_xml_content_state content_state;
	int depth;

	ARRAY_TYPE(seq_range) *uids;
};

struct solr_connection_post {
	struct solr_connection *conn;
	const unsigned char *data;
	size_t size, pos;

	unsigned int failed:1;
};

struct solr_connection {
	CURL *curl;
	CURLM *curlm;

	char curl_errorbuf[CURL_ERROR_SIZE];
	struct curl_slist *headers, *headers_post;
	XML_Parser xml_parser;

	char *url;
	char *http_failure;

	unsigned int debug:1;
	unsigned int posting:1;
	unsigned int xml_failed:1;
};

static void
solr_conn_init_settings(struct solr_connection *conn, const char *str)
{
	const char *const *tmp;

	if (str == NULL)
		str = "";

	for (tmp = t_strsplit_spaces(str, " "); *tmp != NULL; tmp++) {
		if (strncmp(*tmp, "url=", 4) == 0) {
			i_free(conn->url);
			conn->url = i_strdup(*tmp + 4);
		} else if (strcmp(*tmp, "debug") == 0) {
			conn->debug = TRUE;
		} else {
			i_fatal("fts_solr: Invalid setting: %s", *tmp);
		}
	}
	if (conn->url == NULL)
		i_fatal("fts_solr: url setting missing");
}

static size_t
curl_output_func(void *data, size_t element_size, size_t nmemb, void *context)
{
	struct solr_connection_post *post = context;
	size_t size = element_size * nmemb;

	/* @UNSAFE */
	if (size > post->size - post->pos)
		size = post->size - post->pos;

	memcpy(data, post->data + post->pos, size);
	post->pos += size;
	return size;
}

static int solr_xml_parse(struct solr_connection *conn,
			  const void *data, size_t size, bool done)
{
	enum XML_Error err;
	int line;

	if (conn->xml_failed)
		return -1;

	if (XML_Parse(conn->xml_parser, data, size, done))
		return 0;

	err = XML_GetErrorCode(conn->xml_parser);
	if (err != XML_ERROR_FINISHED) {
		line = XML_GetCurrentLineNumber(conn->xml_parser);
		i_error("fts_solr: Invalid XML input at line %d: %s",
			line, XML_ErrorString(err));
		conn->xml_failed = TRUE;
		return -1;
	}
	return 0;
}

static size_t
curl_input_func(void *data, size_t element_size, size_t nmemb, void *context)
{
	struct solr_connection *conn = context;
	size_t size = element_size * nmemb;

	(void)solr_xml_parse(conn, data, size, FALSE);
	return size;
}

static size_t
curl_header_func(void *data, size_t element_size, size_t nmemb, void *context)
{
	struct solr_connection *conn = context;
	size_t size = element_size * nmemb;
	const unsigned char *p;
	size_t i;

	if (conn->http_failure != NULL)
		return size;

	for (i = 0, p = data; i < size; i++) {
		if (p[i] == ' ') {
			i++;
			break;
		}
	}
	if (i == size || p[i] < '0' || p[i] > '9')
		i = 0;
	conn->http_failure = i_strndup(p + i, size - i);
	return size;
}

struct solr_connection *solr_connection_init(const char *settings)
{
	struct solr_connection *conn;

	conn = i_new(struct solr_connection, 1);
	solr_conn_init_settings(conn, settings);

	conn->curlm = curl_multi_init();
	conn->curl = curl_easy_init();
	if (conn->curl == NULL || conn->curlm == NULL) {
		i_fatal_status(FATAL_OUTOFMEM,
			       "fts_solr: Failed to allocate curl");
	}

	/* set global curl options */
	curl_easy_setopt(conn->curl, CURLOPT_ERRORBUFFER, conn->curl_errorbuf);
	if (conn->debug)
		curl_easy_setopt(conn->curl, CURLOPT_VERBOSE, 1L);

	curl_easy_setopt(conn->curl, CURLOPT_NOPROGRESS, 1L);
	curl_easy_setopt(conn->curl, CURLOPT_NOSIGNAL, 1L);
	curl_easy_setopt(conn->curl, CURLOPT_READFUNCTION, curl_output_func);
	curl_easy_setopt(conn->curl, CURLOPT_WRITEFUNCTION, curl_input_func);
	curl_easy_setopt(conn->curl, CURLOPT_WRITEDATA, conn);
	curl_easy_setopt(conn->curl, CURLOPT_HEADERFUNCTION, curl_header_func);
	curl_easy_setopt(conn->curl, CURLOPT_HEADERDATA, conn);

	conn->headers = curl_slist_append(NULL, "Content-Type: text/xml");
	conn->headers_post = curl_slist_append(NULL, "Content-Type: text/xml");
	conn->headers_post = curl_slist_append(conn->headers_post,
					       "Transfer-Encoding: chunked");
	conn->headers_post = curl_slist_append(conn->headers_post,
					       "Expect:");
	curl_easy_setopt(conn->curl, CURLOPT_HTTPHEADER, conn->headers);

	conn->xml_parser = XML_ParserCreate("UTF-8");
	if (conn->xml_parser == NULL) {
		i_fatal_status(FATAL_OUTOFMEM,
			       "fts_solr: Failed to allocate XML parser");
	}
	return conn;
}

void solr_connection_deinit(struct solr_connection *conn)
{
	curl_slist_free_all(conn->headers);
	curl_slist_free_all(conn->headers_post);
	curl_multi_cleanup(conn->curlm);
	curl_easy_cleanup(conn->curl);
	i_free(conn->url);
	i_free(conn);
}

void solr_connection_quote_str(struct solr_connection *conn, string_t *dest,
			       const char *str)
{
	char *encoded;

	encoded = curl_easy_escape(conn->curl, str_escape(str), 0);
	str_printfa(dest, "%%22%s%%22", encoded);
	curl_free(encoded);
}

static const char *attrs_get_name(const char **attrs)
{
	for (; *attrs != NULL; attrs += 2) {
		if (strcmp(attrs[0], "name") == 0)
			return attrs[1];
	}
	return "";
}

static void
solr_lookup_xml_start(void *context, const char *name, const char **attrs)
{
	struct solr_lookup_xml_context *ctx = context;
	const char *name_attr;

	i_assert(ctx->depth >= (int)ctx->state);

	ctx->depth++;
	if (ctx->depth - 1 > (int)ctx->state) {
		/* skipping over unwanted elements */
		return;
	}

	/* response -> result -> doc */
	switch (ctx->state) {
	case SOLR_XML_RESPONSE_STATE_ROOT:
		if (strcmp(name, "response") == 0)
			ctx->state++;
		break;
	case SOLR_XML_RESPONSE_STATE_RESPONSE:
		if (strcmp(name, "result") == 0)
			ctx->state++;
		break;
	case SOLR_XML_RESPONSE_STATE_RESULT:
		if (strcmp(name, "doc") == 0)
			ctx->state++;
		break;
	case SOLR_XML_RESPONSE_STATE_DOC:
		name_attr = attrs_get_name(attrs);
		if (strcmp(name_attr, "uid") == 0)
			ctx->content_state = SOLR_XML_CONTENT_STATE_UID;
		else if (strcmp(name_attr, "score") == 0)
			ctx->content_state = SOLR_XML_CONTENT_STATE_SCORE;
		else 
			break;
		ctx->state++;
		break;
	case SOLR_XML_RESPONSE_STATE_CONTENT:
		break;
	}
}

static void solr_lookup_xml_end(void *context, const char *name ATTR_UNUSED)
{
	struct solr_lookup_xml_context *ctx = context;

	i_assert(ctx->depth >= (int)ctx->state);

	if (ctx->depth == (int)ctx->state) {
		ctx->state--;
		ctx->content_state = SOLR_XML_CONTENT_STATE_NONE;
	}
	ctx->depth--;
}

static void solr_lookup_xml_data(void *context, const char *str, int len)
{
	struct solr_lookup_xml_context *ctx = context;
	uint32_t uid;
	int i;

	switch (ctx->content_state) {
	case SOLR_XML_CONTENT_STATE_NONE:
		break;
	case SOLR_XML_CONTENT_STATE_UID:
		for (i = 0, uid = 0; i < len; i++) {
			if (str[i] < '0' || str[i] > '9')
				break;
			uid = uid*10 + str[i]-'0';
		}
		if (i != len) {
			i_error("fts_solr: received invalid uid");
			break;
		}
		seq_range_array_add(ctx->uids, 0, uid);
		break;
	case SOLR_XML_CONTENT_STATE_SCORE:
		/* FIXME */
		break;
	}
}

int solr_connection_select(struct solr_connection *conn, const char *query,
			   ARRAY_TYPE(seq_range) *uids)
{
	struct solr_lookup_xml_context solr_lookup_context;
	string_t *str;
	CURLcode ret;
	long httpret;

	i_assert(!conn->posting);

	memset(&solr_lookup_context, 0, sizeof(solr_lookup_context));
	solr_lookup_context.uids = uids;

	i_free_and_null(conn->http_failure);
	conn->xml_failed = FALSE;
	XML_SetElementHandler(conn->xml_parser,
			      solr_lookup_xml_start, solr_lookup_xml_end);
	XML_SetCharacterDataHandler(conn->xml_parser, solr_lookup_xml_data);
	XML_SetUserData(conn->xml_parser, &solr_lookup_context);

	str = t_str_new(256);
	str_append(str, conn->url);
	str_append(str, "select?");
	str_append(str, query);

	curl_easy_setopt(conn->curl, CURLOPT_URL, str_c(str));
	ret = curl_easy_perform(conn->curl);
	if (ret != 0) {
		i_error("fts_solr: HTTP GET failed: %s",
			conn->curl_errorbuf);
		return -1;
	}
	curl_easy_getinfo(conn->curl, CURLINFO_RESPONSE_CODE, &httpret);
	if (httpret != 200) {
		i_error("fts_solr: Lookup failed: %s", conn->http_failure);
		ret = -1;
	}
	return solr_xml_parse(conn, NULL, 0, TRUE);
}

struct solr_connection_post *
solr_connection_post_begin(struct solr_connection *conn)
{
	struct solr_connection_post *post;
	CURLMcode merr;
	string_t *str;

	post = i_new(struct solr_connection_post, 1);
	post->conn = conn;

	i_assert(!conn->posting);
	conn->posting = TRUE;

	i_free_and_null(conn->http_failure);

	curl_easy_setopt(conn->curl, CURLOPT_READDATA, post);
	merr = curl_multi_add_handle(conn->curlm, conn->curl);
	if (merr != CURLM_OK) {
		i_error("fts_solr: curl_multi_add_handle() failed: %s",
			curl_multi_strerror(merr));
		post->failed = TRUE;
	} else {
		str = t_str_new(256);
		str_append(str, conn->url);
		str_append(str, "update");

		curl_easy_setopt(conn->curl, CURLOPT_URL, str_c(str));
		curl_easy_setopt(conn->curl, CURLOPT_HTTPHEADER,
				 conn->headers_post);
		curl_easy_setopt(conn->curl, CURLOPT_POST, (long)1);
	}
	return post;
}

void solr_connection_post_more(struct solr_connection_post *post,
			       const unsigned char *data, size_t size)
{
	fd_set fdread;
	fd_set fdwrite;
	fd_set fdexcep;
	struct timeval timeout_tv;
	long timeout;
	CURLMcode merr;
	int ret, handles, maxfd;

	i_assert(post->conn->posting);

	if (post->failed)
		return;

	post->data = data;
	post->size = size;
	post->pos = 0;

	for (;;) {
		merr = curl_multi_perform(post->conn->curlm, &handles);
		if (merr == CURLM_CALL_MULTI_PERFORM)
			continue;
		if (merr != CURLM_OK) {
			i_error("fts_solr: curl_multi_perform() failed: %s",
				curl_multi_strerror(merr));
			break;
		}
		if ((post->pos == post->size && post->size != 0) ||
		    (handles == 0 && post->size == 0)) {
			/* everything sent successfully */
			return;
		}

		/* everything wasn't sent - wait. just use select,
		   since libcurl interface is easiest with it. */
		FD_ZERO(&fdread);
		FD_ZERO(&fdwrite);
		FD_ZERO(&fdexcep);

		merr = curl_multi_fdset(post->conn->curlm, &fdread, &fdwrite,
					&fdexcep, &maxfd);
		if (merr != CURLM_OK) {
			i_error("fts_solr: curl_multi_fdset() failed: %s",
				curl_multi_strerror(merr));
			break;
		}
		i_assert(maxfd >= 0);

		merr = curl_multi_timeout(post->conn->curlm, &timeout);
		if (merr != CURLM_OK) {
			i_error("fts_solr: curl_multi_timeout() failed: %s",
				curl_multi_strerror(merr));
			break;
		}

		if (timeout < 0) {
			timeout_tv.tv_sec = 1;
			timeout_tv.tv_usec = 0;
		} else {
			timeout_tv.tv_sec = timeout / 1000;
			timeout_tv.tv_usec = (timeout % 1000) * 1000;
		}
		ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout_tv);
		if (ret < 0) {
			i_error("fts_solr: select() failed: %m");
			break;
		}
	}
	post->failed = TRUE;
}

int solr_connection_post_end(struct solr_connection_post *post)
{
	struct solr_connection *conn = post->conn;
	long httpret;
	int ret = post->failed ? -1 : 0;

	i_assert(conn->posting);

	solr_connection_post_more(post, NULL, 0);

	curl_easy_getinfo(conn->curl, CURLINFO_RESPONSE_CODE, &httpret);
	if (httpret != 200 && ret == 0) {
		i_error("fts_solr: Indexing failed: %s", conn->http_failure);
		ret = -1;
	}

	curl_easy_setopt(conn->curl, CURLOPT_READDATA, NULL);
	curl_easy_setopt(conn->curl, CURLOPT_POST, (long)0);
	curl_easy_setopt(conn->curl, CURLOPT_HTTPHEADER, conn->headers);

	(void)curl_multi_remove_handle(conn->curlm, conn->curl);
	i_free(post);

	conn->posting = FALSE;
	return ret;
}

int solr_connection_post(struct solr_connection *conn, const char *cmd)
{
	struct solr_connection_post *post;

	post = solr_connection_post_begin(conn);
	solr_connection_post_more(post, (const unsigned char *)cmd,
				  strlen(cmd));
	return solr_connection_post_end(post);
}