view src/plugins/fts-lucene/lucene-wrapper.cc @ 9575:0a00dcc4f0ea HEAD

lib-storage: Allow shared namespace prefix to use %variable modifiers.
author Timo Sirainen <tss@iki.fi>
date Wed, 26 May 2010 17:07:51 +0100
parents 00cd9aacd03c
children
line wrap: on
line source

/* Copyright (c) 2006-2010 Dovecot authors, see the included COPYING file */

extern "C" {
#include "lib.h"
#include "array.h"
#include "env-util.h"
#include "unichar.h"
#include "str.h"
#include "str-sanitize.h"
#include "lucene-wrapper.h"

#include <dirent.h>
#include <sys/stat.h>
};
#include <CLucene.h>

/* Limit given to IndexWriter::setMaxFieldLength() when a writer is opened.
   Lucene's default is 10000. Use it here also.. */
#define MAX_TERMS_PER_DOCUMENT 10000

/* If all the files in the lucene index directory are older than this many
   seconds, assume we can delete stale locks */
#define STALE_INDEX_SECS 60
/* When index is determined to be stale, delete all locks older than this
   many seconds */
#define STALE_LOCK_SECS 60
/* Minimum interval (in seconds) between staleness checks */
#define STALENESS_CHECK_INTERVAL 10

using namespace lucene::document;
using namespace lucene::index;
using namespace lucene::search;
using namespace lucene::queryParser;
using namespace lucene::analysis;

/* State of one Lucene index. Allocated by lucene_index_init() and released
   by lucene_index_deinit(). */
struct lucene_index {
	/* private copies of the index directory and lock directory paths */
	char *path, *lock_path;
	/* mailbox selected with lucene_index_select_mailbox() */
	char *mailbox_name;
	/* the same mailbox name as TCHARs, used when building Terms/Fields */
	TCHAR *tmailbox_name;

	/* time of the previous stale lock check, used to rate limit them */
	time_t last_stale_check;
	/* TRUE after a lock error has been logged, so it isn't logged again.
	   reset to FALSE after a successful index operation. */
	bool lock_error;

	/* opened on demand, all released by lucene_index_close() */
	IndexReader *reader;
	IndexWriter *writer;
	IndexSearcher *searcher;
	/* DovecotAnalyzer created in lucene_index_init() */
	Analyzer *analyzer;

	/* document currently being built for prev_uid, NULL if there is none */
	Document *doc;
	/* prev_uid = UID of the message whose document is being built
	   (0 = nothing was built), last_uid = highest last_uid found from
	   the index for the selected mailbox */
	uint32_t prev_uid, last_uid;
};

class RawTokenStream : public TokenStream {
	CL_NS(util)::Reader *reader;

public:
	RawTokenStream(CL_NS(util)::Reader *reader) {
		this->reader = reader;
	};

	bool next(Token *token) {
		const TCHAR *data;

		int32_t len = this->reader->read(data);
		if (len <= 0)
			return false;

		token->set(data, 0, len);
		return true;
	}

	void close() { }
};

/* Analyzer that runs the standard analysis only for the "headers" and "body"
   fields (and for a missing field name). All other fields are indexed
   as-is through RawTokenStream. */
class DovecotAnalyzer : public standard::StandardAnalyzer {
public:
	TokenStream *tokenStream(const TCHAR *fieldName,
				 CL_NS(util)::Reader *reader) {
		const bool analyzed = fieldName == 0 ||
			wcscmp(fieldName, L"headers") == 0 ||
			wcscmp(fieldName, L"body") == 0;

		if (analyzed) {
			return standard::StandardAnalyzer::
				tokenStream(fieldName, reader);
		}
		/* uid, box, last_uid etc. must be stored unmodified */
		return _CLNEW RawTokenStream(reader);
	}
};

/* Scan all entries in dir and check whether any of them has been changed
   (ctime or mtime) after stale_stamp.

   dir           - directory to scan
   skip_path     - full path of an entry to ignore, or NULL
   stale_stamp   - entries whose ctime and mtime are both <= this are stale
   unlink_staled - if TRUE, stale entries are unlinked while scanning

   Returns TRUE if a non-stale entry was found or if the directory couldn't
   be fully scanned (we must not treat an unknown state as stale), FALSE if
   everything that was seen is stale. */
