view src/stats/client.c @ 22715:20415dd0b85a

dsync: Add per-mailbox sync lock that is always used. Both importing and exporting gets the lock before they even sync the mailbox. The lock is kept until the import/export finishes. This guarantees that no matter how dsync is run, two dsyncs can't be working on the same mailbox at the same time. This lock is in addition to the optional per-user lock enabled by the -l parameter. If the -l parameter is used, the same lock timeout is used for the per-mailbox lock. Otherwise 30s timeout is used. This should help to avoid email duplication when replication is enabled for public namespaces, and maybe in some other rare situations as well.
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Thu, 28 Dec 2017 14:10:23 +0200
parents cb108f786fb4
children
line wrap: on
line source

/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "llist.h"
#include "ioloop.h"
#include "istream.h"
#include "ostream.h"
#include "strescape.h"
#include "master-service.h"
#include "mail-command.h"
#include "mail-session.h"
#include "mail-user.h"
#include "mail-domain.h"
#include "mail-ip.h"
#include "client-export.h"
#include "client-reset.h"
#include "client.h"

#include <unistd.h>

#define CLIENT_MAX_SIMULTANEOUS_ITER_COUNT 1000
#define MAX_INBUF_SIZE 1024
#define OUTBUF_THROTTLE_SIZE (1024*64)

static struct client *clients;

bool client_is_busy(struct client *client)
{
	client->iter_count++;
	if (client->iter_count % CLIENT_MAX_SIMULTANEOUS_ITER_COUNT == 0)
		return TRUE;
	if (o_stream_get_buffer_used_size(client->output) < OUTBUF_THROTTLE_SIZE)
		return FALSE;
	if (o_stream_flush(client->output) < 0)
		return TRUE;
	return o_stream_get_buffer_used_size(client->output) >= OUTBUF_THROTTLE_SIZE;
}

static int
client_handle_request(struct client *client, const char *const *args,
		      const char **error_r)
{
	const char *cmd = args[0];

	if (cmd == NULL) {
		*error_r = "Missing command";
		return -1;
	}
	args++;

	if (strcmp(cmd, "EXPORT") == 0)
		return client_export(client, args, error_r);
	if (strcmp(cmd, "RESET") == 0)
		return client_stats_reset(client, args, error_r);

	*error_r = "Unknown command";
	return -1;
}

static const char *const*
client_read_next_line(struct client *client)
{
	const char *line;
	char **args;
	unsigned int i;

	line = i_stream_next_line(client->input);
	if (line == NULL)
		return NULL;

	args = p_strsplit(pool_datastack_create(), line, "\t");
	for (i = 0; args[i] != NULL; i++)
		args[i] = str_tabunescape(args[i]);
	return (void *)args;
}

static void client_input(struct client *client)
{
	const char *const *args, *error;
	int ret;

	if (client->to_pending != NULL)
		timeout_remove(&client->to_pending);

	switch (i_stream_read(client->input)) {
	case -2:
		i_error("BUG: Stats client sent too much data");
		client_destroy(&client);
		return;
	case -1:
		client_destroy(&client);
		return;
	}

	o_stream_cork(client->output);
	while ((args = client_read_next_line(client)) != NULL) {
		ret = client_handle_request(client, args, &error);
		if (ret < 0) {
			i_error("Stats client input error: %s", error);
			client_destroy(&client);
			return;
		}
		if (ret == 0) {
			o_stream_set_flush_pending(client->output, TRUE);
			io_remove(&client->io);
			break;
		}
		client->cmd_more = NULL;
	}
	o_stream_uncork(client->output);
}

static int client_output(struct client *client)
{
	int ret = 1;

	o_stream_cork(client->output);
	if (o_stream_flush(client->output) < 0) {
		client_destroy(&client);
		return 1;
	}
	if (client->cmd_more != NULL)
		ret = client->cmd_more(client);
	o_stream_uncork(client->output);

	if (ret > 0) {
		client->cmd_more = NULL;
		if (client->io == NULL)
			client_enable_io(client);
	}
	return ret;
}

void client_enable_io(struct client *client)
{
	i_assert(client->io == NULL);

	client->io = io_add(client->fd, IO_READ, client_input, client);
	if (client->to_pending == NULL)
		client->to_pending = timeout_add(0, client_input, client);
}

struct client *client_create(int fd)
{
	struct client *client;

	client = i_new(struct client, 1);
	client->fd = fd;
	client->io = io_add(fd, IO_READ, client_input, client);
	client->input = i_stream_create_fd(fd, MAX_INBUF_SIZE, FALSE);
	client->output = o_stream_create_fd(fd, (size_t)-1, FALSE);
	o_stream_set_no_error_handling(client->output, TRUE);
	o_stream_set_flush_callback(client->output, client_output, client);
	client->cmd_pool = pool_alloconly_create("cmd pool", 1024);

	DLLIST_PREPEND(&clients, client);
	return client;
}

static void client_unref_iters(struct client *client)
{
	if (client->mail_cmd_iter != NULL)
		mail_command_unref(&client->mail_cmd_iter);
	if (client->mail_session_iter != NULL)
		mail_session_unref(&client->mail_session_iter);
	if (client->mail_user_iter != NULL)
		mail_user_unref(&client->mail_user_iter);
	if (client->mail_domain_iter != NULL)
		mail_domain_unref(&client->mail_domain_iter);
	if (client->mail_ip_iter != NULL)
		mail_ip_unref(&client->mail_ip_iter);
}

void client_destroy(struct client **_client)
{
	struct client *client = *_client;

	*_client = NULL;

	DLLIST_REMOVE(&clients, client);
	if (client->io != NULL)
		io_remove(&client->io);
	i_stream_destroy(&client->input);
	o_stream_destroy(&client->output);
	if (close(client->fd) < 0)
		i_error("close(client) failed: %m");

	client_unref_iters(client);
	pool_unref(&client->cmd_pool);
	i_free(client);

	master_service_client_connection_destroyed(master_service);
}

void clients_destroy_all(void)
{
	while (clients != NULL) {
		struct client *client = clients;

		client_destroy(&client);
	}
}