Mercurial > dovecot > core-2.2
changeset 10265:8ebf82849077 HEAD
lib-master: Added anvil client code.
author | Timo Sirainen <tss@iki.fi> |
---|---|
date | Fri, 06 Nov 2009 13:57:04 -0500 |
parents | 7bdb5816f797 |
children | e99c02873d8c |
files | src/lib-master/Makefile.am src/lib-master/anvil-client.c src/lib-master/anvil-client.h |
diffstat | 3 files changed, 191 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib-master/Makefile.am Fri Nov 06 13:56:10 2009 -0500 +++ b/src/lib-master/Makefile.am Fri Nov 06 13:57:04 2009 -0500 @@ -9,6 +9,7 @@ -DBINDIR=\""$(bindir)"\" libmaster_la_SOURCES = \ + anvil-client.c \ master-auth.c \ master-login.c \ master-login-auth.c \ @@ -17,6 +18,7 @@ syslog-util.c noinst_HEADERS = \ + anvil-client.h \ master-auth.h \ master-interface.h \ master-login.h \
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib-master/anvil-client.c Fri Nov 06 13:57:04 2009 -0500 @@ -0,0 +1,169 @@ +/* Copyright (c) 2009 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "network.h" +#include "istream.h" +#include "ostream.h" +#include "array.h" +#include "aqueue.h" +#include "anvil-client.h" + +struct anvil_query { + anvil_callback_t *callback; + void *context; +}; + +struct anvil_client { + char *path; + int fd; + struct istream *input; + struct ostream *output; + struct io *io; + + ARRAY_DEFINE(queries_arr, struct anvil_query); + struct aqueue *queries; + + bool (*reconnect_callback)(void); +}; + +#define ANVIL_HANDSHAKE "VERSION\tanvil\t1\t0\n" +#define ANVIL_INBUF_SIZE 1024 + +static void anvil_client_disconnect(struct anvil_client *client); + +struct anvil_client * +anvil_client_init(const char *path, bool (*reconnect_callback)(void)) +{ + struct anvil_client *client; + + client = i_new(struct anvil_client, 1); + client->path = i_strdup(path); + client->reconnect_callback = reconnect_callback; + client->fd = -1; + i_array_init(&client->queries_arr, 32); + client->queries = aqueue_init(&client->queries_arr.arr); + return client; +} + +void anvil_client_deinit(struct anvil_client **_client) +{ + struct anvil_client *client = *_client; + + *_client = NULL; + + anvil_client_disconnect(client); + array_free(&client->queries_arr); + aqueue_deinit(&client->queries); + i_free(client->path); + i_free(client); +} + +static void anvil_reconnect(struct anvil_client *client) +{ + anvil_client_disconnect(client); + if (client->reconnect_callback != NULL) { + if (!client->reconnect_callback()) { + /* no reconnection */ + return; + } + } + (void)anvil_client_connect(client, FALSE); +} + +static void anvil_input(struct anvil_client *client) +{ + const struct anvil_query *queries, *query; + const char *line; + unsigned int count; + + queries = array_get(&client->queries_arr, &count); + while ((line = i_stream_read_next_line(client->input)) != NULL) { + if (aqueue_count(client->queries) == 0) { + i_error("anvil: Unexpected input: %s", line); + continue; + } + + query = &queries[aqueue_idx(client->queries, 0)]; + query->callback(line, query->context); + aqueue_delete_tail(client->queries); + } + if (client->input->stream_errno != 0) { + i_error("read(%s) failed: %m", client->path); + anvil_reconnect(client); + } else if (client->input->eof) { + i_error("read(%s) failed: EOF", client->path); + anvil_reconnect(client); + } +} + +int anvil_client_connect(struct anvil_client *client, bool retry) +{ + int fd; + + i_assert(client->fd == -1); + + fd = retry ? net_connect_unix_with_retries(client->path, 5000) : + net_connect_unix(client->path); + if (fd == -1) { + i_error("net_connect_unix(%s) failed: %m", client->path); + return -1; + } + + client->fd = fd; + client->input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE, FALSE); + client->output = o_stream_create_fd(fd, (size_t)-1, FALSE); + client->io = io_add(fd, IO_READ, anvil_input, client); + o_stream_send_str(client->output, ANVIL_HANDSHAKE); + return 0; +} + +static void anvil_client_cancel_queries(struct anvil_client *client) +{ + const struct anvil_query *queries, *query; + unsigned int count; + + queries = array_get(&client->queries_arr, &count); + while (aqueue_count(client->queries) > 0) { + query = &queries[aqueue_idx(client->queries, 0)]; + query->callback(NULL, query->context); + aqueue_delete_tail(client->queries); + } +} + +static void anvil_client_disconnect(struct anvil_client *client) +{ + if (client->fd == -1) + return; + + anvil_client_cancel_queries(client); + io_remove(&client->io); + i_stream_destroy(&client->input); + o_stream_destroy(&client->output); + net_disconnect(client->fd); + client->fd = -1; +} + +void anvil_client_query(struct anvil_client *client, const char *query, + anvil_callback_t *callback, void *context) +{ + struct const_iovec iov[2]; + struct anvil_query anvil_query; + + if (client->fd == -1) { + if (anvil_client_connect(client, FALSE) < 0) { + callback(NULL, context); + return; + } + } + + iov[0].iov_base = query; + iov[0].iov_len = strlen(query); + iov[1].iov_base = "\n"; + iov[1].iov_len = 1; + o_stream_sendv(client->output, iov, 2); + + anvil_query.callback = callback; + anvil_query.context = context; + aqueue_append(client->queries, &anvil_query); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/lib-master/anvil-client.h Fri Nov 06 13:57:04 2009 -0500 @@ -0,0 +1,20 @@ +#ifndef ANVIL_CLIENT_H +#define ANVIL_CLIENT_H + +/* reply=NULL if query failed */ +typedef void anvil_callback_t(const char *reply, void *context); + +/* If reconnect_callback is specified, it's called when connection is lost. + If the callback returns FALSE, reconnection isn't attempted. */ +struct anvil_client * +anvil_client_init(const char *path, bool (*reconnect_callback)(void)); +void anvil_client_deinit(struct anvil_client **client); + +/* Connect to anvil. If retry=TRUE, try connecting for a while */ +int anvil_client_connect(struct anvil_client *client, bool retry); + +/* Send a query to anvil, expect a one line reply. */ +void anvil_client_query(struct anvil_client *client, const char *query, + anvil_callback_t *callback, void *context); + +#endif