static bool lucene_dir_scan(const char *dir, const char *skip_path,
			    time_t stale_stamp, bool unlink_staled)
{
	DIR *d;
	struct dirent *dp;
	struct stat st;
	string_t *path;
	unsigned int dir_len;
	bool found_nonstale = FALSE;

	d = opendir(dir);
	if (d == NULL) {
		i_error("opendir(%s) failed: %m", dir);
		return TRUE;
	}

	t_push();
	path = t_str_new(256);
	str_append(path, dir);
	str_append_c(path, '/');
	dir_len = str_len(path);

	/* readdir() returns NULL both at the end of the directory and on
	   error. errno is cleared before every call so that the two can be
	   told apart after the loop (stat()/unlink() below may have set it). */
	for (errno = 0; (dp = readdir(d)) != NULL; errno = 0) {
		if (*dp->d_name == '.') {
			if (dp->d_name[1] == '\0')
				continue;
			if (dp->d_name[1] == '.' && dp->d_name[2] == '\0')
				continue;
		}

		str_truncate(path, dir_len);
		str_append(path, dp->d_name);

		if (skip_path != NULL &&
		    strcmp(str_c(path), skip_path) == 0)
			continue;

		if (stat(str_c(path), &st) < 0) {
			/* the file may have just been deleted, which also
			   means that something is using the directory */
			if (errno != ENOENT)
				i_error("stat(%s) failed: %m", str_c(path));
			found_nonstale = TRUE;
		} else if (st.st_ctime <= stale_stamp &&
			   st.st_mtime <= stale_stamp) {
			if (unlink_staled) {
				if (unlink(str_c(path)) < 0 &&
				    errno != ENOENT) {
					i_error("unlink(%s) failed: %m",
						str_c(path));
				}
			}
		} else {
			found_nonstale = TRUE;
		}
	}
	if (errno != 0) {
		/* we didn't see all the entries, so we can't know if the
		   directory is stale. be on the safe side. */
		i_error("readdir(%s) failed: %m", dir);
		found_nonstale = TRUE;
	}
	if (closedir(d) < 0)
		i_error("closedir(%s) failed: %m", dir);
	t_pop();
	return found_nonstale;
}

/* Delete old lock files from the lock directory if nothing in the index
   directory has changed recently. The check is done at most once per
   STALENESS_CHECK_INTERVAL seconds. */
static void lucene_delete_stale_locks(struct lucene_index *index)
{
	const time_t now = time(NULL);

	if (now < index->last_stale_check + STALENESS_CHECK_INTERVAL)
		return;
	index->last_stale_check = now;

	/* first pass only looks at the index files (ignoring the lock
	   directory itself) */
	const bool in_use = lucene_dir_scan(index->path, index->lock_path,
					    now - STALE_INDEX_SECS, FALSE);
	if (!in_use) {
		/* nobody seems to be updating the index, so old enough
		   locks can go */
		(void)lucene_dir_scan(index->lock_path, NULL,
				      now - STALE_LOCK_SECS, TRUE);
	}
}

/* Allocate a new index handle for the index in path, using lock_path as the
   lock directory. The lock directory is exported to the environment under
   the LUCENE_LOCK_DIR_ENV_1 name and stale locks are cleaned up right away.
   The index itself isn't opened yet. */
struct lucene_index *lucene_index_init(const char *path, const char *lock_path)
{
	const char *lock_env =
		t_strconcat(LUCENE_LOCK_DIR_ENV_1"=", lock_path, NULL);
	env_put(lock_env);

	struct lucene_index *index = i_new(struct lucene_index, 1);
	index->analyzer = _CLNEW DovecotAnalyzer();
	index->lock_path = i_strdup(lock_path);
	index->path = i_strdup(path);

	lucene_delete_stale_locks(index);
	return index;
}

/* Release the reader, writer and searcher. The index handle itself stays
   usable: the lucene_index_open*() functions test these pointers against
   NULL to decide whether to reopen, so _CLDELETE() is presumably leaving
   them NULL (TODO confirm against the CLucene macro). */
static void lucene_index_close(struct lucene_index *index)
{
	_CLDELETE(index->reader);
	_CLDELETE(index->writer);
	_CLDELETE(index->searcher);
}

/* Close the index and free the handle with everything it owns. index must
   not be used after this. */
