changeset 21381:f22040e335f9

dict-sql: Support transaction timestamps with Cassandra driver
author Timo Sirainen <timo.sirainen@dovecot.fi>
date Mon, 09 Jan 2017 17:05:14 +0200
parents 0082213bce13
children 2229662b26a6
files src/lib-dict/dict-sql.c
diffstat 1 files changed, 31 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/src/lib-dict/dict-sql.c	Mon Jan 09 19:16:04 2017 +0200
+++ b/src/lib-dict/dict-sql.c	Mon Jan 09 17:05:14 2017 +0200
@@ -31,6 +31,7 @@
 	const struct dict_sql_settings *set;
 
 	unsigned int has_on_duplicate_key:1;
+	unsigned int has_using_timestamp:1;
 };
 
 struct sql_dict_iterate_context {
@@ -96,6 +97,8 @@
 
 	/* currently pgsql and sqlite don't support "ON DUPLICATE KEY" */
 	dict->has_on_duplicate_key = strcmp(driver->name, "mysql") == 0;
+	/* only Cassandra CQL supports "USING TIMESTAMP" */
+	dict->has_using_timestamp = strcmp(driver->name, "cassandra") == 0;
 
 	dict->db = sql_db_cache_new(dict_sql_db_cache, driver->name,
 				    dict->set->connect);
@@ -890,6 +893,21 @@
 	sql_dict_transaction_free(ctx);
 }
 
+static void
+sql_dict_transaction_add_timestamp(struct sql_dict_transaction_context *ctx,
+				   string_t *query)
+{
+	struct sql_dict *dict = (struct sql_dict *)ctx->ctx.dict;
+	unsigned long long timestamp_usecs;
+
+	if (ctx->ctx.timestamp.tv_sec == 0 || !dict->has_using_timestamp)
+		return;
+
+	timestamp_usecs = ctx->ctx.timestamp.tv_sec * 1000000ULL +
+		ctx->ctx.timestamp.tv_nsec / 1000;
+	str_printfa(query, " USING TIMESTAMP %llu", timestamp_usecs);
+}
+
 struct dict_sql_build_query_field {
 	const struct dict_sql_map *map;
 	const char *value;
@@ -904,7 +922,8 @@
 	bool inc;
 };
 
-static int sql_dict_set_query(const struct dict_sql_build_query *build,
+static int sql_dict_set_query(struct sql_dict_transaction_context *ctx,
+			      const struct dict_sql_build_query *build,
 			      const char **query_r, const char **error_r)
 {
 	struct sql_dict *dict = build->dict;
@@ -919,7 +938,9 @@
 
 	prefix = t_str_new(64);
 	suffix = t_str_new(256);
-	str_printfa(prefix, "INSERT INTO %s (", fields[0].map->table);
+	str_printfa(prefix, "INSERT INTO %s", fields[0].map->table);
+	sql_dict_transaction_add_timestamp(ctx, prefix);
+	str_append(prefix, " (");
 	str_append(suffix, ") VALUES (");
 	for (i = 0; i < field_count; i++) {
 		if (i > 0) {
@@ -987,7 +1008,8 @@
 }
 
 static int
-sql_dict_update_query(const struct dict_sql_build_query *build,
+sql_dict_update_query(struct sql_dict_transaction_context *ctx,
+		      const struct dict_sql_build_query *build,
 		      const char **query_r, const char **error_r)
 {
 	struct sql_dict *dict = build->dict;
@@ -1001,7 +1023,9 @@
 	i_assert(field_count > 0);
 
 	query = t_str_new(64);
-	str_printfa(query, "UPDATE %s SET ", fields[0].map->table);
+	str_printfa(query, "UPDATE %s", fields[0].map->table);
+	sql_dict_transaction_add_timestamp(ctx, query);
+	str_append(query, " SET ");
 	for (i = 0; i < field_count; i++) {
 		if (i > 0)
 			str_append_c(query, ',');
@@ -1053,7 +1077,7 @@
 		build.extra_values = &values;
 		build.key1 = key[0];
 
-		if (sql_dict_set_query(&build, &query, &error) < 0) {
+		if (sql_dict_set_query(ctx, &build, &query, &error) < 0) {
 			i_error("dict-sql: Failed to set %s=%s: %s",
 				key, value, error);
 			ctx->failed = TRUE;
@@ -1150,7 +1174,7 @@
 		build.key1 = key[0];
 		build.inc = TRUE;
 
-		if (sql_dict_update_query(&build, &query, &error) < 0) {
+		if (sql_dict_update_query(ctx, &build, &query, &error) < 0) {
 			i_error("dict-sql: Failed to increase %s: %s", key, error);
 			ctx->failed = TRUE;
 		} else {
@@ -1251,7 +1275,7 @@
 		field->map = map;
 		field->value = t_strdup_printf("%lld", diff);
 
-		if (sql_dict_update_query(&build, &query, &error) < 0) {
+		if (sql_dict_update_query(ctx, &build, &query, &error) < 0) {
 			i_error("dict-sql: Failed to increase %s: %s", key, error);
 			ctx->failed = TRUE;
 		} else {