view src/indexer/worker-connection.c @ 22325:e01bc3015b2f

lib-index: Check .log.2 rotation only when syncing Instead of also whenever appending transactions to .log file. This shouldn't change the behavior much, and it's needed for the following change to work correctly.
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Tue, 11 Jul 2017 15:33:56 +0300
parents 2e2563132d5f
children cb108f786fb4
line wrap: on
line source

/* Copyright (c) 2011-2017 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "array.h"
#include "aqueue.h"
#include "ioloop.h"
#include "istream.h"
#include "ostream.h"
#include "str.h"
#include "strescape.h"
#include "master-service.h"
#include "indexer-queue.h"
#include "worker-connection.h"

#include <unistd.h>

#define INDEXER_PROTOCOL_MAJOR_VERSION 1
#define INDEXER_PROTOCOL_MINOR_VERSION 0

#define INDEXER_MASTER_HANDSHAKE "VERSION\tindexer-master-worker\t1\t0\n"
#define INDEXER_WORKER_NAME "indexer-worker-master"

struct worker_connection {
	int refcount;

	char *socket_path;
	indexer_status_callback_t *callback;

	int fd;
	struct io *io;
	struct istream *input;
	struct ostream *output;

	char *request_username;
	ARRAY(void *) request_contexts;
	struct aqueue *request_queue;

	unsigned int process_limit;
	unsigned int version_received:1;
};

struct worker_connection *
worker_connection_create(const char *socket_path,
			 indexer_status_callback_t *callback)
{
	struct worker_connection *conn;

	conn = i_new(struct worker_connection, 1);
	conn->refcount = 1;
	conn->socket_path = i_strdup(socket_path);
	conn->callback = callback;
	conn->fd = -1;
	i_array_init(&conn->request_contexts, 32);
	conn->request_queue = aqueue_init(&conn->request_contexts.arr);
	return conn;
}

static void worker_connection_unref(struct worker_connection *conn)
{
	i_assert(conn->refcount > 0);
	if (--conn->refcount > 0)
		return;

	aqueue_deinit(&conn->request_queue);
	array_free(&conn->request_contexts);
	i_free(conn->socket_path);
	i_free(conn);
}

static void worker_connection_disconnect(struct worker_connection *conn)
{
	unsigned int i, count = aqueue_count(conn->request_queue);

	if (conn->fd != -1) {
		io_remove(&conn->io);
		i_stream_destroy(&conn->input);
		o_stream_destroy(&conn->output);

		if (close(conn->fd) < 0)
			i_error("close(%s) failed: %m", conn->socket_path);
		conn->fd = -1;
	}

	/* cancel any pending requests */
	if (count > 0) {
		i_error("Indexer worker disconnected, "
			"discarding %u requests for %s",
			count, conn->request_username);
	}

	/* conn->callback() can try to destroy us */
	conn->refcount++;
	for (i = 0; i < count; i++) {
		void *const *contextp =
			array_idx(&conn->request_contexts,
				  aqueue_idx(conn->request_queue, 0));
		void *context = *contextp;

		aqueue_delete_tail(conn->request_queue);
		conn->callback(-1, context);
	}
	i_free_and_null(conn->request_username);
	worker_connection_unref(conn);
}

void worker_connection_destroy(struct worker_connection **_conn)
{
	struct worker_connection *conn = *_conn;

	*_conn = NULL;

	worker_connection_disconnect(conn);
	worker_connection_unref(conn);
}

static int
worker_connection_input_line(struct worker_connection *conn, const char *line)
{
	void *const *contextp, *context;
	int percentage;

	if (aqueue_count(conn->request_queue) == 0) {
		i_error("Input from worker without pending requests: %s", line);
		return -1;
	}

	if (str_to_int(line, &percentage) < 0 ||
	    percentage < -1 || percentage > 100) {
		i_error("Invalid input from worker: %s", line);
		return -1;
	}

	contextp = array_idx(&conn->request_contexts,
			     aqueue_idx(conn->request_queue, 0));
	context = *contextp;
	if (percentage < 0 || percentage == 100) {
		/* the request is finished */
		aqueue_delete_tail(conn->request_queue);
		if (aqueue_count(conn->request_queue) == 0)
			i_free_and_null(conn->request_username);
	}

	conn->callback(percentage, context);
	return 0;
}

