changeset 19748:b0ecdc6cb8c2

lib-http: server: Implemented blocking request input stream.
author Stephan Bosch <stephan@rename-it.nl>
date Wed, 10 Feb 2016 22:32:46 +0100
parents 36963988e4f8
children cea1e2bccd1c
files src/lib-http/http-server-connection.c src/lib-http/http-server-private.h src/lib-http/http-server-request.c src/lib-http/http-server.h
diffstat 4 files changed, 167 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-http/http-server-connection.c	Wed Feb 10 22:30:22 2016 +0100
+++ b/src/lib-http/http-server-connection.c	Wed Feb 10 22:32:46 2016 +0100
@@ -1086,11 +1086,18 @@
 
 void http_server_connection_switch_ioloop(struct http_server_connection *conn)
 {
+	if (conn->switching_ioloop)
+		return;
+
+	conn->switching_ioloop = TRUE;
 	if (conn->to_input != NULL)
 		conn->to_input = io_loop_move_timeout(&conn->to_input);
 	if (conn->to_idle != NULL)
 		conn->to_idle = io_loop_move_timeout(&conn->to_idle);
 	if (conn->io_resp_payload != NULL)
 		conn->io_resp_payload = io_loop_move_io(&conn->io_resp_payload);
+	if (conn->incoming_payload != NULL)
+		i_stream_switch_ioloop(conn->incoming_payload);
 	connection_switch_ioloop(&conn->conn);
+	conn->switching_ioloop = FALSE;
 }
--- a/src/lib-http/http-server-private.h	Wed Feb 10 22:30:22 2016 +0100
+++ b/src/lib-http/http-server-private.h	Wed Feb 10 22:32:46 2016 +0100
@@ -83,6 +83,8 @@
 	struct http_server *server;
 	struct http_server_connection *conn;
 
+	struct istream *payload_input;
+
 	struct http_server_response *response;
 
 	void (*destroy_callback)(void *);
@@ -125,6 +127,7 @@
 	unsigned int input_broken:1;
 	unsigned int output_locked:1;
 	unsigned int in_req_callback:1;  /* performing request callback (busy) */
+	unsigned int switching_ioloop:1; /* in the middle of switching ioloop */
 };
 
 struct http_server {
--- a/src/lib-http/http-server-request.c	Wed Feb 10 22:30:22 2016 +0100
+++ b/src/lib-http/http-server-request.c	Wed Feb 10 22:32:46 2016 +0100
@@ -3,6 +3,7 @@
 #include "lib.h"
 #include "ioloop.h"
 #include "ostream.h"
+#include "istream-private.h"
 
 #include "http-server-private.h"
 
@@ -355,3 +356,151 @@
 	http_server_request_fail_auth(req, reason, &chlng);
 }
 
