changeset 21531:a326cf8a579a

driver-cassandra: Add support for speculative execution
author Aki Tuomi <aki.tuomi@dovecot.fi>
date Tue, 31 Jan 2017 19:43:30 +0200
parents c337e4fe88e5
children 0754351f9f75
files configure.ac src/lib-sql/driver-cassandra.c
diffstat 2 files changed, 23 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/configure.ac	Wed Feb 08 12:03:30 2017 +0200
+++ b/configure.ac	Tue Jan 31 19:43:30 2017 +0200
@@ -2382,6 +2382,9 @@
 
 			AC_DEFINE(HAVE_CASSANDRA,, [Build with Cassandra support])
 			found_sql_drivers="$found_sql_drivers cassandra"
+                        AC_CHECK_LIB(cassandra, cass_cluster_set_constant_speculative_execution_policy, [
+                          AC_DEFINE(HAVE_CASSANDRA_SPECULATIVE_POLICY, 1, [Cassandra supports speculative execution policy])
+                        ])
 		], [
 		  if test $want_cassandra = yes; then
 		    AC_ERROR([Can't build with Cassandra support: cassandra.h not found])
--- a/src/lib-sql/driver-cassandra.c	Wed Feb 08 12:03:30 2017 +0200
+++ b/src/lib-sql/driver-cassandra.c	Tue Jan 31 19:43:30 2017 +0200
@@ -85,6 +85,7 @@
 	unsigned int connect_timeout_secs, request_timeout_secs;
 	unsigned int warn_timeout_secs;
 	unsigned int heartbeat_interval_secs, idle_timeout_secs;
+	unsigned int execution_retry_interval_msecs, execution_retry_times;
 	in_port_t port;
 
 	CassCluster *cluster;
@@ -494,6 +495,18 @@
 		} else if (strcmp(key, "metrics") == 0) {
 			i_free(db->metrics_path);
 			db->metrics_path = i_strdup(value);
+		} else if (strcmp(key, "execution_retry_interval") == 0) {
+			if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0)
+				i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error);
+#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
+			i_fatal("cassandra: This cassandra version does not support execution_retry_interval");
+#endif
+		} else if (strcmp(key, "execution_retry_times") == 0) {
+			if (str_to_uint(value, &db->execution_retry_times) < 0)
+				i_fatal("cassandra: Invalid execution_retry_times %s", value);
+#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
+			i_fatal("cassandra: This cassandra version does not support execution_retry_times");
+#endif
 		} else {
 			i_fatal("cassandra: Unknown connect string: %s", key);
 		}
@@ -618,6 +631,10 @@
 		cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs);
 	if (db->idle_timeout_secs != 0)
 		cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs);
+#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
+	if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
+		cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times);
+#endif
 	db->session = cass_session_new();
 	if (db->metrics_path != NULL)
 		db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
@@ -848,6 +865,9 @@
 	result->statement = cass_statement_new(result->query, 0);
 	cass_statement_set_consistency(result->statement, result->consistency);
 
+#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
+	cass_statement_set_is_idempotent(result->statement, cass_true);
+#endif
 	future = cass_session_execute(db->session, result->statement);
 	driver_cassandra_set_callback(future, db, query_callback, result);
 }