void lucene_index_deinit(struct lucene_index *index)
{
	lucene_index_close(index);
	/* a build that was aborted before lucene_index_build_deinit() got to
	   flush may have left a half-built document behind */
	_CLDELETE(index->doc);
	_CLDELETE(index->analyzer);
	i_free(index->mailbox_name);
	i_free(index->tmailbox_name);
	i_free(index->path);
	i_free(index->lock_path);
	i_free(index);
}

/* Select the mailbox that the following build/lookup/expunge calls operate
   on. Both a char and a TCHAR copy of the name are kept in the index. */
void lucene_index_select_mailbox(struct lucene_index *index,
				 const char *mailbox_name)
{
	const size_t name_len = strlen(mailbox_name);

	i_free(index->mailbox_name);
	index->mailbox_name = i_strdup(mailbox_name);

	i_free(index->tmailbox_name);
	index->tmailbox_name = i_new(TCHAR, name_len + 1);
	STRCPY_AtoT(index->tmailbox_name, mailbox_name, name_len);
}

/* Log a CLucene error that happened while doing msg. Lock errors
   additionally trigger a stale lock cleanup, and they are logged only once
   until index->lock_error gets reset. */
static void lucene_handle_error(struct lucene_index *index, CLuceneError &err,
				const char *msg)
{
	const char *reason = err.what();
	/* e.g. "Lock obtain timed out" */
	const bool lock_failure = err.number() == CL_ERR_IO &&
		strncasecmp(reason, "Lock", 4) == 0;

	if (lock_failure) {
		lucene_delete_stale_locks(index);
		if (index->lock_error) {
			/* this has been complained about already */
			return;
		}
		index->lock_error = TRUE;
	}
	i_error("lucene index %s: %s failed: %s", index->path, msg, reason);
}

/* Make sure the index reader is open.
   Returns 1 if the reader is open, 0 if the index doesn't exist,
   -1 if opening it failed (the error is logged). */
static int lucene_index_open(struct lucene_index *index)
{
	if (index->reader == NULL) {
		if (!IndexReader::indexExists(index->path))
			return 0;

		try {
			index->reader = IndexReader::open(index->path);
		} catch (CLuceneError &err) {
			lucene_handle_error(index, err, "IndexReader::open()");
			return -1;
		}
	}
	return 1;
}

/* Make sure the index searcher (and the reader below it) is open.
   Returns 1 if the searcher is usable, 0 if the index doesn't exist,
   -1 on error. */
static int lucene_index_open_search(struct lucene_index *index)
{
	if (index->searcher == NULL) {
		const int ret = lucene_index_open(index);

		if (ret <= 0)
			return ret;
		index->searcher = _CLNEW IndexSearcher(index->reader);
	}
	return 1;
}

/* Parse the decimal UID stored in doc's field_name field into uid_r.
   Returns 0 if ok, -1 if the field is missing or its value isn't a valid
   32bit unsigned decimal number (the index is then logged as corrupted and
   uid_r is left untouched). */
static int
lucene_doc_get_uid(struct lucene_index *index, Document *doc,
		   const TCHAR *field_name, uint32_t *uid_r)
{
	const uint32_t max_uid = (uint32_t)-1;
	Field *field = doc->getField(field_name);
	const TCHAR *uid = field == NULL ? NULL : field->stringValue();
	if (uid == NULL) {
		i_error("lucene: Corrupted FTS index %s: No UID for document",
			index->path);
		return -1;
	}

	uint32_t num = 0;
	/* an empty value isn't a number either */
	bool valid = *uid != 0;
	for (; valid && *uid != 0; uid++) {
		if (*uid < '0' || *uid > '9') {
			valid = false;
			break;
		}
		const uint32_t digit = *uid - '0';
		if (num > (max_uid - digit) / 10) {
			/* num*10 + digit wouldn't fit into 32 bits */
			valid = false;
			break;
		}
		num = num*10 + digit;
	}
	if (!valid) {
		i_error("lucene: Corrupted FTS index %s: "
			"Invalid UID for document", index->path);
		return -1;
	}
	*uid_r = num;
	return 0;
}

