view src/lib-master/anvil-client.c @ 22652:09523ad05bef

director: Log whenever HOST-RESET-USERS is used
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Sun, 05 Nov 2017 22:53:23 +0200
parents 2e2563132d5f
children cb108f786fb4
line wrap: on
line source

/* Copyright (c) 2009-2017 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "ioloop.h"
#include "net.h"
#include "istream.h"
#include "ostream.h"
#include "array.h"
#include "aqueue.h"
#include "anvil-client.h"

struct anvil_query {
	anvil_callback_t *callback;
	void *context;
};

struct anvil_client {
	char *path;
	int fd;
	struct istream *input;
	struct ostream *output;
	struct io *io;
	struct timeout *to_query;

	struct timeout *to_reconnect;
	time_t last_reconnect;

	ARRAY(struct anvil_query *) queries_arr;
	struct aqueue *queries;

	bool (*reconnect_callback)(void);
	enum anvil_client_flags flags;
};

#define ANVIL_HANDSHAKE "VERSION\tanvil\t1\t0\n"
#define ANVIL_INBUF_SIZE 1024
#define ANVIL_RECONNECT_MIN_SECS 5
#define ANVIL_QUERY_TIMEOUT_MSECS (1000*5)

static void anvil_client_disconnect(struct anvil_client *client);

struct anvil_client *
anvil_client_init(const char *path, bool (*reconnect_callback)(void),
		  enum anvil_client_flags flags)
{
	struct anvil_client *client;

	client = i_new(struct anvil_client, 1);
	client->path = i_strdup(path);
	client->reconnect_callback = reconnect_callback;
	client->flags = flags;
	client->fd = -1;
	i_array_init(&client->queries_arr, 32);
	client->queries = aqueue_init(&client->queries_arr.arr);
	return client;
}

void anvil_client_deinit(struct anvil_client **_client)
{
	struct anvil_client *client = *_client;

	*_client = NULL;

	anvil_client_disconnect(client);
	array_free(&client->queries_arr);
	aqueue_deinit(&client->queries);
	i_free(client->path);
	i_assert(client->to_reconnect == NULL);
	i_free(client);
}

static void anvil_reconnect(struct anvil_client *client)
{
	anvil_client_disconnect(client);
	if (client->reconnect_callback != NULL) {
		if (!client->reconnect_callback()) {
			/* no reconnection */
			return;
		}
	}

	if (ioloop_time - client->last_reconnect < ANVIL_RECONNECT_MIN_SECS) {
		if (client->to_reconnect == NULL) {
			client->to_reconnect =
				timeout_add(ANVIL_RECONNECT_MIN_SECS*1000,
					    anvil_reconnect, client);
		}
	} else {
		client->last_reconnect = ioloop_time;
		(void)anvil_client_connect(client, FALSE);
	}
}

static void anvil_input(struct anvil_client *client)
{
	struct anvil_query *const *queries;
	struct anvil_query *query;
	const char *line;
	unsigned int count;

	queries = array_get(&client->queries_arr, &count);
	while ((line = i_stream_read_next_line(client->input)) != NULL) {
		if (aqueue_count(client->queries) == 0) {
			i_error("anvil: Unexpected input: %s", line);
			continue;
		}

		query = queries[aqueue_idx(client->queries, 0)];
		if (query->callback != NULL) T_BEGIN {
			query->callback(line, query->context);
		} T_END;
		i_free(query);
		aqueue_delete_tail(client->queries);
	}
	if (client->input->stream_errno != 0) {
		i_error("read(%s) failed: %s", client->path,
			i_stream_get_error(client->input));
		anvil_reconnect(client);
	} else if (client->input->eof) {
		i_error("read(%s) failed: EOF", client->path);
		anvil_reconnect(client);
	} else if (client->to_query != NULL) {
		if (aqueue_count(client->queries) == 0)
			timeout_remove(&client->to_query);
		else
			timeout_reset(client->to_query);
	}
}

