Mercurial > dovecot > original-hg > dovecot-2.2
changeset 16739:a6ed95a30cb1
lib-http: Added support for asynchronous payload for requests.
This means that the payload stream passed to the request can be a
non-blocking socket stream from some other connection (e.g. proxy client
connection).
author | Stephan Bosch <stephan@rename-it.nl> |
---|---|
date | Sun, 15 Sep 2013 03:35:04 +0300 |
parents | 3bd334529536 |
children | adb4d013073d |
files | src/lib-http/http-client-connection.c src/lib-http/http-client-private.h src/lib-http/http-client-request.c |
diffstat | 3 files changed, 35 insertions(+), 5 deletions(-) [+] |
line wrap: on
line diff
--- a/src/lib-http/http-client-connection.c Sun Sep 15 03:34:06 2013 +0300 +++ b/src/lib-http/http-client-connection.c Sun Sep 15 03:35:04 2013 +0300 @@ -655,7 +655,7 @@ } } -static int http_client_connection_output(struct http_client_connection *conn) +int http_client_connection_output(struct http_client_connection *conn) { struct http_client_request *const *req_idx, *req; struct ostream *output = conn->conn.output; @@ -944,6 +944,8 @@ ssl_iostream_unref(&conn->ssl_iostream); connection_deinit(&conn->conn); + if (conn->io_req_payload != NULL) + io_remove(&conn->io_req_payload); if (conn->to_requests != NULL) timeout_remove(&conn->to_requests); if (conn->to_connect != NULL) @@ -972,6 +974,8 @@ void http_client_connection_switch_ioloop(struct http_client_connection *conn) { + if (conn->io_req_payload != NULL) + conn->io_req_payload = io_loop_move_io(&conn->io_req_payload); if (conn->to_requests != NULL) conn->to_requests = io_loop_move_timeout(&conn->to_requests); if (conn->to_connect != NULL)
--- a/src/lib-http/http-client-private.h Sun Sep 15 03:34:06 2013 +0300 +++ b/src/lib-http/http-client-private.h Sun Sep 15 03:35:04 2013 +0300 @@ -161,6 +161,7 @@ struct http_client_request *pending_request; struct istream *incoming_payload; + struct io *io_req_payload; /* requests that have been sent, waiting for response */ ARRAY_TYPE(http_client_request) request_wait_list; @@ -244,6 +245,7 @@ http_client_connection_create(struct http_client_peer *peer); void http_client_connection_ref(struct http_client_connection *conn); void http_client_connection_unref(struct http_client_connection **_conn); +int http_client_connection_output(struct http_client_connection *conn); unsigned int http_client_connection_count_pending(struct http_client_connection *conn); bool http_client_connection_is_ready(struct http_client_connection *conn);
--- a/src/lib-http/http-client-request.c Sun Sep 15 03:34:06 2013 +0300 +++ b/src/lib-http/http-client-request.c Sun Sep 15 03:35:04 2013 +0300 @@ -232,7 +232,8 @@ } req->state = HTTP_REQUEST_STATE_WAITING; req->conn->output_locked = FALSE; - http_client_request_debug(req, "Sent all payload"); + + http_client_request_debug(req, "Finished sending payload"); } static int @@ -321,16 +322,30 @@ return http_client_request_continue_payload(_req, NULL, 0); } +static void http_client_request_payload_input(struct http_client_request *req) +{ + struct http_client_connection *conn = req->conn; + + if (conn->io_req_payload != NULL) + io_remove(&conn->io_req_payload); + + (void)http_client_connection_output(conn); +} + int http_client_request_send_more(struct http_client_request *req, const char **error_r) { struct http_client_connection *conn = req->conn; struct ostream *output = req->payload_output; off_t ret; + int fd; i_assert(req->payload_input != NULL); i_assert(req->payload_output != NULL); + if (conn->io_req_payload != NULL) + io_remove(&conn->io_req_payload); + /* chunked ostream needs to write to the parent stream's buffer */ o_stream_set_max_buffer_size(output, IO_BLOCK_SIZE); ret = o_stream_send_istream(output, req->payload_input); @@ -340,15 +355,17 @@ errno = req->payload_input->stream_errno; *error_r = t_strdup_printf("read(%s) failed: %m", i_stream_get_name(req->payload_input)); + ret = -1; } else if (output->stream_errno != 0) { errno = output->stream_errno; *error_r = t_strdup_printf("write(%s) failed: %m", o_stream_get_name(output)); + ret = -1; } else { i_assert(ret >= 0); } - if (!i_stream_have_bytes_left(req->payload_input)) { + if (ret < 0 || i_stream_is_eof(req->payload_input)) { if (!req->payload_chunked && req->payload_input->v_offset - req->payload_offset != req->payload_size) { i_error("stream input size changed"); //FIXME @@ -362,11 +379,18 @@ } else { http_client_request_finish_payload_out(req); } - - } else { + } else if (i_stream_get_data_size(req->payload_input) > 0) { + /* output is blocking */ conn->output_locked = TRUE; o_stream_set_flush_pending(output, TRUE); http_client_request_debug(req, "Partially sent payload"); + } else { + /* input is blocking */ + fd = i_stream_get_fd(req->payload_input); + conn->output_locked = TRUE; + i_assert(fd >= 0); + conn->io_req_payload = io_add + (fd, IO_READ, http_client_request_payload_input, req); } return ret < 0 ? -1 : 0; }