/* Find the highest "last_uid" document of the selected mailbox and store its
   value in index->last_uid (0 if there is none). If delete_old is true, all
   the other last_uid documents of the mailbox are deleted from the index and
   the index is closed afterwards to release the locks.

   Returns 1 if the index was searched, 0 if the index doesn't exist,
   -1 on error. */
static int
lucene_index_get_last_uid_int(struct lucene_index *index, bool delete_old)
{
	ARRAY_TYPE(uint32_t) delete_doc_ids;
	uint32_t del_id;
	int ret = 0;
	bool deleted = false;

	index->last_uid = 0;

	if ((ret = lucene_index_open_search(index)) <= 0)
		return ret;

	/* find all the existing last_uids for selected mailbox.
	   if there are more than one, delete the smaller ones. this is normal
	   behavior because we can't update/delete documents in writer, so
	   we'll do it only in here.. */
	Term mailbox_term(_T("box"), index->tmailbox_name);
	Term last_uid_term(_T("last_uid"), _T("*"));
	TermQuery mailbox_query(&mailbox_term);
	WildcardQuery last_uid_query(&last_uid_term);

	BooleanQuery query;
	query.add(&mailbox_query, true, false);
	query.add(&last_uid_query, true, false);

	t_push();
	t_array_init(&delete_doc_ids, 10);
	int32_t last_doc_id = -1;
	/* declared outside the try block so that the hits can be released
	   also when an exception interrupts the processing */
	Hits *hits = NULL;
	try {
		hits = index->searcher->search(&query);

		for (int32_t i = 0; i < hits->length(); i++) {
			uint32_t uid;

			if (lucene_doc_get_uid(index, &hits->doc(i),
					       _T("last_uid"), &uid) < 0) {
				ret = -1;
				break;
			}

			if (uid > index->last_uid) {
				/* new highest. the previous highest becomes
				   obsolete. */
				if (last_doc_id >= 0) {
					del_id = last_doc_id;
					array_append_i(&delete_doc_ids.arr,
						       (void *)&del_id, 1);
				}
				index->last_uid = uid;
				last_doc_id = hits->id(i);
			} else {
				del_id = hits->id(i);
				array_append_i(&delete_doc_ids.arr,
					       (void *)&del_id, 1);
			}
		}
		if (delete_old && array_count(&delete_doc_ids) > 0) {
			const uint32_t *ids;
			unsigned int i, count;

			ids = array_get(&delete_doc_ids, &count);
			for (i = 0; i < count; i++)
				index->reader->deleteDocument(ids[i]);
			deleted = true;
		}
		index->lock_error = FALSE;
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "last_uid search");
		ret = -1;
	}
	/* must be done before the searcher/reader are closed below */
	_CLDELETE(hits);

	if (deleted) {
		/* the index was modified. we'll need to release the locks
		   before opening a writer */
		lucene_index_close(index);
	}
	t_pop();
	return ret;
}

/* Look up the last indexed UID of the selected mailbox into last_uid_r
   (0 if nothing is indexed). Obsolete last_uid documents are removed from
   the index at the same time, because the index isn't write-locked yet.
   Returns 0 if ok, -1 on error (last_uid_r is then left untouched). */
int lucene_index_get_last_uid(struct lucene_index *index, uint32_t *last_uid_r)
{
	const int ret = lucene_index_get_last_uid_int(index, true);

	if (ret >= 0) {
		*last_uid_r = index->last_uid;
		return 0;
	}
	return -1;
}

/* Begin adding messages to the index of the selected mailbox: open an index
   writer (creating the index if it doesn't exist yet) and look up the last
   indexed UID into last_uid_r.
   Returns 0 if ok, -1 on error. last_uid_r is always set. */
int lucene_index_build_init(struct lucene_index *index, uint32_t *last_uid_r)
{
	i_assert(index->mailbox_name != NULL);

	/* fts-storage would crash without a value, so give one even if we
	   end up failing */
	*last_uid_r = index->last_uid;

	lucene_index_close(index);

	const bool create = !IndexReader::indexExists(index->path);
	try {
		index->writer = _CLNEW IndexWriter(index->path,
						   index->analyzer, create);
		index->lock_error = FALSE;
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "IndexWriter()");
		return -1;
	}
	index->writer->setMaxFieldLength(MAX_TERMS_PER_DOCUMENT);

	/* the writer is open now, so old last_uids can't be deleted here */
	const int ret = lucene_index_get_last_uid_int(index, false);
	if (ret >= 0) {
		*last_uid_r = index->last_uid;
		return 0;
	}
	return -1;
}

