# HG changeset patch # User Timo Sirainen # Date 1500303247 -10800 # Node ID 732e248c2e26d6c73b6802c55887fc9112c4a955 # Parent 2fe6e07903f213b6571ded7186af922209600587 cassandra: Add page_size setting to enable paged results for queries diff -r 2fe6e07903f2 -r 732e248c2e26 src/lib-sql/driver-cassandra.c --- a/src/lib-sql/driver-cassandra.c Mon Jul 17 17:51:03 2017 +0300 +++ b/src/lib-sql/driver-cassandra.c Mon Jul 17 17:54:07 2017 +0300 @@ -56,6 +56,7 @@ enum cassandra_query_type { CASSANDRA_QUERY_TYPE_READ, + CASSANDRA_QUERY_TYPE_READ_MORE, CASSANDRA_QUERY_TYPE_WRITE, CASSANDRA_QUERY_TYPE_DELETE }; @@ -88,6 +89,7 @@ unsigned int warn_timeout_secs; unsigned int heartbeat_interval_secs, idle_timeout_secs; unsigned int execution_retry_interval_msecs, execution_retry_times; + unsigned int page_size; in_port_t port; CassCluster *cluster; @@ -516,6 +518,9 @@ #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY i_fatal("cassandra: This cassandra version does not support execution_retry_times"); #endif + } else if (strcmp(key, "page_size") == 0) { + if (str_to_uint(value, &db->page_size) < 0) + i_fatal("cassandra: Invalid page_size: %s", value); } else { i_fatal("cassandra: Unknown connect string: %s", key); } @@ -886,12 +891,20 @@ static void driver_cassandra_init_statement(struct cassandra_result *result) { + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + + if (result->statement != NULL) { + /* continuing a paged result */ + return; + } result->statement = cass_statement_new(result->query, 0); cass_statement_set_consistency(result->statement, result->consistency); #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY cass_statement_set_is_idempotent(result->statement, cass_true); #endif + if (db->page_size > 0) + cass_statement_set_paging_size(result->statement, db->page_size); } static void driver_cassandra_result_send_query(struct cassandra_result *result) @@ -961,6 +974,11 @@ result->consistency = db->read_consistency; result->fallback_consistency = db->read_fallback_consistency; break; + case CASSANDRA_QUERY_TYPE_READ_MORE: + /* consistency is already set and we don't want to fallback + at this point anymore. */ + result->fallback_consistency = result->consistency; + break; case CASSANDRA_QUERY_TYPE_WRITE: result->consistency = db->write_consistency; result->fallback_consistency = db->write_fallback_consistency; @@ -1184,6 +1202,24 @@ return 0; } +static int driver_cassandra_result_next_page(struct cassandra_result *result) +{ + struct cassandra_db *db = (struct cassandra_db *)result->api.db; + + if (db->page_size == 0) { + /* no paging */ + return 0; + } + if (cass_result_has_more_pages(result->result) == cass_false) + return 0; + + /* callers that don't support sql_query_more() will still get a useful + error message. */ + i_free(result->error); + result->error = i_strdup("Paged query has more results, but not supported by the caller"); + return SQL_RESULT_NEXT_MORE; +} + static int driver_cassandra_result_next_row(struct sql_result *_result) { struct cassandra_result *result = (struct cassandra_result *)_result; @@ -1198,7 +1234,7 @@ return -1; if (!cass_iterator_next(result->iterator)) - return 0; + return driver_cassandra_result_next_page(result); result->row_count++; p_clear(result->row_pool); @@ -1217,6 +1253,45 @@ return ret; } +static void +driver_cassandra_result_more(struct sql_result **_result, bool async, + sql_query_callback_t *callback, void *context) +{ + struct cassandra_db *db = (struct cassandra_db *)(*_result)->db; + struct cassandra_result *new_result; + struct cassandra_result *old_result = + (struct cassandra_result *)*_result; + + /* Initialize the next page as a new sql_result */ + new_result = driver_cassandra_query_init(db, old_result->query, + CASSANDRA_QUERY_TYPE_READ_MORE, + callback, context); + + /* Preserve the statement and update its paging state */ + new_result->statement = old_result->statement; + old_result->statement = NULL; + cass_statement_set_paging_state(new_result->statement, + old_result->result); + + sql_result_unref(*_result); + *_result = NULL; + + if (async) + (void)driver_cassandra_send_query(new_result); + else { + i_assert(db->api.state == SQL_DB_STATE_IDLE); + driver_cassandra_sync_init(db); + (void)driver_cassandra_send_query(new_result); + if (new_result->result == NULL) { + db->io_pipe = io_loop_move_io(&db->io_pipe); + io_loop_run(db->ioloop); + } + driver_cassandra_sync_deinit(db); + + callback(&new_result->api, context); + } +} + static unsigned int driver_cassandra_result_get_fields_count(struct sql_result *_result) { @@ -1538,7 +1613,7 @@ driver_cassandra_result_find_field_value, driver_cassandra_result_get_values, driver_cassandra_result_get_error, - NULL, + driver_cassandra_result_more, } };