Mercurial > dovecot > core-2.2
view src/lib/ioloop.c @ 22664:fea53c2725c0
director: Fix director_max_parallel_moves/kicks type
Should be uint, not time.
author | Timo Sirainen <timo.sirainen@dovecot.fi> |
---|---|
date | Thu, 09 Nov 2017 12:24:16 +0200 |
parents | bcc3a15c18a6 |
children | cb108f786fb4 |
line wrap: on
line source
/* Copyright (c) 2002-2017 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "array.h" #include "backtrace-string.h" #include "llist.h" #include "time-util.h" #include "istream-private.h" #include "ioloop-private.h" #include <unistd.h> #define timer_is_larger(tvp, uvp) \ ((tvp)->tv_sec > (uvp)->tv_sec || \ ((tvp)->tv_sec == (uvp)->tv_sec && \ (tvp)->tv_usec > (uvp)->tv_usec)) time_t ioloop_time = 0; struct timeval ioloop_timeval; struct ioloop *current_ioloop = NULL; uint64_t ioloop_global_wait_usecs = 0; static ARRAY(io_switch_callback_t *) io_switch_callbacks = ARRAY_INIT; static void io_loop_initialize_handler(struct ioloop *ioloop) { unsigned int initial_fd_count; initial_fd_count = ioloop->max_fd_count > 0 && ioloop->max_fd_count < IOLOOP_INITIAL_FD_COUNT ? ioloop->max_fd_count : IOLOOP_INITIAL_FD_COUNT; io_loop_handler_init(ioloop, initial_fd_count); } static struct io_file * io_add_file(int fd, enum io_condition condition, const char *source_filename, unsigned int source_linenum, io_callback_t *callback, void *context) { struct io_file *io; i_assert(callback != NULL); i_assert((condition & IO_NOTIFY) == 0); io = i_new(struct io_file, 1); io->io.condition = condition; io->io.callback = callback; io->io.context = context; io->io.ioloop = current_ioloop; io->io.source_filename = source_filename; io->io.source_linenum = source_linenum; io->refcount = 1; io->fd = fd; if (io->io.ioloop->cur_ctx != NULL) { io->io.ctx = io->io.ioloop->cur_ctx; io_loop_context_ref(io->io.ctx); } if (io->io.ioloop->handler_context == NULL) io_loop_initialize_handler(io->io.ioloop); if (fd != -1) io_loop_handle_add(io); else { /* we're adding an istream whose only way to get notified is to call i_stream_set_input_pending() */ } if (io->io.ioloop->io_files != NULL) { io->io.ioloop->io_files->prev = io; io->next = io->io.ioloop->io_files; } io->io.ioloop->io_files = io; return io; } #undef io_add struct io *io_add(int fd, enum io_condition condition, const char *source_filename, unsigned int source_linenum, io_callback_t *callback, void *context) { struct io_file *io; i_assert(fd >= 0); io = io_add_file(fd, condition, source_filename, source_linenum, callback, context); return &io->io; } #undef io_add_istream struct io *io_add_istream(struct istream *input, const char *source_filename, unsigned int source_linenum, io_callback_t *callback, void *context) { struct io_file *io; io = io_add_file(i_stream_get_fd(input), IO_READ, source_filename, source_linenum, callback, context); io->istream = input; i_stream_ref(io->istream); i_stream_set_io(io->istream, &io->io); return &io->io; } static void io_file_unlink(struct io_file *io) { if (io->prev != NULL) io->prev->next = io->next; else io->io.ioloop->io_files = io->next; if (io->next != NULL) io->next->prev = io->prev; /* if we got here from an I/O handler callback, make sure we don't try to handle this one next. */ if (io->io.ioloop->next_io_file == io) io->io.ioloop->next_io_file = io->next; } static void io_remove_full(struct io **_io, bool closed) { struct io *io = *_io; i_assert(io->callback != NULL); *_io = NULL; /* make sure the callback doesn't get called anymore. kqueue code relies on this. */ io->callback = NULL; if (io->pending) { i_assert(io->ioloop->io_pending_count > 0); io->ioloop->io_pending_count--; } if (io->ctx != NULL) io_loop_context_unref(&io->ctx); if ((io->condition & IO_NOTIFY) != 0) io_loop_notify_remove(io); else { struct io_file *io_file = (struct io_file *)io; struct istream *istream = io_file->istream; if (istream != NULL) { /* remove io before it's freed */ i_stream_unset_io(istream, io); } io_file_unlink(io_file); if (io_file->fd != -1) io_loop_handle_remove(io_file, closed); else i_free(io); /* remove io from the ioloop before unreferencing the istream, because a destroyed istream may automatically close the fd. */ if (istream != NULL) i_stream_unref(&istream); } } void io_remove(struct io **io) { io_remove_full(io, FALSE); } void io_remove_closed(struct io **io) { i_assert(((*io)->condition & IO_NOTIFY) == 0); io_remove_full(io, TRUE); } void io_set_pending(struct io *io) { i_assert((io->condition & IO_NOTIFY) == 0); if (!io->pending) { io->pending = TRUE; io->ioloop->io_pending_count++; } } static void timeout_update_next(struct timeout *timeout, struct timeval *tv_now) { if (tv_now == NULL) { if (gettimeofday(&timeout->next_run, NULL) < 0) i_fatal("gettimeofday(): %m"); } else { timeout->next_run.tv_sec = tv_now->tv_sec; timeout->next_run.tv_usec = tv_now->tv_usec; } /* we don't want microsecond accuracy or this function will be called all the time - millisecond is more than enough */ timeout->next_run.tv_usec -= timeout->next_run.tv_usec % 1000; timeout->next_run.tv_sec += timeout->msecs/1000; timeout->next_run.tv_usec += (timeout->msecs%1000)*1000; if (timeout->next_run.tv_usec > 1000000) { timeout->next_run.tv_sec++; timeout->next_run.tv_usec -= 1000000; } } static struct timeout * timeout_add_common(const char *source_filename, unsigned int source_linenum, timeout_callback_t *callback, void *context) { struct timeout *timeout; timeout = i_new(struct timeout, 1); timeout->item.idx = UINT_MAX; timeout->source_filename = source_filename; timeout->source_linenum = source_linenum; timeout->ioloop = current_ioloop; timeout->callback = callback; timeout->context = context; if (timeout->ioloop->cur_ctx != NULL) { timeout->ctx = timeout->ioloop->cur_ctx; io_loop_context_ref(timeout->ctx); } return timeout; } #undef timeout_add struct timeout *timeout_add(unsigned int msecs, const char *source_filename, unsigned int source_linenum, timeout_callback_t *callback, void *context) { struct timeout *timeout; timeout = timeout_add_common(source_filename, source_linenum, callback, context); timeout->msecs = msecs; if (msecs > 0) { /* start this timeout in the next run cycle */ array_append(&timeout->ioloop->timeouts_new, &timeout, 1); } else { /* trigger zero timeouts as soon as possible */ timeout_update_next(timeout, timeout->ioloop->running ? NULL : &ioloop_timeval); priorityq_add(timeout->ioloop->timeouts, &timeout->item); } return timeout; } #undef timeout_add_short struct timeout * timeout_add_short(unsigned int msecs, const char *source_filename, unsigned int source_linenum, timeout_callback_t *callback, void *context) { return timeout_add(msecs, source_filename, source_linenum, callback, context); } #undef timeout_add_absolute struct timeout * timeout_add_absolute(const struct timeval *time, const char *source_filename, unsigned int source_linenum, timeout_callback_t *callback, void *context) { struct timeout *timeout; timeout = timeout_add_common(source_filename, source_linenum, callback, context); timeout->one_shot = TRUE; timeout->next_run = *time; priorityq_add(timeout->ioloop->timeouts, &timeout->item); return timeout; } static struct timeout * timeout_copy(const struct timeout *old_to) { struct timeout *new_to; new_to = timeout_add_common (old_to->source_filename, old_to->source_linenum, old_to->callback, old_to->context); new_to->one_shot = old_to->one_shot; new_to->msecs = old_to->msecs; new_to->next_run = old_to->next_run; if (old_to->item.idx != UINT_MAX) priorityq_add(new_to->ioloop->timeouts, &new_to->item); else if (!new_to->one_shot) { i_assert(new_to->msecs > 0); array_append(&new_to->ioloop->timeouts_new, &new_to, 1); } return new_to; } static void timeout_free(struct timeout *timeout) { if (timeout->ctx != NULL) io_loop_context_unref(&timeout->ctx); i_free(timeout); } void timeout_remove(struct timeout **_timeout) { struct timeout *timeout = *_timeout; struct ioloop *ioloop = timeout->ioloop; *_timeout = NULL; if (timeout->item.idx != UINT_MAX) priorityq_remove(timeout->ioloop->timeouts, &timeout->item); else if (!timeout->one_shot && timeout->msecs > 0) { struct timeout *const *to_idx; array_foreach(&ioloop->timeouts_new, to_idx) { if (*to_idx == timeout) { array_delete(&ioloop->timeouts_new, array_foreach_idx(&ioloop->timeouts_new, to_idx), 1); break; } } } timeout_free(timeout); } static void ATTR_NULL(2) timeout_reset_timeval(struct timeout *timeout, struct timeval *tv_now) { if (timeout->item.idx == UINT_MAX) return; timeout_update_next(timeout, tv_now); if (timeout->msecs <= 1) { /* if we came here from io_loop_handle_timeouts(), next_run must be larger than tv_now or we could go to infinite loop. +1000 to get 1 ms further, another +1000 to account for timeout_update_next()'s truncation. */ timeout->next_run.tv_usec += 2000; if (timeout->next_run.tv_usec >= 1000000) { timeout->next_run.tv_sec++; timeout->next_run.tv_usec -= 1000000; } } i_assert(tv_now == NULL || timeout->next_run.tv_sec > tv_now->tv_sec || (timeout->next_run.tv_sec == tv_now->tv_sec && timeout->next_run.tv_usec > tv_now->tv_usec)); priorityq_remove(timeout->ioloop->timeouts, &timeout->item); priorityq_add(timeout->ioloop->timeouts, &timeout->item); } void timeout_reset(struct timeout *timeout) { i_assert(!timeout->one_shot); timeout_reset_timeval(timeout, NULL); } static int timeout_get_wait_time(struct timeout *timeout, struct timeval *tv_r, struct timeval *tv_now) { int ret; if (tv_now->tv_sec == 0) { if (gettimeofday(tv_now, NULL) < 0) i_fatal("gettimeofday(): %m"); } tv_r->tv_sec = tv_now->tv_sec; tv_r->tv_usec = tv_now->tv_usec; i_assert(tv_r->tv_sec > 0); i_assert(timeout->next_run.tv_sec > 0); tv_r->tv_sec = timeout->next_run.tv_sec - tv_r->tv_sec; tv_r->tv_usec = timeout->next_run.tv_usec - tv_r->tv_usec; if (tv_r->tv_usec < 0) { tv_r->tv_sec--; tv_r->tv_usec += 1000000; } if (tv_r->tv_sec < 0 || (tv_r->tv_sec == 0 && tv_r->tv_usec < 1000)) { tv_r->tv_sec = 0; tv_r->tv_usec = 0; return 0; } if (tv_r->tv_sec > INT_MAX/1000-1) tv_r->tv_sec = INT_MAX/1000-1; /* round wait times up to next millisecond */ ret = tv_r->tv_sec * 1000 + (tv_r->tv_usec + 999) / 1000; i_assert(ret > 0 && tv_r->tv_sec >= 0 && tv_r->tv_usec >= 0); return ret; } int io_loop_get_wait_time(struct ioloop *ioloop, struct timeval *tv_r) { struct timeval tv_now; struct priorityq_item *item; struct timeout *timeout; int msecs; item = priorityq_peek(ioloop->timeouts); timeout = (struct timeout *)item; /* we need to see if there are pending IO waiting, if there is, we set msecs = 0 to ensure they are processed without delay */ if (timeout == NULL && ioloop->io_pending_count == 0) { /* no timeouts. use INT_MAX msecs for timeval and return -1 for poll/epoll infinity. */ tv_r->tv_sec = INT_MAX / 1000; tv_r->tv_usec = 0; ioloop->next_max_time = (1ULL << (TIME_T_MAX_BITS-1)) - 1; return -1; } if (ioloop->io_pending_count > 0) { if (gettimeofday(&tv_now, NULL) < 0) i_fatal("gettimeofday(): %m"); msecs = 0; tv_r->tv_sec = 0; tv_r->tv_usec = 0; } else { tv_now.tv_sec = 0; msecs = timeout_get_wait_time(timeout, tv_r, &tv_now); } ioloop->next_max_time = (tv_now.tv_sec + msecs/1000) + 1; /* update ioloop_timeval - this is meant for io_loop_handle_timeouts()'s ioloop_wait_usecs calculation. normally after this we go to the ioloop and after that we update ioloop_timeval immediately again. */ ioloop_timeval = tv_now; ioloop_time = tv_now.tv_sec; return msecs; } static int timeout_cmp(const void *p1, const void *p2) { const struct timeout *to1 = p1, *to2 = p2; return timeval_cmp(&to1->next_run, &to2->next_run); } static void io_loop_default_time_moved(time_t old_time, time_t new_time) { if (old_time > new_time) { i_warning("Time moved backwards by %ld seconds.", (long)(old_time - new_time)); } } static void io_loop_timeouts_start_new(struct ioloop *ioloop) { struct timeout *const *to_idx; if (array_count(&ioloop->timeouts_new) == 0) return; io_loop_time_refresh(); array_foreach(&ioloop->timeouts_new, to_idx) { struct timeout *timeout= *to_idx; i_assert(timeout->next_run.tv_sec == 0 && timeout->next_run.tv_usec == 0); i_assert(!timeout->one_shot); i_assert(timeout->msecs > 0); timeout_update_next(timeout, &ioloop_timeval); priorityq_add(ioloop->timeouts, &timeout->item); } array_clear(&ioloop->timeouts_new); } static void io_loop_timeouts_update(struct ioloop *ioloop, long diff_secs) { struct priorityq_item *const *items; unsigned int i, count; count = priorityq_count(ioloop->timeouts); items = priorityq_items(ioloop->timeouts); for (i = 0; i < count; i++) { struct timeout *to = (struct timeout *)items[i]; to->next_run.tv_sec += diff_secs; } } static void io_loops_timeouts_update(long diff_secs) { struct ioloop *ioloop; for (ioloop = current_ioloop; ioloop != NULL; ioloop = ioloop->prev) io_loop_timeouts_update(ioloop, diff_secs); } static void ioloop_add_wait_time(struct ioloop *ioloop) { struct io_wait_timer *timer; long long diff = timeval_diff_usecs(&ioloop_timeval, &ioloop->wait_started); ioloop->ioloop_wait_usecs += diff; ioloop_global_wait_usecs += diff; for (timer = ioloop->wait_timers; timer != NULL; timer = timer->next) timer->usecs += diff; } static void io_loop_handle_timeouts_real(struct ioloop *ioloop) { struct priorityq_item *item; struct timeval tv, tv_call; unsigned int t_id; if (gettimeofday(&ioloop_timeval, NULL) < 0) i_fatal("gettimeofday(): %m"); /* Don't bother comparing usecs. */ if (unlikely(ioloop_time > ioloop_timeval.tv_sec)) { /* time moved backwards */ io_loops_timeouts_update(-(long)(ioloop_time - ioloop_timeval.tv_sec)); ioloop->time_moved_callback(ioloop_time, ioloop_timeval.tv_sec); /* the callback may have slept, so check the time again. */ if (gettimeofday(&ioloop_timeval, NULL) < 0) i_fatal("gettimeofday(): %m"); } else { if (unlikely(ioloop_timeval.tv_sec > ioloop->next_max_time)) { io_loops_timeouts_update(ioloop_timeval.tv_sec - ioloop->next_max_time); /* time moved forwards */ ioloop->time_moved_callback(ioloop->next_max_time, ioloop_timeval.tv_sec); } ioloop_add_wait_time(ioloop); } ioloop_time = ioloop_timeval.tv_sec; tv_call = ioloop_timeval; while ((item = priorityq_peek(ioloop->timeouts)) != NULL) { struct timeout *timeout = (struct timeout *)item; /* use tv_call to make sure we don't get to infinite loop in case callbacks update ioloop_timeval. */ if (timeout_get_wait_time(timeout, &tv, &tv_call) > 0) break; if (timeout->one_shot) { /* remove timeout from queue */ priorityq_remove(timeout->ioloop->timeouts, &timeout->item); } else { /* update timeout's next_run and reposition it in the queue */ timeout_reset_timeval(timeout, &tv_call); } if (timeout->ctx != NULL) io_loop_context_activate(timeout->ctx); t_id = t_push_named("ioloop timeout handler %p", (void *)timeout->callback); timeout->callback(timeout->context); if (t_pop() != t_id) { i_panic("Leaked a t_pop() call in timeout handler %p", (void *)timeout->callback); } if (ioloop->cur_ctx != NULL) io_loop_context_deactivate(ioloop->cur_ctx); } } void io_loop_handle_timeouts(struct ioloop *ioloop) { T_BEGIN { io_loop_handle_timeouts_real(ioloop); } T_END; } void io_loop_call_io(struct io *io) { struct ioloop *ioloop = io->ioloop; unsigned int t_id; if (io->pending) { i_assert(ioloop->io_pending_count > 0); ioloop->io_pending_count--; io->pending = FALSE; } if (io->ctx != NULL) io_loop_context_activate(io->ctx); t_id = t_push_named("ioloop handler %p", (void *)io->callback); io->callback(io->context); if (t_pop() != t_id) { i_panic("Leaked a t_pop() call in I/O handler %p", (void *)io->callback); } if (ioloop->cur_ctx != NULL) io_loop_context_deactivate(ioloop->cur_ctx); } void io_loop_run(struct ioloop *ioloop) { if (ioloop->handler_context == NULL) io_loop_initialize_handler(ioloop); if (ioloop->cur_ctx != NULL) io_loop_context_unref(&ioloop->cur_ctx); /* recursive io_loop_run() isn't allowed for the same ioloop. it can break backends. */ i_assert(!ioloop->iolooping); ioloop->iolooping = TRUE; ioloop->running = TRUE; while (ioloop->running) io_loop_handler_run(ioloop); ioloop->iolooping = FALSE; } static void io_loop_call_pending(struct ioloop *ioloop) { struct io_file *io; while (ioloop->io_pending_count > 0) { io = ioloop->io_files; do { ioloop->next_io_file = io->next; if (io->io.pending) io_loop_call_io(&io->io); if (ioloop->io_pending_count == 0) break; io = ioloop->next_io_file; } while (io != NULL); } } void io_loop_handler_run(struct ioloop *ioloop) { io_loop_timeouts_start_new(ioloop); ioloop->wait_started = ioloop_timeval; io_loop_handler_run_internal(ioloop); io_loop_call_pending(ioloop); } void io_loop_stop(struct ioloop *ioloop) { ioloop->running = FALSE; } void io_loop_set_running(struct ioloop *ioloop) { ioloop->running = TRUE; } void io_loop_set_max_fd_count(struct ioloop *ioloop, unsigned int max_fds) { ioloop->max_fd_count = max_fds; } bool io_loop_is_running(struct ioloop *ioloop) { return ioloop->running; } void io_loop_time_refresh(void) { if (gettimeofday(&ioloop_timeval, NULL) < 0) i_fatal("gettimeofday(): %m"); ioloop_time = ioloop_timeval.tv_sec; } struct ioloop *io_loop_create(void) { struct ioloop *ioloop; /* initialize time */ if (gettimeofday(&ioloop_timeval, NULL) < 0) i_fatal("gettimeofday(): %m"); ioloop_time = ioloop_timeval.tv_sec; ioloop = i_new(struct ioloop, 1); ioloop->timeouts = priorityq_init(timeout_cmp, 32); i_array_init(&ioloop->timeouts_new, 8); ioloop->time_moved_callback = current_ioloop != NULL ? current_ioloop->time_moved_callback : io_loop_default_time_moved; ioloop->prev = current_ioloop; io_loop_set_current(ioloop); return ioloop; } void io_loop_destroy(struct ioloop **_ioloop) { struct ioloop *ioloop = *_ioloop; struct timeout *const *to_idx; struct priorityq_item *item; bool leaks = FALSE; *_ioloop = NULL; /* ->prev won't work unless loops are destroyed in create order */ i_assert(ioloop == current_ioloop); io_loop_set_current(current_ioloop->prev); if (ioloop->notify_handler_context != NULL) io_loop_notify_handler_deinit(ioloop); while (ioloop->io_files != NULL) { struct io_file *io = ioloop->io_files; struct io *_io = &io->io; i_warning("I/O leak: %p (%s:%u, fd %d)", (void *)io->io.callback, io->io.source_filename, io->io.source_linenum, io->fd); io_remove(&_io); leaks = TRUE; } i_assert(ioloop->io_pending_count == 0); array_foreach(&ioloop->timeouts_new, to_idx) { struct timeout *to = *to_idx; i_warning("Timeout leak: %p (%s:%u)", (void *)to->callback, to->source_filename, to->source_linenum); timeout_free(to); leaks = TRUE; } array_free(&ioloop->timeouts_new); while ((item = priorityq_pop(ioloop->timeouts)) != NULL) { struct timeout *to = (struct timeout *)item; i_warning("Timeout leak: %p (%s:%u)", (void *)to->callback, to->source_filename, to->source_linenum); timeout_free(to); leaks = TRUE; } priorityq_deinit(&ioloop->timeouts); while (ioloop->wait_timers != NULL) { struct io_wait_timer *timer = ioloop->wait_timers; i_warning("IO wait timer leak: %s:%u", timer->source_filename, timer->source_linenum); io_wait_timer_remove(&timer); leaks = TRUE; } if (leaks) { const char *backtrace; if (backtrace_get(&backtrace) == 0) i_warning("Raw backtrace for leaks: %s", backtrace); } if (ioloop->handler_context != NULL) io_loop_handler_deinit(ioloop); if (ioloop->cur_ctx != NULL) io_loop_context_deactivate(ioloop->cur_ctx); i_free(ioloop); } void io_loop_set_time_moved_callback(struct ioloop *ioloop, io_loop_time_moved_callback_t *callback) { ioloop->time_moved_callback = callback; } static void io_switch_callbacks_free(void) { array_free(&io_switch_callbacks); } void io_loop_set_current(struct ioloop *ioloop) { io_switch_callback_t *const *callbackp; struct ioloop *prev_ioloop = current_ioloop; if (ioloop == current_ioloop) return; current_ioloop = ioloop; if (array_is_created(&io_switch_callbacks)) { array_foreach(&io_switch_callbacks, callbackp) (*callbackp)(prev_ioloop); } } void io_loop_add_switch_callback(io_switch_callback_t *callback) { if (!array_is_created(&io_switch_callbacks)) { i_array_init(&io_switch_callbacks, 4); lib_atexit(io_switch_callbacks_free); } array_append(&io_switch_callbacks, &callback, 1); } void io_loop_remove_switch_callback(io_switch_callback_t *callback) { io_switch_callback_t *const *callbackp; unsigned int idx; array_foreach(&io_switch_callbacks, callbackp) { if (*callbackp == callback) { idx = array_foreach_idx(&io_switch_callbacks, callbackp); array_delete(&io_switch_callbacks, idx, 1); return; } } i_unreached(); } struct ioloop_context *io_loop_context_new(struct ioloop *ioloop) { struct ioloop_context *ctx; ctx = i_new(struct ioloop_context, 1); ctx->refcount = 2; ctx->ioloop = ioloop; i_array_init(&ctx->callbacks, 4); if (ioloop->cur_ctx != NULL) io_loop_context_unref(&ioloop->cur_ctx); ioloop->cur_ctx = ctx; return ctx; } void io_loop_context_ref(struct ioloop_context *ctx) { i_assert(ctx->refcount > 0); ctx->refcount++; } void io_loop_context_unref(struct ioloop_context **_ctx) { struct ioloop_context *ctx = *_ctx; *_ctx = NULL; i_assert(ctx->refcount > 0); if (--ctx->refcount > 0) return; /* cur_ctx itself keeps a reference */ i_assert(ctx->ioloop->cur_ctx != ctx); array_free(&ctx->callbacks); i_free(ctx); } #undef io_loop_context_add_callbacks void io_loop_context_add_callbacks(struct ioloop_context *ctx, io_callback_t *activate, io_callback_t *deactivate, void *context) { struct ioloop_context_callback cb; i_zero(&cb); cb.activate = activate; cb.deactivate = deactivate; cb.context = context; array_append(&ctx->callbacks, &cb, 1); } #undef io_loop_context_remove_callbacks void io_loop_context_remove_callbacks(struct ioloop_context *ctx, io_callback_t *activate, io_callback_t *deactivate, void *context) { struct ioloop_context_callback *cb; array_foreach_modifiable(&ctx->callbacks, cb) { if (cb->context == context && cb->activate == activate && cb->deactivate == deactivate) { /* simply mark it as deleted, since we could get here from activate/deactivate loop */ cb->activate = NULL; cb->deactivate = NULL; cb->context = NULL; return; } } i_panic("io_loop_context_remove_callbacks() context not found"); } static void io_loop_context_remove_deleted_callbacks(struct ioloop_context *ctx) { const struct ioloop_context_callback *cbs; unsigned int i, count; cbs = array_get(&ctx->callbacks, &count); for (i = 0; i < count; ) { if (cbs[i].activate != NULL) i++; else { array_delete(&ctx->callbacks, i, 1); cbs = array_get(&ctx->callbacks, &count); } } } void io_loop_context_activate(struct ioloop_context *ctx) { struct ioloop_context_callback *cb; i_assert(ctx->ioloop->cur_ctx == NULL); ctx->ioloop->cur_ctx = ctx; io_loop_context_ref(ctx); array_foreach_modifiable(&ctx->callbacks, cb) { i_assert(!cb->activated); if (cb->activate != NULL) cb->activate(cb->context); cb->activated = TRUE; } } void io_loop_context_deactivate(struct ioloop_context *ctx) { struct ioloop_context_callback *cb; i_assert(ctx->ioloop->cur_ctx != NULL); array_foreach_modifiable(&ctx->callbacks, cb) { if (!cb->activated) { /* we just added this callback. don't deactivate it before it gets first activated. */ } else { if (cb->deactivate != NULL) cb->deactivate(cb->context); cb->activated = FALSE; } } ctx->ioloop->cur_ctx = NULL; io_loop_context_remove_deleted_callbacks(ctx); io_loop_context_unref(&ctx); } struct ioloop_context *io_loop_get_current_context(struct ioloop *ioloop) { return ioloop->cur_ctx; } struct io *io_loop_move_io(struct io **_io) { struct io *old_io = *_io; struct io_file *old_io_file, *new_io_file; i_assert((old_io->condition & IO_NOTIFY) == 0); if (old_io->ioloop == current_ioloop) return old_io; old_io_file = (struct io_file *)old_io; new_io_file = io_add_file(old_io_file->fd, old_io->condition, old_io->source_filename, old_io->source_linenum, old_io->callback, old_io->context); if (old_io_file->istream != NULL) { /* reference before io_remove() */ new_io_file->istream = old_io_file->istream; i_stream_ref(new_io_file->istream); } if (old_io->pending) io_set_pending(&new_io_file->io); io_remove(_io); if (new_io_file->istream != NULL) { /* update istream io after it was removed with io_remove() */ i_stream_set_io(new_io_file->istream, &new_io_file->io); } return &new_io_file->io; } struct timeout *io_loop_move_timeout(struct timeout **_timeout) { struct timeout *new_to, *old_to = *_timeout; if (old_to->ioloop == current_ioloop) return old_to; new_to = timeout_copy(old_to); timeout_remove(_timeout); return new_to; } bool io_loop_have_ios(struct ioloop *ioloop) { return ioloop->io_files != NULL; } bool io_loop_have_immediate_timeouts(struct ioloop *ioloop) { struct timeval tv; return io_loop_get_wait_time(ioloop, &tv) == 0; } uint64_t io_loop_get_wait_usecs(struct ioloop *ioloop) { return ioloop->ioloop_wait_usecs; } enum io_condition io_loop_find_fd_conditions(struct ioloop *ioloop, int fd) { enum io_condition conditions = 0; struct io_file *io; i_assert(fd >= 0); for (io = ioloop->io_files; io != NULL; io = io->next) { if (io->fd == fd) conditions |= io->io.condition; } return conditions; } #undef io_wait_timer_add struct io_wait_timer * io_wait_timer_add(const char *source_filename, unsigned int source_linenum) { struct io_wait_timer *timer; timer = i_new(struct io_wait_timer, 1); timer->ioloop = current_ioloop; timer->source_filename = source_filename; timer->source_linenum = source_linenum; DLLIST_PREPEND(¤t_ioloop->wait_timers, timer); return timer; } struct io_wait_timer *io_wait_timer_move(struct io_wait_timer **_timer) { struct io_wait_timer *timer = *_timer; *_timer = NULL; DLLIST_REMOVE(&timer->ioloop->wait_timers, timer); DLLIST_PREPEND(¤t_ioloop->wait_timers, timer); timer->ioloop = current_ioloop; return timer; } void io_wait_timer_remove(struct io_wait_timer **_timer) { struct io_wait_timer *timer = *_timer; *_timer = NULL; DLLIST_REMOVE(&timer->ioloop->wait_timers, timer); i_free(timer); } uint64_t io_wait_timer_get_usecs(struct io_wait_timer *timer) { return timer->usecs; }