Mercurial > dovecot > core-2.2
view src/replication/replicator/replicator-queue.c @ 15714:90710c6c3beb
Updated copyright notices to include year 2013.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Sat, 02 Feb 2013 17:01:07 +0200 |
parents | b21f3119a5ee |
children | 36ef72481934 |
line wrap: on
line source
* Copyright (c) 2013 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "array.h" #include "ioloop.h" #include "istream.h" #include "ostream.h" #include "str.h" #include "strescape.h" #include "hash.h" #include "replicator-queue.h" #include <unistd.h> #include <fcntl.h> struct replicator_sync_lookup { struct replicator_user *user; replicator_sync_callback_t *callback; void *context; bool wait_for_next_push; }; struct replicator_queue { struct priorityq *user_queue; /* username => struct replicator_user* */ HASH_TABLE(char *, struct replicator_user *) user_hash; ARRAY(struct replicator_sync_lookup) sync_lookups; unsigned int full_sync_interval; void (*change_callback)(void *context); void *change_context; }; static int user_priority_cmp(const void *p1, const void *p2) { const struct replicator_user *user1 = p1, *user2 = p2; if (user1->priority > user2->priority) return -1; if (user1->priority < user2->priority) return 1; if (user1->priority != REPLICATION_PRIORITY_NONE) { /* there is something to replicate */ if (user1->last_fast_sync < user2->last_fast_sync) return -1; if (user1->last_fast_sync > user2->last_fast_sync) return 1; } else { /* nothing to replicate, but do still periodic full syncs */ if (user1->last_full_sync < user2->last_full_sync) return -1; if (user1->last_full_sync > user2->last_full_sync) return 1; } return 0; } struct replicator_queue *replicator_queue_init(unsigned int full_sync_interval) { struct replicator_queue *queue; queue = i_new(struct replicator_queue, 1); queue->full_sync_interval = full_sync_interval; queue->user_queue = priorityq_init(user_priority_cmp, 1024); hash_table_create(&queue->user_hash, default_pool, 1024, str_hash, strcmp); i_array_init(&queue->sync_lookups, 32); return queue; } void replicator_queue_deinit(struct replicator_queue **_queue) { struct replicator_queue *queue = *_queue; struct priorityq_item *item; *_queue = NULL; queue->change_callback = NULL; while ((item = priorityq_pop(queue->user_queue)) != NULL) { struct replicator_user *user = (struct replicator_user *)item; user->popped = TRUE; replicator_queue_remove(queue, &user); } priorityq_deinit(&queue->user_queue); hash_table_destroy(&queue->user_hash); i_assert(array_count(&queue->sync_lookups) == 0); array_free(&queue->sync_lookups); i_free(queue); } void replicator_queue_set_change_callback(struct replicator_queue *queue, void (*callback)(void *context), void *context) { queue->change_callback = callback; queue->change_context = context; } static struct replicator_user * replicator_queue_add_int(struct replicator_queue *queue, const char *username, enum replication_priority priority) { struct replicator_user *user; user = hash_table_lookup(queue->user_hash, username); if (user == NULL) { user = i_new(struct replicator_user, 1); user->username = i_strdup(username); hash_table_insert(queue->user_hash, user->username, user); } else { if (user->priority > priority) { /* user already has a higher priority than this */ return user; } if (!user->popped) priorityq_remove(queue->user_queue, &user->item); } user->priority = priority; user->last_update = ioloop_time; if (!user->popped) priorityq_add(queue->user_queue, &user->item); return user; } struct replicator_user * replicator_queue_add(struct replicator_queue *queue, const char *username, enum replication_priority priority) { struct replicator_user *user; user = replicator_queue_add_int(queue, username, priority); if (queue->change_callback != NULL) queue->change_callback(queue->change_context); return user; } void replicator_queue_add_sync(struct replicator_queue *queue, const char *username, replicator_sync_callback_t *callback, void *context) { struct replicator_user *user; struct replicator_sync_lookup *lookup; user = replicator_queue_add_int(queue, username, REPLICATION_PRIORITY_SYNC); lookup = array_append_space(&queue->sync_lookups); lookup->user = user; lookup->callback = callback; lookup->context = context; lookup->wait_for_next_push = user->popped; if (queue->change_callback != NULL) queue->change_callback(queue->change_context); } void replicator_queue_remove(struct replicator_queue *queue, struct replicator_user **_user) { struct replicator_user *user = *_user; *_user = NULL; if (!user->popped) priorityq_remove(queue->user_queue, &user->item); hash_table_remove(queue->user_hash, user->username); i_free(user->state); i_free(user->username); i_free(user); if (queue->change_callback != NULL) queue->change_callback(queue->change_context); } struct replicator_user * replicator_queue_pop(struct replicator_queue *queue, unsigned int *next_secs_r) { struct priorityq_item *item; struct replicator_user *user; time_t next_full_sync; item = priorityq_peek(queue->user_queue); if (item == NULL) { /* no users defined. we shouldn't normally get here */ *next_secs_r = 3600; return NULL; } user = (struct replicator_user *)item; next_full_sync = user->last_full_sync + queue->full_sync_interval; if (user->priority == REPLICATION_PRIORITY_NONE && next_full_sync > ioloop_time) { /* we don't want to do a full sync yet */ *next_secs_r = next_full_sync - ioloop_time; return NULL; } priorityq_remove(queue->user_queue, &user->item); user->popped = TRUE; return user; } static void replicator_queue_handle_sync_lookups(struct replicator_queue *queue, struct replicator_user *user) { struct replicator_sync_lookup *lookups; ARRAY(struct replicator_sync_lookup) callbacks; unsigned int i, count; bool success = !user->last_sync_failed; t_array_init(&callbacks, 8); lookups = array_get_modifiable(&queue->sync_lookups, &count); for (i = 0; i < count; ) { if (lookups[i].user != user) i++; else if (lookups[i].wait_for_next_push) { /* another sync request came while user was being replicated */ i_assert(user->priority == REPLICATION_PRIORITY_SYNC); lookups[i].wait_for_next_push = FALSE; i++; } else { array_append(&callbacks, &lookups[i], 1); array_delete(&queue->sync_lookups, i, 1); } } array_foreach_modifiable(&callbacks, lookups) lookups->callback(success, lookups->context); } void replicator_queue_push(struct replicator_queue *queue, struct replicator_user *user) { i_assert(user->popped); priorityq_add(queue->user_queue, &user->item); user->popped = FALSE; T_BEGIN { replicator_queue_handle_sync_lookups(queue, user); } T_END; } static int replicator_queue_import_line(struct replicator_queue *queue, const char *line) { const char *const *args, *username; unsigned int priority; struct replicator_user *user, tmp_user; /* <user> <priority> <last update> <last fast sync> <last full sync> */ args = t_strsplit_tabescaped(line); if (str_array_length(args) < 5) return -1; memset(&tmp_user, 0, sizeof(tmp_user)); username = args[0]; if (username[0] == '\0' || str_to_uint(args[1], &priority) < 0 || str_to_time(args[2], &tmp_user.last_update) < 0 || str_to_time(args[3], &tmp_user.last_fast_sync) < 0 || str_to_time(args[3], &tmp_user.last_full_sync) < 0) return -1; tmp_user.priority = priority; user = hash_table_lookup(queue->user_hash, username); if (user != NULL) { if (user->last_update > tmp_user.last_update) { /* we already have a newer state */ return 0; } if (user->last_update == tmp_user.last_update) { /* either one of these could be newer. use the one with higher priority. */ if (user->priority > tmp_user.priority) return 0; } } user = replicator_queue_add(queue, tmp_user.username, tmp_user.priority); user->last_update = tmp_user.last_update; user->last_fast_sync = tmp_user.last_fast_sync; user->last_full_sync = tmp_user.last_full_sync; return 0; } int replicator_queue_import(struct replicator_queue *queue, const char *path) { struct istream *input; const char *line; int fd, ret = 0; fd = open(path, O_RDONLY); if (fd == -1) { if (errno == ENOENT) return 0; i_error("open(%s) failed: %m", path); return -1; } input = i_stream_create_fd(fd, (size_t)-1, TRUE); while ((line = i_stream_read_next_line(input)) != NULL) { T_BEGIN { ret = replicator_queue_import_line(queue, line); } T_END; if (ret < 0) { i_error("Invalid replicator db record: %s", line); break; } } if (input->stream_errno != 0) ret = -1; i_stream_destroy(&input); return ret; } static void replicator_queue_export_user(struct replicator_user *user, string_t *str) { str_append_tabescaped(str, user->username); str_printfa(str, "\t%d\t%lld\t%lld\t%lld", (int)user->priority, (long long)user->last_update, (long long)user->last_fast_sync, (long long)user->last_full_sync); } int replicator_queue_export(struct replicator_queue *queue, const char *path) { struct ostream *output; struct priorityq_item *const *items; unsigned int i, count; string_t *str; int fd, ret = 0; fd = creat(path, 0600); if (fd == -1) { i_error("creat(%s) failed: %m", path); return -1; } output = o_stream_create_fd_file(fd, 0, TRUE); o_stream_cork(output); str = t_str_new(128); items = priorityq_items(queue->user_queue); count = priorityq_count(queue->user_queue); for (i = 0; i < count; i++) { struct replicator_user *user = (struct replicator_user *)items[i]; str_truncate(str, 0); replicator_queue_export_user(user, str); if (o_stream_send(output, str_data(str), str_len(str)) < 0) break; } if (o_stream_nfinish(output) < 0) { i_error("write(%s) failed: %m", path); ret = -1; } o_stream_destroy(&output); return ret; }