+/*
+ * Payload input stream
+ */
+
+struct http_server_istream {
+	struct istream_private istream;
+
+	struct http_server_request *req;
+
+	ssize_t read_status;
+};
+
+static void
+http_server_istream_switch_ioloop(struct istream_private *stream)
+{
+	struct http_server_istream *hsristream =
+		(struct http_server_istream *)stream;
+
+	if (hsristream->istream.istream.blocking)
+		return;
+
+	http_server_connection_switch_ioloop(hsristream->req->conn);
+}
+
+static void
+http_server_istream_read_any(struct http_server_istream *hsristream)
+{
+	struct istream_private *stream = &hsristream->istream;
+	struct http_server *server = hsristream->req->server;
+	ssize_t ret;
+
+	if ((ret=i_stream_read_copy_from_parent
+		(&stream->istream)) > 0) {
+		hsristream->read_status = ret;
+		io_loop_stop(server->ioloop);
+	}
+}
+
+static ssize_t
+http_server_istream_read(struct istream_private *stream)
+{
+	struct http_server_istream *hsristream =
+		(struct http_server_istream *)stream;
+	struct http_server_request *req = hsristream->req;
+	struct http_server *server;
+	struct http_server_connection *conn;
+	bool blocking = stream->istream.blocking;
+	ssize_t ret;
+
+	if (req == NULL) {
+		/* request already gone (we shouldn't get here) */
+		stream->istream.stream_errno = EINVAL;
+		ret = -1;
+	}
+
+	i_stream_seek(stream->parent, stream->parent_start_offset +
+		      stream->istream.v_offset);
+
+	server = hsristream->req->server;
+	conn = hsristream->req->conn;
+
+	ret = i_stream_read_copy_from_parent(&stream->istream);
+	if (ret == 0 && blocking) {
+		struct ioloop *prev_ioloop = current_ioloop;
+		struct io *io;
+
+		http_server_connection_ref(conn);
+		http_server_request_ref(req);
+
+		i_assert(server->ioloop == NULL);
+		server->ioloop = io_loop_create();
+		http_server_connection_switch_ioloop(conn);
+
+		if (blocking && req->req.expect_100_continue &&
+			!req->sent_100_continue)
+			http_server_connection_trigger_responses(conn);
+
+		hsristream->read_status = 0;
+		io = io_add_istream(&stream->istream,
+			http_server_istream_read_any, hsristream);
+		while (req->state < HTTP_SERVER_REQUEST_STATE_FINISHED &&
+			hsristream->read_status == 0) {
+			io_loop_run(server->ioloop);
+		}
+		io_remove(&io);
+
+		io_loop_set_current(prev_ioloop);
+		http_server_connection_switch_ioloop(conn);
+		io_loop_set_current(server->ioloop);
+		io_loop_destroy(&server->ioloop);
+
+		ret = hsristream->read_status;
+
+		http_server_request_unref(&req);
+		if (req == NULL)
+			hsristream->req = NULL;
+		http_server_connection_unref(&conn);
+	}
+
+	return ret;
+}
+
+static void
+http_server_istream_destroy(struct iostream_private *stream)
+{
+	struct http_server_istream *hsristream =
+		(struct http_server_istream *)stream;
+	uoff_t v_offset;
+
+	v_offset = hsristream->istream.parent_start_offset +
+		hsristream->istream.istream.v_offset;
+	if (hsristream->istream.parent->seekable ||
+		v_offset > hsristream->istream.parent->v_offset) {
+		/* get to same position in parent stream */
+		i_stream_seek(hsristream->istream.parent, v_offset);
+	}
+
+	i_stream_unref(&hsristream->istream.parent);
+}
+
+struct istream *
+http_server_request_get_payload_input(struct http_server_request *req,
+	bool blocking)
+{
+	struct http_server_istream *hsristream;
+	struct istream *payload = req->req.payload;
+
+	i_assert(req->payload_input == NULL);
+
+	hsristream = i_new(struct http_server_istream, 1);
+	hsristream->req = req;
+	hsristream->istream.max_buffer_size =
+		payload->real_stream->max_buffer_size;
+	hsristream->istream.stream_size_passthrough = TRUE;
+
+	hsristream->istream.read = http_server_istream_read;
+	hsristream->istream.switch_ioloop = http_server_istream_switch_ioloop;
+	hsristream->istream.iostream.destroy = http_server_istream_destroy;
+
+	hsristream->istream.istream.readable_fd = FALSE;
+	hsristream->istream.istream.blocking = blocking;
+	hsristream->istream.istream.seekable = FALSE;
+
+	req->payload_input = i_stream_create
+		(&hsristream->istream, payload, i_stream_get_fd(payload));
+	i_stream_unref(&req->req.payload);
+	return req->payload_input;
+}
--- a/src/lib-http/http-server.h	Wed Feb 10 22:30:22 2016 +0100
+++ b/src/lib-http/http-server.h	Wed Feb 10 22:32:46 2016 +0100
@@ -91,6 +91,14 @@
    or because the request was aborted. */
 bool http_server_request_is_finished(struct http_server_request *req);
 
+/* Return input stream for the request's payload. Optionally, this stream
+   can be made blocking. Do *NOT* meddle with the FD of the http_request
+   payload to achieve the same, because protocol violations will result.
+ */
+struct istream *
+http_server_request_get_payload_input(struct http_server_request *req,
+	bool blocking);
+
 /* Get the authentication credentials provided in this request. Returns 0 if
    the Authorization header is absent, returns -1 when that header cannot be
    parsed, and returns 1 otherwise */