/* Add the document that is being built (if any) to the index writer and
   free it. Returns 0 if ok or if there was nothing to flush, -1 if adding
   the document failed. The document is freed in either case. */
static int lucene_index_build_flush(struct lucene_index *index)
{
	if (index->doc == NULL)
		return 0;

	int ret;
	try {
		index->writer->addDocument(index->doc);
		ret = 0;
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "IndexWriter::addDocument()");
		ret = -1;
	}

	_CLDELETE(index->doc);
	index->doc = NULL;
	return ret;
}

int lucene_index_build_more(struct lucene_index *index, uint32_t uid,
			    const unsigned char *data, size_t size,
			    bool headers)
{
	unsigned int len;

	i_assert(uid > index->last_uid);
	i_assert(size > 0);

	len = uni_utf8_strlen_n(data, size);
	wchar_t dest[len+1];
	lucene_utf8towcs(dest, (const char *)data, len);
	dest[len] = 0;

	if (uid != index->prev_uid) {
		char id[MAX_INT_STRLEN];
		TCHAR tid[MAX_INT_STRLEN];

		if (lucene_index_build_flush(index) < 0)
			return -1;
		index->prev_uid = uid;

		index->doc = _CLNEW Document();
		i_snprintf(id, sizeof(id), "%u", uid);
		STRCPY_AtoT(tid, id, MAX_INT_STRLEN);
		index->doc->add(*Field::Text(_T("uid"), tid));
		index->doc->add(*Field::Text(_T("box"), index->tmailbox_name));
	}

	if (headers)
		index->doc->add(*Field::Text(_T("headers"), dest));
	else
		index->doc->add(*Field::Text(_T("body"), dest));
	return 0;
}

static int lucene_index_update_last_uid(struct lucene_index *index)
{
	Document doc;
	char id[MAX_INT_STRLEN];
	TCHAR tid[MAX_INT_STRLEN];

	i_snprintf(id, sizeof(id), "%u", index->last_uid);
	STRCPY_AtoT(tid, id, MAX_INT_STRLEN);

	doc.add(*Field::Text(_T("last_uid"), tid));
	doc.add(*Field::Text(_T("box"), index->tmailbox_name));

	try {
		index->writer->addDocument(&doc);
		return 0;
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "IndexWriter::addDocument()");
		return -1;
	}
}

/* Finish the build started with lucene_index_build_init(): flush the last
   document, write the new last_uid, optimize the index and close it.
   Nothing is done if no data was added. All the steps are attempted even if
   an earlier one fails. Returns 0 if ok, -1 if any of the steps failed. */
int lucene_index_build_deinit(struct lucene_index *index)
{
	if (index->prev_uid == 0) {
		/* nothing was added */
		return 0;
	}

	if (index->last_uid < index->prev_uid)
		index->last_uid = index->prev_uid;
	index->prev_uid = 0;

	if (index->writer == NULL) {
		lucene_index_close(index);
		return -1;
	}

	/* both are always run, a failed flush doesn't skip the last_uid */
	const bool flushed = lucene_index_build_flush(index) >= 0;
	const bool uid_saved = lucene_index_update_last_uid(index) >= 0;
	int ret = (flushed && uid_saved) ? 0 : -1;

	try {
		index->writer->optimize();
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "IndexWriter::optimize()");
		ret = -1;
	}
	try {
		index->writer->close();
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "IndexWriter::close()");
		ret = -1;
	}

	lucene_index_close(index);
	return ret;
}

/* Delete the documents of message uid in the selected mailbox from the
   index. The index is closed afterwards.
   Returns 0 if the index doesn't exist, -1 on error, otherwise the positive
   value returned by lucene_index_open_search(). */
