changeset 22357:732e248c2e26

cassandra: Add page_size setting to enable paged results for queries
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Mon, 17 Jul 2017 17:54:07 +0300
parents 2fe6e07903f2
children 4f3967e2f493
files src/lib-sql/driver-cassandra.c
diffstat 1 files changed, 77 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-sql/driver-cassandra.c	Mon Jul 17 17:51:03 2017 +0300
+++ b/src/lib-sql/driver-cassandra.c	Mon Jul 17 17:54:07 2017 +0300
@@ -56,6 +56,7 @@
 
 enum cassandra_query_type {
 	CASSANDRA_QUERY_TYPE_READ,
+	CASSANDRA_QUERY_TYPE_READ_MORE,
 	CASSANDRA_QUERY_TYPE_WRITE,
 	CASSANDRA_QUERY_TYPE_DELETE
 };
@@ -88,6 +89,7 @@
 	unsigned int warn_timeout_secs;
 	unsigned int heartbeat_interval_secs, idle_timeout_secs;
 	unsigned int execution_retry_interval_msecs, execution_retry_times;
+	unsigned int page_size;
 	in_port_t port;
 
 	CassCluster *cluster;
@@ -516,6 +518,9 @@
 #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
 			i_fatal("cassandra: This cassandra version does not support execution_retry_times");
 #endif
+		} else if (strcmp(key, "page_size") == 0) {
+			if (str_to_uint(value, &db->page_size) < 0)
+				i_fatal("cassandra: Invalid page_size: %s", value);
 		} else {
 			i_fatal("cassandra: Unknown connect string: %s", key);
 		}
@@ -886,12 +891,20 @@
 
 static void driver_cassandra_init_statement(struct cassandra_result *result)
 {
+	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+
+	if (result->statement != NULL) {
+		/* continuing a paged result */
+		return;
+	}
 	result->statement = cass_statement_new(result->query, 0);
 	cass_statement_set_consistency(result->statement, result->consistency);
 
 #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
 	cass_statement_set_is_idempotent(result->statement, cass_true);
 #endif
+	if (db->page_size > 0)
+		cass_statement_set_paging_size(result->statement, db->page_size);
 }
 
 static void driver_cassandra_result_send_query(struct cassandra_result *result)
@@ -961,6 +974,11 @@
 		result->consistency = db->read_consistency;
 		result->fallback_consistency = db->read_fallback_consistency;
 		break;
+	case CASSANDRA_QUERY_TYPE_READ_MORE:
+		/* consistency is already set and we don't want to fallback
+		   at this point anymore. */
+		result->fallback_consistency = result->consistency;
+		break;
 	case CASSANDRA_QUERY_TYPE_WRITE:
 		result->consistency = db->write_consistency;
 		result->fallback_consistency = db->write_fallback_consistency;
@@ -1184,6 +1202,24 @@
 	return 0;
 }
 
+static int driver_cassandra_result_next_page(struct cassandra_result *result)
+{
+	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+
+	if (db->page_size == 0) {
+		/* no paging */
+		return 0;
+	}
+	if (cass_result_has_more_pages(result->result) == cass_false)
+		return 0;
+
+	/* callers that don't support sql_query_more() will still get a useful
+	   error message. */
+	i_free(result->error);
+	result->error = i_strdup("Paged query has more results, but not supported by the caller");
+	return SQL_RESULT_NEXT_MORE;
+}
+
 static int driver_cassandra_result_next_row(struct sql_result *_result)
 {
 	struct cassandra_result *result = (struct cassandra_result *)_result;
@@ -1198,7 +1234,7 @@
 		return -1;
 
 	if (!cass_iterator_next(result->iterator))
-		return 0;
+		return driver_cassandra_result_next_page(result);
 	result->row_count++;
 
 	p_clear(result->row_pool);
@@ -1217,6 +1253,45 @@
 	return ret;
 }
 
+static void
+driver_cassandra_result_more(struct sql_result **_result, bool async,
+			     sql_query_callback_t *callback, void *context)
+{
+	struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
+	struct cassandra_result *new_result;
+	struct cassandra_result *old_result =
+		(struct cassandra_result *)*_result;
+
+	/* Initialize the next page as a new sql_result */
+	new_result = driver_cassandra_query_init(db, old_result->query,
+						 CASSANDRA_QUERY_TYPE_READ_MORE,
+						 callback, context);
+
+	/* Preserve the statement and update its paging state */
+	new_result->statement = old_result->statement;
+	old_result->statement = NULL;
+	cass_statement_set_paging_state(new_result->statement,
+					old_result->result);
+
+	sql_result_unref(*_result);
+	*_result = NULL;
+
+	if (async)
+		(void)driver_cassandra_send_query(new_result);
+	else {
+		i_assert(db->api.state == SQL_DB_STATE_IDLE);
+		driver_cassandra_sync_init(db);
+		(void)driver_cassandra_send_query(new_result);
+		if (new_result->result == NULL) {
+			db->io_pipe = io_loop_move_io(&db->io_pipe);
+			io_loop_run(db->ioloop);
+		}
+		driver_cassandra_sync_deinit(db);
+
+		callback(&new_result->api, context);
+	}
+}
+
 static unsigned int
 driver_cassandra_result_get_fields_count(struct sql_result *_result)
 {
@@ -1538,7 +1613,7 @@
 		driver_cassandra_result_find_field_value,
 		driver_cassandra_result_get_values,
 		driver_cassandra_result_get_error,
-		NULL,
+		driver_cassandra_result_more,
 	}
 };