static void worker_connection_input(struct worker_connection *conn)
{
	const char *line;

	if (i_stream_read(conn->input) < 0) {
		worker_connection_disconnect(conn);
		return;
	}

	if (!conn->version_received) {
		if ((line = i_stream_next_line(conn->input)) == NULL)
			return;

		if (!version_string_verify(line, INDEXER_WORKER_NAME,
				INDEXER_PROTOCOL_MAJOR_VERSION)) {
			i_error("Indexer worker not compatible with this master "
				"(mixed old and new binaries?)");
			worker_connection_disconnect(conn);
			return;
		}
		conn->version_received = TRUE;
	}
	if (conn->process_limit == 0) {
		if ((line = i_stream_next_line(conn->input)) == NULL)
			return;
		if (str_to_uint(line, &conn->process_limit) < 0 ||
		    conn->process_limit == 0) {
			i_error("Indexer worker sent invalid handshake: %s",
				line);
			worker_connection_disconnect(conn);
			return;
		}
	}

	while ((line = i_stream_next_line(conn->input)) != NULL) {
		if (worker_connection_input_line(conn, line) < 0) {
			worker_connection_disconnect(conn);
			break;
		}
	}
}

int worker_connection_connect(struct worker_connection *conn)
{
	i_assert(conn->fd == -1);

	conn->fd = net_connect_unix(conn->socket_path);
	if (conn->fd == -1) {
		i_error("connect(%s) failed: %m", conn->socket_path);
		return -1;
	}
	conn->io = io_add(conn->fd, IO_READ, worker_connection_input, conn);
	conn->input = i_stream_create_fd(conn->fd, (size_t)-1, FALSE);
	conn->output = o_stream_create_fd(conn->fd, (size_t)-1, FALSE);
	o_stream_set_no_error_handling(conn->output, TRUE);
	o_stream_nsend_str(conn->output, INDEXER_MASTER_HANDSHAKE);
	return 0;
}

bool worker_connection_is_connected(struct worker_connection *conn)
{
	return conn->fd != -1;
}

bool worker_connection_get_process_limit(struct worker_connection *conn,
					 unsigned int *limit_r)
{
	if (conn->process_limit == 0)
		return FALSE;

	*limit_r = conn->process_limit;
	return TRUE;
}

void worker_connection_request(struct worker_connection *conn,
			       const struct indexer_request *request,
			       void *context)
{
	i_assert(worker_connection_is_connected(conn));
	i_assert(context != NULL);
	i_assert(request->index || request->optimize);

	if (conn->request_username == NULL)
		conn->request_username = i_strdup(request->username);
	else {
		i_assert(strcmp(conn->request_username,
				request->username) == 0);
	}

	aqueue_append(conn->request_queue, &context);

	T_BEGIN {
		string_t *str = t_str_new(128);

		str_append_tabescaped(str, request->username);
		str_append_c(str, '\t');
		str_append_tabescaped(str, request->mailbox);
		str_append_c(str, '\t');
		if (request->session_id != NULL)
			str_append_tabescaped(str, request->session_id);
		str_printfa(str, "\t%u\t", request->max_recent_msgs);
		if (request->index)
			str_append_c(str, 'i');
		if (request->optimize)
			str_append_c(str, 'o');
		str_append_c(str, '\n');
		o_stream_nsend(conn->output, str_data(str), str_len(str));
	} T_END;
}

bool worker_connection_is_busy(struct worker_connection *conn)
{
	return aqueue_count(conn->request_queue) > 0;
}

const char *worker_connection_get_username(struct worker_connection *conn)
{
	return conn->request_username;
}