int lucene_index_expunge(struct lucene_index *index, uint32_t uid)
{
	char id[MAX_INT_STRLEN];
	TCHAR tid[MAX_INT_STRLEN];
	int ret;

	if ((ret = lucene_index_open_search(index)) <= 0)
		return ret;

	i_snprintf(id, sizeof(id), "%u", uid);
	STRCPY_AtoT(tid, id, MAX_INT_STRLEN);

	Term mailbox_term(_T("box"), index->tmailbox_name);
	Term uid_term(_T("uid"), tid);
	TermQuery mailbox_query(&mailbox_term);
	TermQuery uid_query(&uid_term);

	BooleanQuery query;
	query.add(&mailbox_query, true, false);
	query.add(&uid_query, true, false);

	/* declared outside the try block so that the hits can be released
	   also when deleting a document throws (e.g. lock timeout) */
	Hits *hits = NULL;
	try {
		hits = index->searcher->search(&query);

		for (int32_t i = 0; i < hits->length(); i++)
			index->reader->deleteDocument(hits->id(i));
		index->lock_error = FALSE;
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "expunge search");
		ret = -1;
	}
	/* must be done before the reader/searcher are closed below */
	_CLDELETE(hits);

	try {
		index->reader->close();
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "IndexReader::close()");
		ret = -1;
	}

	lucene_index_close(index);
	return ret;
}

/* Search the selected mailbox for key and add the UIDs of the matching
   messages to result.

   flags - which fields to search from: FTS_LOOKUP_FLAG_HEADER and/or
	   FTS_LOOKUP_FLAG_BODY (at least one is required)
   key   - UTF-8 search key. A key without spaces is searched as a prefix
	   ("key*"), a key with spaces as a quoted phrase.

   Returns 0 if ok, -1 if the index doesn't exist or can't be opened, if the
   key can't be parsed or if the search fails. */
int lucene_index_lookup(struct lucene_index *index, enum fts_lookup_flags flags,
			const char *key, ARRAY_TYPE(seq_range) *result)
{
	const char *quoted_key;
	int ret = 0;

	i_assert((flags & (FTS_LOOKUP_FLAG_HEADER|FTS_LOOKUP_FLAG_BODY)) != 0);

	if (lucene_index_open_search(index) <= 0)
		return -1;

	t_push();
	quoted_key = strchr(key, ' ') == NULL ?
		t_strdup_printf("%s*", key) :
		t_strdup_printf("\"%s\"", key);
	unsigned int len = uni_utf8_strlen_n(quoted_key, (size_t)-1);
	wchar_t tkey[len + 1];
	lucene_utf8towcs(tkey, quoted_key, len);
	tkey[len] = 0;
	t_pop();

	/* the content queries are added without giving their ownership to
	   lookup_query, so they have to be deleted here on every path */
	BooleanQuery lookup_query;
	Query *content_query1 = NULL, *content_query2 = NULL;
	try {
		if ((flags & FTS_LOOKUP_FLAG_HEADER) != 0) {
			content_query1 = QueryParser::parse(tkey, _T("headers"),
							    index->analyzer);
			lookup_query.add(content_query1, false, false);
		}
		if ((flags & FTS_LOOKUP_FLAG_BODY) != 0) {
			content_query2 = QueryParser::parse(tkey, _T("body"),
							    index->analyzer);
			lookup_query.add(content_query2, false, false);
		}
	} catch (CLuceneError &err) {
		if (getenv("DEBUG") != NULL) {
			i_info("lucene: QueryParser::parse(%s) failed: %s",
			       str_sanitize(key, 40), err.what());
		}
		_CLDELETE(content_query1);
		_CLDELETE(content_query2);
		lucene_index_close(index);
		return -1;
	}

	BooleanQuery query;
	Term mailbox_term(_T("box"), index->tmailbox_name);
	TermQuery mailbox_query(&mailbox_term);
	query.add(&lookup_query, true, false);
	query.add(&mailbox_query, true, false);

	/* declared outside the try block so that the hits can be released
	   also when reading a result document throws */
	Hits *hits = NULL;
	try {
		hits = index->searcher->search(&query);

		for (int32_t i = 0; i < hits->length(); i++) {
			uint32_t uid;

			if (lucene_doc_get_uid(index, &hits->doc(i),
					       _T("uid"), &uid) < 0) {
				ret = -1;
				break;
			}

			seq_range_array_add(result, 0, uid);
		}
		index->lock_error = FALSE;
	} catch (CLuceneError &err) {
		lucene_handle_error(index, err, "search");
		ret = -1;
	}
	_CLDELETE(hits);

	_CLDELETE(content_query1);
	_CLDELETE(content_query2);
	return ret;
}