# HG changeset patch # User Timo Sirainen # Date 1503401711 -10800 # Node ID 5f7b8aa18c274cd54edfcce5e0a69428ee20fa5e # Parent 7b17f52b75b69f81a2079648685ffcd7b4c29660 cassandra: Add support for prepared statements diff -r 7b17f52b75b6 -r 5f7b8aa18c27 src/lib-sql/driver-cassandra.c --- a/src/lib-sql/driver-cassandra.c Tue Aug 22 13:55:15 2017 +0300 +++ b/src/lib-sql/driver-cassandra.c Tue Aug 22 14:35:11 2017 +0300 @@ -99,6 +99,7 @@ int fd_pipe[2]; struct io *io_pipe; + ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares; ARRAY(struct cassandra_callback *) callbacks; ARRAY(struct cassandra_result *) results; unsigned int callback_ids; @@ -149,6 +150,7 @@ sql_commit2_callback_t *callback; void *context; + struct cassandra_sql_statement *stmt; char *query; char *error; @@ -157,6 +159,42 @@ unsigned int failed:1; }; +struct cassandra_sql_arg { + unsigned int column_idx; + + char *value_str; + unsigned char *value_binary; + size_t value_binary_size; + int64_t value_int64; +}; + +struct cassandra_sql_statement { + struct sql_statement stmt; + + struct cassandra_sql_prepared_statement *prep; + CassStatement *cass_stmt; + + ARRAY(struct cassandra_sql_arg) pending_args; + cass_int64_t pending_timestamp; + + struct cassandra_result *result; +}; + +struct cassandra_sql_prepared_statement { + struct sql_prepared_statement prep_stmt; + char *query_template; + + /* NULL, until the prepare is asynchronously finished */ + const CassPrepared *prepared; + /* statements waiting for prepare to finish */ + ARRAY(struct cassandra_sql_statement *) pending_statements; + /* an error here will cause the prepare to be retried on the next + execution attempt. */ + char *error; + + bool pending; +}; + extern const struct sql_db driver_cassandra_db; extern const struct sql_result driver_cassandra_result; @@ -189,6 +227,9 @@ { CASS_LOG_TRACE, "trace" } }; +static void driver_cassandra_prepare_pending(struct cassandra_db *db); +static void +prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt); static void driver_cassandra_result_send_query(struct cassandra_result *result); static void driver_cassandra_send_queries(struct cassandra_db *db); static void result_finish(struct cassandra_result *result); @@ -232,6 +273,7 @@ static void driver_cassandra_close(struct cassandra_db *db, const char *error) { + struct cassandra_sql_prepared_statement *const *prep_stmtp; struct cassandra_result *const *resultp; if (db->io_pipe != NULL) @@ -242,6 +284,13 @@ } driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED); + array_foreach(&db->pending_prepares, prep_stmtp) { + (*prep_stmtp)->pending = FALSE; + (*prep_stmtp)->error = i_strdup(error); + prepare_finish_pending_statements(*prep_stmtp); + } + array_clear(&db->pending_prepares); + while (array_count(&db->results) > 0) { resultp = array_idx(&db->results, 0); if ((*resultp)->error == NULL) @@ -365,6 +414,7 @@ finish */ io_loop_stop(db->ioloop); } + driver_cassandra_prepare_pending(db); driver_cassandra_send_queries(db); } @@ -654,6 +704,7 @@ db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db); i_array_init(&db->results, 16); i_array_init(&db->callbacks, 16); + i_array_init(&db->pending_prepares, 16); return &db->api; } @@ -667,6 +718,8 @@ array_free(&db->callbacks); i_assert(array_count(&db->results) == 0); array_free(&db->results); + i_assert(array_count(&db->pending_prepares) == 0); + array_free(&db->pending_prepares); cass_session_free(db->session); cass_cluster_free(db->cluster); @@ -925,6 +978,8 @@ struct cassandra_db *db = (struct cassandra_db *)result->api.db; CassFuture *future; + i_assert(result->statement != NULL); + db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++; driver_cassandra_init_statement(result); @@ -1023,7 +1078,7 @@ results = array_get(&db->results, &count); for (i = 0; i < count; i++) { - if (!results[i]->query_sent) { + if (!results[i]->query_sent && results[i]->statement != NULL) { if (driver_cassandra_send_query(results[i]) <= 0) break; } @@ -1454,6 +1509,7 @@ { struct cassandra_transaction_context *ctx = (struct cassandra_transaction_context *)_ctx; + struct cassandra_db *db = (struct cassandra_db *)_ctx->db; enum cassandra_query_type query_type; struct sql_commit_result result; @@ -1461,20 +1517,37 @@ ctx->callback = callback; ctx->context = context; - if (ctx->failed || ctx->query == NULL) { + if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) { if (ctx->failed) result.error = ctx->error; callback(&result, context); driver_cassandra_transaction_unref(&ctx); + return; + } + + /* just a single query, send it */ + const char *query = ctx->query != NULL ? + ctx->query : sql_statement_get_query(&ctx->stmt->stmt); + if (strncasecmp(query, "DELETE ", 7) == 0) + query_type = CASSANDRA_QUERY_TYPE_DELETE; + else + query_type = CASSANDRA_QUERY_TYPE_WRITE; + + if (ctx->query != NULL) { + driver_cassandra_query_full(_ctx->db, query, query_type, + transaction_commit_callback, ctx); } else { - /* just a single query, send it */ - if (strncasecmp(ctx->query, "DELETE ", 7) == 0) - query_type = CASSANDRA_QUERY_TYPE_DELETE; - else - query_type = CASSANDRA_QUERY_TYPE_WRITE; - driver_cassandra_query_full(_ctx->db, ctx->query, query_type, - transaction_commit_callback, ctx); + ctx->stmt->result = + driver_cassandra_query_init(db, query, query_type, + transaction_commit_callback, ctx); + if (ctx->stmt->cass_stmt == NULL) { + /* wait for prepare to finish */ + } else { + ctx->stmt->result->statement = ctx->stmt->cass_stmt; + (void)driver_cassandra_send_query(ctx->stmt->result); + pool_unref(&ctx->stmt->stmt.pool); + } } } @@ -1507,6 +1580,11 @@ struct cassandra_transaction_context *ctx = (struct cassandra_transaction_context *)_ctx; + if (ctx->stmt != NULL) { + /* nothing should be using this - don't bother implementing */ + i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements"); + } + if (ctx->query != NULL && !ctx->failed) driver_cassandra_try_commit_s(ctx); *error_r = t_strdup(ctx->error); @@ -1536,7 +1614,7 @@ i_assert(affected_rows == NULL); - if (ctx->query != NULL) { + if (ctx->query != NULL || ctx->stmt != NULL) { transaction_set_failed(ctx, "Multiple changes in transaction not supported"); return; } @@ -1554,9 +1632,376 @@ return str_c(str); } +static CassError +driver_cassandra_bind_int(struct cassandra_sql_statement *stmt, + unsigned int column_idx, int64_t value) +{ + const CassDataType *data_type; + CassValueType value_type; + + if (stmt->prep == NULL) { + value_type = value >= -2147483648 && value <= 2147483647 ? + CASS_VALUE_TYPE_INT : CASS_VALUE_TYPE_BIGINT; + } else { + /* prepared statements require exactly correct value type */ + data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, column_idx); + value_type = cass_data_type_type(data_type); + } + + switch (value_type) { + case CASS_VALUE_TYPE_INT: + if (value < -2147483648 || value > 2147483647) + return CASS_ERROR_LIB_INVALID_VALUE_TYPE; + return cass_statement_bind_int32(stmt->cass_stmt, column_idx, value); + case CASS_VALUE_TYPE_BIGINT: + return cass_statement_bind_int64(stmt->cass_stmt, column_idx, value); + case CASS_VALUE_TYPE_SMALL_INT: + if (value < -32768 || value > 32767) + return CASS_ERROR_LIB_INVALID_VALUE_TYPE; + return cass_statement_bind_int16(stmt->cass_stmt, column_idx, value); + case CASS_VALUE_TYPE_TINY_INT: + if (value < -128 || value > 127) + return CASS_ERROR_LIB_INVALID_VALUE_TYPE; + return cass_statement_bind_int8(stmt->cass_stmt, column_idx, value); + default: + return CASS_ERROR_LIB_INVALID_VALUE_TYPE; + } +} + +static void prepare_finish_arg(struct cassandra_sql_statement *stmt, + const struct cassandra_sql_arg *arg) +{ + CassError rc; + + if (arg->value_str != NULL) { + rc = cass_statement_bind_string(stmt->cass_stmt, arg->column_idx, + arg->value_str); + } else if (arg->value_binary != NULL) { + rc = cass_statement_bind_bytes(stmt->cass_stmt, arg->column_idx, + arg->value_binary, + arg->value_binary_size); + } else { + rc = driver_cassandra_bind_int(stmt, arg->column_idx, + arg->value_int64); + } + if (rc != CASS_OK) { + i_error("cassandra: Statement '%s': Failed to bind column %u: %s", + stmt->stmt.query_template, arg->column_idx, + cass_error_desc(rc)); + } +} + +static void prepare_finish_statement(struct cassandra_sql_statement *stmt) +{ + const struct cassandra_sql_arg *arg; + + if (stmt->prep->prepared == NULL) { + i_assert(stmt->prep->error != NULL); + + if (stmt->result != NULL) { + stmt->result->error = i_strdup(stmt->prep->error); + result_finish(stmt->result); + } + return; + } + stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared); + + if (stmt->pending_timestamp != 0) { + cass_statement_set_timestamp(stmt->cass_stmt, + stmt->pending_timestamp); + } + + if (array_is_created(&stmt->pending_args)) { + array_foreach(&stmt->pending_args, arg) + prepare_finish_arg(stmt, arg); + } + if (stmt->result != NULL) { + stmt->result->statement = stmt->cass_stmt; + (void)driver_cassandra_send_query(stmt->result); + pool_unref(&stmt->stmt.pool); + } +} + +static void +prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt) +{ + struct cassandra_sql_statement *const *stmtp; + + array_foreach(&prep_stmt->pending_statements, stmtp) + prepare_finish_statement(*stmtp); + array_clear(&prep_stmt->pending_statements); +} + +static void prepare_callback(CassFuture *future, void *context) +{ + struct cassandra_sql_prepared_statement *prep_stmt = context; + CassError error = cass_future_error_code(future); + + if (error != CASS_OK) { + const char *errmsg; + size_t errsize; + + cass_future_error_message(future, &errmsg, &errsize); + i_free(prep_stmt->error); + prep_stmt->error = i_strndup(errmsg, errsize); + } else { + prep_stmt->prepared = cass_future_get_prepared(future); + } + + prepare_finish_pending_statements(prep_stmt); +} + +static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt) +{ + struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db; + CassFuture *future; + + if (!SQL_DB_IS_READY(&db->api)) { + if (!prep_stmt->pending) { + prep_stmt->pending = TRUE; + array_append(&db->pending_prepares, &prep_stmt, 1); + + if (sql_connect(&db->api) < 0) + i_unreached(); + } + return; + } + + /* clear the current error in case we're retrying */ + i_free_and_null(prep_stmt->error); + + future = cass_session_prepare(db->session, prep_stmt->query_template); + driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt); +} + +static void driver_cassandra_prepare_pending(struct cassandra_db *db) +{ + struct cassandra_sql_prepared_statement *const *prep_stmtp; + + i_assert(SQL_DB_IS_READY(&db->api)); + + array_foreach(&db->pending_prepares, prep_stmtp) { + (*prep_stmtp)->pending = FALSE; + prepare_start(*prep_stmtp); + } + array_clear(&db->pending_prepares); +} + +static struct sql_prepared_statement * +driver_cassandra_prepared_statement_init(struct sql_db *db, + const char *query_template) +{ + struct cassandra_sql_prepared_statement *prep_stmt = + i_new(struct cassandra_sql_prepared_statement, 1); + prep_stmt->prep_stmt.db = db; + prep_stmt->query_template = i_strdup(query_template); + i_array_init(&prep_stmt->pending_statements, 4); + prepare_start(prep_stmt); + return &prep_stmt->prep_stmt; +} + +static void +driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt) +{ + struct cassandra_sql_prepared_statement *prep_stmt = + (struct cassandra_sql_prepared_statement *)_prep_stmt; + + i_assert(array_count(&prep_stmt->pending_statements) == 0); + if (prep_stmt->prepared != NULL) + cass_prepared_free(prep_stmt->prepared); + array_free(&prep_stmt->pending_statements); + i_free(prep_stmt->query_template); + i_free(prep_stmt->error); + i_free(prep_stmt); +} + +static struct sql_statement * +driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED, + const char *query_template) +{ + pool_t pool = pool_alloconly_create("cassandra sql statement", 1024); + struct cassandra_sql_statement *stmt = + p_new(pool, struct cassandra_sql_statement, 1); + + /* Count the number of parameters in the query. We'll assume that all + the changing parameters are bound, so there shouldn't be any + quoted strings with '?' in them. */ + const char *p = query_template; + size_t param_count = 0; + while ((p = strchr(p, '?')) != NULL) { + param_count++; + p++; + } + + stmt->stmt.pool = pool; + stmt->cass_stmt = cass_statement_new(query_template, param_count); + return &stmt->stmt; +} + +static struct sql_statement * +driver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt) +{ + struct cassandra_sql_prepared_statement *prep_stmt = + (struct cassandra_sql_prepared_statement *)_prep_stmt; + pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024); + struct cassandra_sql_statement *stmt = + p_new(pool, struct cassandra_sql_statement, 1); + + stmt->stmt.pool = pool; + stmt->stmt.query_template = + p_strdup(stmt->stmt.pool, prep_stmt->query_template); + stmt->prep = prep_stmt; + + if (prep_stmt->prepared != NULL) { + /* statement is already prepared. we can use it immediately. */ + stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared); + } else { + if (prep_stmt->error != NULL) + prepare_start(prep_stmt); + /* need to wait until prepare is finished */ + array_append(&prep_stmt->pending_statements, &stmt, 1); + } + return &stmt->stmt; +} + +static void +driver_cassandra_statement_abort(struct sql_statement *_stmt) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + + if (stmt->cass_stmt != NULL) + cass_statement_free(stmt->cass_stmt); +} + +static void +driver_cassandra_statement_set_timestamp(struct sql_statement *_stmt, + const struct timespec *ts) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + cass_int64_t ts_msecs = + (cass_int64_t)ts->tv_sec * 1000 + + ts->tv_nsec / 1000000; + + if (stmt->cass_stmt != NULL) + cass_statement_set_timestamp(stmt->cass_stmt, ts_msecs); + else + stmt->pending_timestamp = ts_msecs; +} + +static struct cassandra_sql_arg * +driver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt, + unsigned int column_idx) +{ + struct cassandra_sql_arg *arg; + + if (!array_is_created(&stmt->pending_args)) + p_array_init(&stmt->pending_args, stmt->stmt.pool, 8); + arg = array_append_space(&stmt->pending_args); + arg->column_idx = column_idx; + return arg; +} + +static void +driver_cassandra_statement_bind_str(struct sql_statement *_stmt, + unsigned int column_idx, + const char *value) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + if (stmt->cass_stmt != NULL) + cass_statement_bind_string(stmt->cass_stmt, column_idx, value); + else { + struct cassandra_sql_arg *arg = + driver_cassandra_add_pending_arg(stmt, column_idx); + arg->value_str = p_strdup(_stmt->pool, value); + } +} + +static void +driver_cassandra_statement_bind_binary(struct sql_statement *_stmt, + unsigned int column_idx, + const void *value, size_t value_size) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + + if (stmt->cass_stmt != NULL) { + cass_statement_bind_bytes(stmt->cass_stmt, column_idx, + value, value_size); + } else { + struct cassandra_sql_arg *arg = + driver_cassandra_add_pending_arg(stmt, column_idx); + arg->value_binary = p_memdup(_stmt->pool, value, value_size); + arg->value_binary_size = value_size; + } +} + +static void +driver_cassandra_statement_bind_int64(struct sql_statement *_stmt, + unsigned int column_idx, int64_t value) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + + if (stmt->cass_stmt != NULL) + driver_cassandra_bind_int(stmt, column_idx, value); + else { + struct cassandra_sql_arg *arg = + driver_cassandra_add_pending_arg(stmt, column_idx); + arg->value_int64 = value; + } +} + +static void +driver_cassandra_statement_query(struct sql_statement *_stmt, + sql_query_callback_t *callback, void *context) +{ + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + struct cassandra_db *db = (struct cassandra_db *)_stmt->db; + + stmt->result = driver_cassandra_query_init(db, sql_statement_get_query(_stmt), + CASSANDRA_QUERY_TYPE_READ, + callback, context); + if (stmt->cass_stmt == NULL) { + /* wait for prepare to finish */ + } else { + stmt->result->statement = stmt->cass_stmt; + (void)driver_cassandra_send_query(stmt->result); + pool_unref(&_stmt->pool); + } +} + +static struct sql_result * +driver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED) +{ + i_panic("cassandra: sql_statement_query_s() not supported"); +} + +static void +driver_cassandra_update_stmt(struct sql_transaction_context *_ctx, + struct sql_statement *_stmt, + unsigned int *affected_rows) +{ + struct cassandra_transaction_context *ctx = + (struct cassandra_transaction_context *)_ctx; + struct cassandra_sql_statement *stmt = + (struct cassandra_sql_statement *)_stmt; + + i_assert(affected_rows == NULL); + + if (ctx->query != NULL || ctx->stmt != NULL) { + transaction_set_failed(ctx, "Multiple changes in transaction not supported"); + return; + } + ctx->stmt = stmt; +} + const struct sql_db driver_cassandra_db = { .name = "cassandra", - .flags = 0, + .flags = SQL_DB_FLAG_PREP_STATEMENTS, .v = { .init = driver_cassandra_init_v, @@ -1576,6 +2021,19 @@ .update = driver_cassandra_update, .escape_blob = driver_cassandra_escape_blob, + + .prepared_statement_init = driver_cassandra_prepared_statement_init, + .prepared_statement_deinit = driver_cassandra_prepared_statement_deinit, + .statement_init = driver_cassandra_statement_init, + .statement_init_prepared = driver_cassandra_statement_init_prepared, + .statement_abort = driver_cassandra_statement_abort, + .statement_set_timestamp = driver_cassandra_statement_set_timestamp, + .statement_bind_str = driver_cassandra_statement_bind_str, + .statement_bind_binary = driver_cassandra_statement_bind_binary, + .statement_bind_int64 = driver_cassandra_statement_bind_int64, + .statement_query = driver_cassandra_statement_query, + .statement_query_s = driver_cassandra_statement_query_s, + .update_stmt = driver_cassandra_update_stmt, } };