int anvil_client_connect(struct anvil_client *client, bool retry)
{
	int fd;

	i_assert(client->fd == -1);

	fd = retry ? net_connect_unix_with_retries(client->path, 5000) :
		net_connect_unix(client->path);
	if (fd == -1) {
		if (errno != ENOENT ||
		    (client->flags & ANVIL_CLIENT_FLAG_HIDE_ENOENT) == 0) {
			i_error("net_connect_unix(%s) failed: %m",
				client->path);
		}
		return -1;
	}

	if (client->to_reconnect != NULL)
		timeout_remove(&client->to_reconnect);

	client->fd = fd;
	client->input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE, FALSE);
	client->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
	client->io = io_add(fd, IO_READ, anvil_input, client);
	if (o_stream_send_str(client->output, ANVIL_HANDSHAKE) < 0) {
		i_error("write(%s) failed: %s", client->path,
			o_stream_get_error(client->output));
		anvil_reconnect(client);
		return -1;
	}
	return 0;
}

static void anvil_client_cancel_queries(struct anvil_client *client)
{
	struct anvil_query *const *queries, *query;
	unsigned int count;

	queries = array_get(&client->queries_arr, &count);
	while (aqueue_count(client->queries) > 0) {
		query = queries[aqueue_idx(client->queries, 0)];
		if (query->callback != NULL)
			query->callback(NULL, query->context);
		i_free(query);
		aqueue_delete_tail(client->queries);
	}
	if (client->to_query != NULL)
		timeout_remove(&client->to_query);
}

static void anvil_client_disconnect(struct anvil_client *client)
{
	anvil_client_cancel_queries(client);
	if (client->fd != -1) {
		io_remove(&client->io);
		i_stream_destroy(&client->input);
		o_stream_destroy(&client->output);
		net_disconnect(client->fd);
		client->fd = -1;
	}
	if (client->to_reconnect != NULL)
		timeout_remove(&client->to_reconnect);
}

static void anvil_client_timeout(struct anvil_client *client)
{
	i_assert(aqueue_count(client->queries) > 0);

	i_error("%s: Anvil queries timed out after %u secs - aborting queries",
		client->path, ANVIL_QUERY_TIMEOUT_MSECS/1000);
	/* perhaps reconnect helps */
	anvil_reconnect(client);
}

static int anvil_client_send(struct anvil_client *client, const char *cmd)
{
	struct const_iovec iov[2];

	if (client->fd == -1) {
		if (anvil_client_connect(client, FALSE) < 0)
			return -1;
	}

	iov[0].iov_base = cmd;
	iov[0].iov_len = strlen(cmd);
	iov[1].iov_base = "\n";
	iov[1].iov_len = 1;
	if (o_stream_sendv(client->output, iov, 2) < 0) {
		i_error("write(%s) failed: %s", client->path,
			o_stream_get_error(client->output));
		anvil_reconnect(client);
		return -1;
	}
	return 0;
}

struct anvil_query *
anvil_client_query(struct anvil_client *client, const char *query,
		   anvil_callback_t *callback, void *context)
{
	struct anvil_query *anvil_query;

	if (anvil_client_send(client, query) < 0) {
		callback(NULL, context);
		return NULL;
	}

	anvil_query = i_new(struct anvil_query, 1);
	anvil_query->callback = callback;
	anvil_query->context = context;
	aqueue_append(client->queries, &anvil_query);
	if (client->to_query == NULL) {
		client->to_query = timeout_add(ANVIL_QUERY_TIMEOUT_MSECS,
					       anvil_client_timeout, client);
	}
	return anvil_query;
}

void anvil_client_query_abort(struct anvil_client *client,
			      struct anvil_query **_query)
{
	struct anvil_query *query = *_query;
	struct anvil_query *const *queries;
	unsigned int i, count;

	*_query = NULL;

	count = aqueue_count(client->queries);
	queries = array_idx(&client->queries_arr, 0);
	for (i = 0; i < count; i++) {
		if (queries[aqueue_idx(client->queries, i)] == query) {
			query->callback = NULL;
			return;
		}
	}
	i_panic("anvil query to be aborted doesn't exist");
}

void anvil_client_cmd(struct anvil_client *client, const char *cmd)
{
	(void)anvil_client_send(client, cmd);
}

bool anvil_client_is_connected(struct anvil_client *client)
{
	return client->fd != -1;
}