changeset 22530:5f7b8aa18c27

cassandra: Add support for prepared statements
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Tue, 22 Aug 2017 14:35:11 +0300
parents 7b17f52b75b6
children 65df17ce8844
files src/lib-sql/driver-cassandra.c
diffstat 1 files changed, 469 insertions(+), 11 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-sql/driver-cassandra.c	Tue Aug 22 13:55:15 2017 +0300
+++ b/src/lib-sql/driver-cassandra.c	Tue Aug 22 14:35:11 2017 +0300
@@ -99,6 +99,7 @@
 
 	int fd_pipe[2];
 	struct io *io_pipe;
+	ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares;
 	ARRAY(struct cassandra_callback *) callbacks;
 	ARRAY(struct cassandra_result *) results;
 	unsigned int callback_ids;
@@ -149,6 +150,7 @@
 	sql_commit2_callback_t *callback;
 	void *context;
 
+	struct cassandra_sql_statement *stmt;
 	char *query;
 	char *error;
 
@@ -157,6 +159,42 @@
 	unsigned int failed:1;
 };
 
+struct cassandra_sql_arg {
+	unsigned int column_idx;
+
+	char *value_str;
+	unsigned char *value_binary;
+	size_t value_binary_size;
+	int64_t value_int64;
+};
+
+struct cassandra_sql_statement {
+	struct sql_statement stmt;
+
+	struct cassandra_sql_prepared_statement *prep;
+	CassStatement *cass_stmt;
+
+	ARRAY(struct cassandra_sql_arg) pending_args;
+	cass_int64_t pending_timestamp;
+
+	struct cassandra_result *result;
+};
+
+struct cassandra_sql_prepared_statement {
+	struct sql_prepared_statement prep_stmt;
+	char *query_template;
+
+	/* NULL, until the prepare is asynchronously finished */
+	const CassPrepared *prepared;
+	/* statements waiting for prepare to finish */
+	ARRAY(struct cassandra_sql_statement *) pending_statements;
+	/* an error here will cause the prepare to be retried on the next
+	   execution attempt. */
+	char *error;
+
+	bool pending;
+};
+
 extern const struct sql_db driver_cassandra_db;
 extern const struct sql_result driver_cassandra_result;
 
@@ -189,6 +227,9 @@
 	{ CASS_LOG_TRACE, "trace" }
 };
 
+static void driver_cassandra_prepare_pending(struct cassandra_db *db);
+static void
+prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt);
 static void driver_cassandra_result_send_query(struct cassandra_result *result);
 static void driver_cassandra_send_queries(struct cassandra_db *db);
 static void result_finish(struct cassandra_result *result);
@@ -232,6 +273,7 @@
 
 static void driver_cassandra_close(struct cassandra_db *db, const char *error)
 {
+	struct cassandra_sql_prepared_statement *const *prep_stmtp;
 	struct cassandra_result *const *resultp;
 
 	if (db->io_pipe != NULL)
@@ -242,6 +284,13 @@
 	}
 	driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
 
+	array_foreach(&db->pending_prepares, prep_stmtp) {
+		(*prep_stmtp)->pending = FALSE;
+		(*prep_stmtp)->error = i_strdup(error);
+		prepare_finish_pending_statements(*prep_stmtp);
+	}
+	array_clear(&db->pending_prepares);
+
 	while (array_count(&db->results) > 0) {
 		resultp = array_idx(&db->results, 0);
 		if ((*resultp)->error == NULL)
@@ -365,6 +414,7 @@
 		   finish */
 		io_loop_stop(db->ioloop);
 	}
+	driver_cassandra_prepare_pending(db);
 	driver_cassandra_send_queries(db);
 }
 
@@ -654,6 +704,7 @@
 		db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
 	i_array_init(&db->results, 16);
 	i_array_init(&db->callbacks, 16);
+	i_array_init(&db->pending_prepares, 16);
 	return &db->api;
 }
 
@@ -667,6 +718,8 @@
 	array_free(&db->callbacks);
 	i_assert(array_count(&db->results) == 0);
 	array_free(&db->results);
+	i_assert(array_count(&db->pending_prepares) == 0);
+	array_free(&db->pending_prepares);
 
 	cass_session_free(db->session);
 	cass_cluster_free(db->cluster);
@@ -925,6 +978,8 @@
 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
 	CassFuture *future;
 
