# HG changeset patch # User Aki Tuomi # Date 1485884610 -7200 # Node ID a326cf8a579a1e55bdc6f345042c00a65c62b376 # Parent c337e4fe88e5c3a642ae04a0133bdfd0b45e983f driver-cassandra: Add support for speculative execution diff -r c337e4fe88e5 -r a326cf8a579a configure.ac --- a/configure.ac Wed Feb 08 12:03:30 2017 +0200 +++ b/configure.ac Tue Jan 31 19:43:30 2017 +0200 @@ -2382,6 +2382,9 @@ AC_DEFINE(HAVE_CASSANDRA,, [Build with Cassandra support]) found_sql_drivers="$found_sql_drivers cassandra" + AC_CHECK_LIB(cassandra, cass_cluster_set_constant_speculative_execution_policy, [ + AC_DEFINE(HAVE_CASSANDRA_SPECULATIVE_POLICY, 1, [Cassandra supports speculative execution policy]) + ]) ], [ if test $want_cassandra = yes; then AC_ERROR([Can't build with Cassandra support: cassandra.h not found]) diff -r c337e4fe88e5 -r a326cf8a579a src/lib-sql/driver-cassandra.c --- a/src/lib-sql/driver-cassandra.c Wed Feb 08 12:03:30 2017 +0200 +++ b/src/lib-sql/driver-cassandra.c Tue Jan 31 19:43:30 2017 +0200 @@ -85,6 +85,7 @@ unsigned int connect_timeout_secs, request_timeout_secs; unsigned int warn_timeout_secs; unsigned int heartbeat_interval_secs, idle_timeout_secs; + unsigned int execution_retry_interval_msecs, execution_retry_times; in_port_t port; CassCluster *cluster; @@ -494,6 +495,18 @@ } else if (strcmp(key, "metrics") == 0) { i_free(db->metrics_path); db->metrics_path = i_strdup(value); + } else if (strcmp(key, "execution_retry_interval") == 0) { + if (settings_get_time_msecs(value, &db->execution_retry_interval_msecs, &error) < 0) + i_fatal("cassandra: Invalid execution_retry_interval '%s': %s", value, error); +#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY + i_fatal("cassandra: This cassandra version does not support execution_retry_interval"); +#endif + } else if (strcmp(key, "execution_retry_times") == 0) { + if (str_to_uint(value, &db->execution_retry_times) < 0) + i_fatal("cassandra: Invalid execution_retry_times %s", value); +#ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY + i_fatal("cassandra: This cassandra version does not support execution_retry_times"); +#endif } else { i_fatal("cassandra: Unknown connect string: %s", key); } @@ -618,6 +631,10 @@ cass_cluster_set_connection_heartbeat_interval(db->cluster, db->heartbeat_interval_secs); if (db->idle_timeout_secs != 0) cass_cluster_set_connection_idle_timeout(db->cluster, db->idle_timeout_secs); +#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY + if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0) + cass_cluster_set_constant_speculative_execution_policy(db->cluster, db->execution_retry_interval_msecs, db->execution_retry_times); +#endif db->session = cass_session_new(); if (db->metrics_path != NULL) db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db); @@ -848,6 +865,9 @@ result->statement = cass_statement_new(result->query, 0); cass_statement_set_consistency(result->statement, result->consistency); +#ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY + cass_statement_set_is_idempotent(result->statement, cass_true); +#endif future = cass_session_execute(db->session, result->statement); driver_cassandra_set_callback(future, db, query_callback, result); }