Mercurial > dovecot > core-2.2
view src/replication/replicator/replicator-brain.c @ 15714:90710c6c3beb
Updated copyright notices to include year 2013.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Sat, 02 Feb 2013 17:01:07 +0200 |
parents | 1bd70d505ef7 |
children | 36ef72481934 |
line wrap: on
line source
* Copyright (c) 2013 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "array.h" #include "ioloop.h" #include "doveadm-connection.h" #include "replicator-settings.h" #include "replicator-queue.h" #include "replicator-brain.h" struct replicator_sync_context { struct replicator_brain *brain; struct replicator_user *user; }; struct replicator_brain { pool_t pool; struct replicator_queue *queue; const struct replicator_settings *set; struct timeout *to; ARRAY(struct doveadm_connection *) doveadm_conns; unsigned int deinitializing:1; }; static void replicator_brain_fill(struct replicator_brain *brain); static void replicator_brain_queue_changed(void *context) { struct replicator_brain *brain = context; replicator_brain_fill(brain); } struct replicator_brain * replicator_brain_init(struct replicator_queue *queue, const struct replicator_settings *set) { struct replicator_brain *brain; pool_t pool; pool = pool_alloconly_create("replication brain", 1024); brain = p_new(pool, struct replicator_brain, 1); brain->pool = pool; brain->queue = queue; brain->set = set; p_array_init(&brain->doveadm_conns, pool, 16); replicator_queue_set_change_callback(queue, replicator_brain_queue_changed, brain); replicator_brain_fill(brain); return brain; } void replicator_brain_deinit(struct replicator_brain **_brain) { struct replicator_brain *brain = *_brain; struct doveadm_connection **connp; *_brain = NULL; brain->deinitializing = TRUE; array_foreach_modifiable(&brain->doveadm_conns, connp) doveadm_connection_deinit(connp); if (brain->to != NULL) timeout_remove(&brain->to); pool_unref(&brain->pool); } static struct doveadm_connection * get_doveadm_connection(struct replicator_brain *brain) { struct doveadm_connection *const *connp, *conn = NULL; array_foreach(&brain->doveadm_conns, connp) { if (!doveadm_connection_is_busy(*connp)) return *connp; } if (array_count(&brain->doveadm_conns) == brain->set->replication_max_conns) return NULL; conn = doveadm_connection_init(brain->set->doveadm_socket_path); array_append(&brain->doveadm_conns, &conn, 1); return conn; } static void doveadm_sync_callback(enum doveadm_reply reply, const char *state, void *context) { struct replicator_sync_context *ctx = context; if (reply == DOVEADM_REPLY_NOUSER) { /* user no longer exists, remove from replication */ replicator_queue_remove(ctx->brain->queue, &ctx->user); } else { i_free(ctx->user->state); ctx->user->state = i_strdup_empty(state); ctx->user->last_sync_failed = reply != DOVEADM_REPLY_OK; replicator_queue_push(ctx->brain->queue, ctx->user); } if (!ctx->brain->deinitializing) replicator_brain_fill(ctx->brain); i_free(ctx); } static bool doveadm_replicate(struct replicator_brain *brain, struct replicator_user *user) { struct replicator_sync_context *ctx; struct doveadm_connection *conn; time_t next_full_sync; bool full; conn = get_doveadm_connection(brain); if (conn == NULL) return FALSE; next_full_sync = user->last_full_sync + brain->set->replication_full_sync_interval; full = next_full_sync <= ioloop_time; /* update the sync times immediately. if the replication fails we still wouldn't want it to be retried immediately. */ user->last_fast_sync = ioloop_time; if (full) user->last_full_sync = ioloop_time; /* reset priority also. if more updates arrive during replication we'll do another replication to make sure nothing gets lost */ user->priority = REPLICATION_PRIORITY_NONE; ctx = i_new(struct replicator_sync_context, 1); ctx->brain = brain; ctx->user = user; doveadm_connection_sync(conn, user->username, user->state, full, doveadm_sync_callback, ctx); return TRUE; } static void replicator_brain_timeout(struct replicator_brain *brain) { timeout_remove(&brain->to); replicator_brain_fill(brain); } static bool replicator_brain_fill_next(struct replicator_brain *brain) { struct replicator_user *user; unsigned int next_secs; user = replicator_queue_pop(brain->queue, &next_secs); if (user == NULL) { /* nothing more to do */ if (brain->to != NULL) timeout_remove(&brain->to); brain->to = timeout_add(next_secs * 1000, replicator_brain_timeout, brain); return FALSE; } if (!doveadm_replicate(brain, user)) { /* all connections were full, put the user back to queue */ replicator_queue_push(brain->queue, user); return FALSE; } /* replication started for the user */ return TRUE; } static void replicator_brain_fill(struct replicator_brain *brain) { while (replicator_brain_fill_next(brain)) ; }