+	i_assert(result->statement != NULL);
+
 	db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
 	driver_cassandra_init_statement(result);
 
@@ -1023,7 +1078,7 @@
 
 	results = array_get(&db->results, &count);
 	for (i = 0; i < count; i++) {
-		if (!results[i]->query_sent) {
+		if (!results[i]->query_sent && results[i]->statement != NULL) {
 			if (driver_cassandra_send_query(results[i]) <= 0)
 				break;
 		}
@@ -1454,6 +1509,7 @@
 {
 	struct cassandra_transaction_context *ctx =
 		(struct cassandra_transaction_context *)_ctx;
+	struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
 	enum cassandra_query_type query_type;
 	struct sql_commit_result result;
 
@@ -1461,20 +1517,37 @@
 	ctx->callback = callback;
 	ctx->context = context;
 
-	if (ctx->failed || ctx->query == NULL) {
+	if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) {
 		if (ctx->failed)
 			result.error = ctx->error;
 
 		callback(&result, context);
 		driver_cassandra_transaction_unref(&ctx);
+		return;
+	}
+
+	/* just a single query, send it */
+	const char *query = ctx->query != NULL ?
+		ctx->query : sql_statement_get_query(&ctx->stmt->stmt);
+	if (strncasecmp(query, "DELETE ", 7) == 0)
+		query_type = CASSANDRA_QUERY_TYPE_DELETE;
+	else
+		query_type = CASSANDRA_QUERY_TYPE_WRITE;
+
+	if (ctx->query != NULL) {
+		driver_cassandra_query_full(_ctx->db, query, query_type,
+			  transaction_commit_callback, ctx);
 	} else {
-		/* just a single query, send it */
-		if (strncasecmp(ctx->query, "DELETE ", 7) == 0)
-			query_type = CASSANDRA_QUERY_TYPE_DELETE;
-		else
-			query_type = CASSANDRA_QUERY_TYPE_WRITE;
-		driver_cassandra_query_full(_ctx->db, ctx->query, query_type,
-			  transaction_commit_callback, ctx);
+		ctx->stmt->result =
+			driver_cassandra_query_init(db, query, query_type,
+				transaction_commit_callback, ctx);
+		if (ctx->stmt->cass_stmt == NULL) {
+			/* wait for prepare to finish */
+		} else {
+			ctx->stmt->result->statement = ctx->stmt->cass_stmt;
+			(void)driver_cassandra_send_query(ctx->stmt->result);
+			pool_unref(&ctx->stmt->stmt.pool);
+		}
 	}
 }
 
@@ -1507,6 +1580,11 @@
 	struct cassandra_transaction_context *ctx =
 		(struct cassandra_transaction_context *)_ctx;
 
+	if (ctx->stmt != NULL) {
+		/* nothing should be using this - don't bother implementing */
+		i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
+	}
+
 	if (ctx->query != NULL && !ctx->failed)
 		driver_cassandra_try_commit_s(ctx);
 	*error_r = t_strdup(ctx->error);
@@ -1536,7 +1614,7 @@
 
 	i_assert(affected_rows == NULL);
 
-	if (ctx->query != NULL) {
+	if (ctx->query != NULL || ctx->stmt != NULL) {
 		transaction_set_failed(ctx, "Multiple changes in transaction not supported");
 		return;
 	}
@@ -1554,9 +1632,376 @@
 	return str_c(str);
 }
 
+static CassError
+driver_cassandra_bind_int(struct cassandra_sql_statement *stmt,
+			  unsigned int column_idx, int64_t value)
+{
+	const CassDataType *data_type;
+	CassValueType value_type;
+
+	if (stmt->prep == NULL) {
+		value_type = value >= -2147483648 && value <= 2147483647 ?
+			CASS_VALUE_TYPE_INT : CASS_VALUE_TYPE_BIGINT;
+	} else {
+		/* prepared statements require exactly correct value type */
+		data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, column_idx);
+		value_type = cass_data_type_type(data_type);
+	}
+
+	switch (value_type) {
+	case CASS_VALUE_TYPE_INT:
+		if (value < -2147483648 || value > 2147483647)
+			return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+		return cass_statement_bind_int32(stmt->cass_stmt, column_idx, value);
+	case CASS_VALUE_TYPE_BIGINT:
+		return cass_statement_bind_int64(stmt->cass_stmt, column_idx, value);
+	case CASS_VALUE_TYPE_SMALL_INT:
+		if (value < -32768 || value > 32767)
+			return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+		return cass_statement_bind_int16(stmt->cass_stmt, column_idx, value);
+	case CASS_VALUE_TYPE_TINY_INT:
+		if (value < -128 || value > 127)
+			return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+		return cass_statement_bind_int8(stmt->cass_stmt, column_idx, value);
+	default:
+		return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+	}
+}
+
+static void prepare_finish_arg(struct cassandra_sql_statement *stmt,
+			       const struct cassandra_sql_arg *arg)
+{
+	CassError rc;
+
+	if (arg->value_str != NULL) {
+		rc = cass_statement_bind_string(stmt->cass_stmt, arg->column_idx,
+						arg->value_str);
+	} else if (arg->value_binary != NULL) {
+		rc = cass_statement_bind_bytes(stmt->cass_stmt, arg->column_idx,
+					       arg->value_binary,
+					       arg->value_binary_size);
+	} else {
+		rc = driver_cassandra_bind_int(stmt, arg->column_idx,
+					       arg->value_int64);
+	}
+	if (rc != CASS_OK) {
+		i_error("cassandra: Statement '%s': Failed to bind column %u: %s",
+			stmt->stmt.query_template, arg->column_idx,
+			cass_error_desc(rc));
+	}
+}
+
+static void prepare_finish_statement(struct cassandra_sql_statement *stmt)
+{
+	const struct cassandra_sql_arg *arg;
+
+	if (stmt->prep->prepared == NULL) {
+		i_assert(stmt->prep->error != NULL);
+
+		if (stmt->result != NULL) {
+			stmt->result->error = i_strdup(stmt->prep->error);
+			result_finish(stmt->result);
+		}
+		return;
+	}
+	stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);
+
+	if (stmt->pending_timestamp != 0) {
+		cass_statement_set_timestamp(stmt->cass_stmt,
+					     stmt->pending_timestamp);
+	}
+
+	if (array_is_created(&stmt->pending_args)) {
+		array_foreach(&stmt->pending_args, arg)
+			prepare_finish_arg(stmt, arg);
+	}
+	if (stmt->result != NULL) {
+		stmt->result->statement = stmt->cass_stmt;
+		(void)driver_cassandra_send_query(stmt->result);
+		pool_unref(&stmt->stmt.pool);
+	}
+}
+
+static void
+prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt)
+{
+	struct cassandra_sql_statement *const *stmtp;
+
+	array_foreach(&prep_stmt->pending_statements, stmtp)
+		prepare_finish_statement(*stmtp);
+	array_clear(&prep_stmt->pending_statements);
+}
+
+static void prepare_callback(CassFuture *future, void *context)
+{
+	struct cassandra_sql_prepared_statement *prep_stmt = context;
+	CassError error = cass_future_error_code(future);
+
+	if (error != CASS_OK) {
+		const char *errmsg;
+		size_t errsize;
+
+		cass_future_error_message(future, &errmsg, &errsize);
+		i_free(prep_stmt->error);
+		prep_stmt->error = i_strndup(errmsg, errsize);
+	} else {
+		prep_stmt->prepared = cass_future_get_prepared(future);
+	}
+
+	prepare_finish_pending_statements(prep_stmt);
+}
+
+static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt)
+{
+	struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db;
+	CassFuture *future;
+
+	if (!SQL_DB_IS_READY(&db->api)) {
+		if (!prep_stmt->pending) {
+			prep_stmt->pending = TRUE;
+			array_append(&db->pending_prepares, &prep_stmt, 1);
+
+			if (sql_connect(&db->api) < 0)
+				i_unreached();
+		}
+		return;
+	}
+
+	/* clear the current error in case we're retrying */
+	i_free_and_null(prep_stmt->error);
+
+	future = cass_session_prepare(db->session, prep_stmt->query_template);
+	driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
+}
+
+static void driver_cassandra_prepare_pending(struct cassandra_db *db)
+{
+	struct cassandra_sql_prepared_statement *const *prep_stmtp;
+
+	i_assert(SQL_DB_IS_READY(&db->api));
+
+	array_foreach(&db->pending_prepares, prep_stmtp) {
+		(*prep_stmtp)->pending = FALSE;
+		prepare_start(*prep_stmtp);
+	}
+	array_clear(&db->pending_prepares);
+}
+
+static struct sql_prepared_statement *
+driver_cassandra_prepared_statement_init(struct sql_db *db,
+					 const char *query_template)
+{
+	struct cassandra_sql_prepared_statement *prep_stmt =
+		i_new(struct cassandra_sql_prepared_statement, 1);
+	prep_stmt->prep_stmt.db = db;
+	prep_stmt->query_template = i_strdup(query_template);
+	i_array_init(&prep_stmt->pending_statements, 4);
+	prepare_start(prep_stmt);
+	return &prep_stmt->prep_stmt;
+}
+
+static void
+driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt)
+{
+	struct cassandra_sql_prepared_statement *prep_stmt =
+		(struct cassandra_sql_prepared_statement *)_prep_stmt;
+
+	i_assert(array_count(&prep_stmt->pending_statements) == 0);
+	if (prep_stmt->prepared != NULL)
+		cass_prepared_free(prep_stmt->prepared);
+	array_free(&prep_stmt->pending_statements);
+	i_free(prep_stmt->query_template);
+	i_free(prep_stmt->error);
+	i_free(prep_stmt);
+}
+
+static struct sql_statement *
+driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED,
+				const char *query_template)
+{
+	pool_t pool = pool_alloconly_create("cassandra sql statement", 1024);
+	struct cassandra_sql_statement *stmt =
+		p_new(pool, struct cassandra_sql_statement, 1);
+
+	/* Count the number of parameters in the query. We'll assume that all
+	   the changing parameters are bound, so there shouldn't be any
+	   quoted strings with '?' in them. */
+	const char *p = query_template;
+	size_t param_count = 0;
+	while ((p = strchr(p, '?')) != NULL) {
+		param_count++;
+		p++;
+	}
+
+	stmt->stmt.pool = pool;
+	stmt->cass_stmt = cass_statement_new(query_template, param_count);
+	return &stmt->stmt;
+}
+
+static struct sql_statement *
+driver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt)
+{
+	struct cassandra_sql_prepared_statement *prep_stmt =
+		(struct cassandra_sql_prepared_statement *)_prep_stmt;
+	pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024);
+	struct cassandra_sql_statement *stmt =
+		p_new(pool, struct cassandra_sql_statement, 1);
+
+	stmt->stmt.pool = pool;
+	stmt->stmt.query_template =
+		p_strdup(stmt->stmt.pool, prep_stmt->query_template);
+	stmt->prep = prep_stmt;
+
+	if (prep_stmt->prepared != NULL) {
+		/* statement is already prepared. we can use it immediately. */
+		stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared);
+	} else {
+		if (prep_stmt->error != NULL)
+			prepare_start(prep_stmt);
+		/* need to wait until prepare is finished */
+		array_append(&prep_stmt->pending_statements, &stmt, 1);
+	}
+	return &stmt->stmt;
+}
+
+static void
+driver_cassandra_statement_abort(struct sql_statement *_stmt)
+{
+	struct cassandra_sql_statement *stmt =
+		(struct cassandra_sql_statement *)_stmt;
+
+	if (stmt->cass_stmt != NULL)
+		cass_statement_free(stmt->cass_stmt);
+}
+
+static void
+driver_cassandra_statement_set_timestamp(struct sql_statement *_stmt,
+					 const struct timespec *ts)
+{
+	struct cassandra_sql_statement *stmt =
+		(struct cassandra_sql_statement *)_stmt;
+	cass_int64_t ts_msecs =
+		(cass_int64_t)ts->tv_sec * 1000 +
+		ts->tv_nsec / 1000000;
+
+	if (stmt->cass_stmt != NULL)
+		cass_statement_set_timestamp(stmt->cass_stmt, ts_msecs);
+	else
+		stmt->pending_timestamp = ts_msecs;
+}
+
+static struct cassandra_sql_arg *
+driver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt,
+				 unsigned int column_idx)
+{
+	struct cassandra_sql_arg *arg;
+
+	if (!array_is_created(&stmt->pending_args))
+		p_array_init(&stmt->pending_args, stmt->stmt.pool, 8);
+	arg = array_append_space(&stmt->pending_args);
+	arg->column_idx = column_idx;
+	return arg;
+}
+
+static void
+driver_cassandra_statement_bind_str(struct sql_statement *_stmt,
+				    unsigned int column_idx,
+				    const char *value)
+{
+	struct cassandra_sql_statement *stmt =
+		(struct cassandra_sql_statement *)_stmt;
+	if (stmt->cass_stmt != NULL)
+		cass_statement_bind_string(stmt->cass_stmt, column_idx, value);
+	else {
+		struct cassandra_sql_arg *arg =
+			driver_cassandra_add_pending_arg(stmt, column_idx);
+		arg->value_str = p_strdup(_stmt->pool, value);
+	}
+}
+
+static void
+driver_cassandra_statement_bind_binary(struct sql_statement *_stmt,
+				       unsigned int column_idx,
+				       const void *value, size_t value_size)
+{
+	struct cassandra_sql_statement *stmt =
+		(struct cassandra_sql_statement *)_stmt;
+
+	if (stmt->cass_stmt != NULL) {
+		cass_statement_bind_bytes(stmt->cass_stmt, column_idx,
+					  value, value_size);
+	} else {
+		struct cassandra_sql_arg *arg =
+			driver_cassandra_add_pending_arg(stmt, column_idx);
+		arg->value_binary = p_memdup(_stmt->pool, value, value_size);
+		arg->value_binary_size = value_size;
+	}
+}
+
+static void
+driver_cassandra_statement_bind_int64(struct sql_statement *_stmt,
+				      unsigned int column_idx, int64_t value)
+{
+	struct cassandra_sql_statement *stmt =
+		(struct cassandra_sql_statement *)_stmt;
+
+	if (stmt->cass_stmt != NULL)
+		driver_cassandra_bind_int(stmt, column_idx, value);
+	else {
+		struct cassandra_sql_arg *arg =
+			driver_cassandra_add_pending_arg(stmt, column_idx);
+		arg->value_int64 = value;
+	}
+}
+
+static void
+driver_cassandra_statement_query(struct sql_statement *_stmt,
+				 sql_query_callback_t *callback, void *context)
+{
+	struct cassandra_sql_statement *stmt =
+		(struct cassandra_sql_statement *)_stmt;
+	struct cassandra_db *db = (struct cassandra_db *)_stmt->db;
+
+	stmt->result = driver_cassandra_query_init(db, sql_statement_get_query(_stmt),
+						   CASSANDRA_QUERY_TYPE_READ,
+						   callback, context);
+	if (stmt->cass_stmt == NULL) {
+		/* wait for prepare to finish */
+	} else {
+		stmt->result->statement = stmt->cass_stmt;
+		(void)driver_cassandra_send_query(stmt->result);
+		pool_unref(&_stmt->pool);
+	}
+}
+
+static struct sql_result *
+driver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED)
+{
+	i_panic("cassandra: sql_statement_query_s() not supported");
+}
+
+static void
+driver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
+			     struct sql_statement *_stmt,
+			     unsigned int *affected_rows)
+{
+	struct cassandra_transaction_context *ctx =
+		(struct cassandra_transaction_context *)_ctx;
+	struct cassandra_sql_statement *stmt =
+		(struct cassandra_sql_statement *)_stmt;
+
+	i_assert(affected_rows == NULL);
+
+	if (ctx->query != NULL || ctx->stmt != NULL) {
+		transaction_set_failed(ctx, "Multiple changes in transaction not supported");
+		return;
+	}
+	ctx->stmt = stmt;
+}
+
 const struct sql_db driver_cassandra_db = {
 	.name = "cassandra",
-	.flags = 0,
+	.flags = SQL_DB_FLAG_PREP_STATEMENTS,
 
 	.v = {
 		.init = driver_cassandra_init_v,
@@ -1576,6 +2021,19 @@
 		.update = driver_cassandra_update,
 
 		.escape_blob = driver_cassandra_escape_blob,
+
+		.prepared_statement_init = driver_cassandra_prepared_statement_init,
+		.prepared_statement_deinit = driver_cassandra_prepared_statement_deinit,
+		.statement_init = driver_cassandra_statement_init,
+		.statement_init_prepared = driver_cassandra_statement_init_prepared,
+		.statement_abort = driver_cassandra_statement_abort,
+		.statement_set_timestamp = driver_cassandra_statement_set_timestamp,
+		.statement_bind_str = driver_cassandra_statement_bind_str,
+		.statement_bind_binary = driver_cassandra_statement_bind_binary,
+		.statement_bind_int64 = driver_cassandra_statement_bind_int64,
+		.statement_query = driver_cassandra_statement_query,
+		.statement_query_s = driver_cassandra_statement_query_s,
+		.update_stmt = driver_cassandra_update_stmt,
 	}
 };