Skip to content

Instantly share code, notes, and snippets.

@mpenick
Last active September 12, 2019 13:19
Show Gist options
  • Save mpenick/2dd99c686361bbbc80f064286b8ec04c to your computer and use it in GitHub Desktop.
Save mpenick/2dd99c686361bbbc80f064286b8ec04c to your computer and use it in GitHub Desktop.
/*
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <http://unlicense.org/>
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#include "cassandra.h"
#define USE_EXEC_PROFILE
/*
 * Write the error message attached to `future` to stderr, prefixed with
 * "Error: ". The future is only read; the caller keeps ownership of it.
 */
void print_error(CassFuture* future) {
  const char* text = NULL;
  size_t text_length = 0;

  cass_future_error_message(future, &text, &text_length);
  /* The message is printed with an explicit length rather than as a C string. */
  fprintf(stderr, "Error: %.*s\n", (int)text_length, text);
}
/*
 * Allocate a new cluster configuration object and set its contact points
 * to `hosts`. The returned object is handed to the caller, which releases
 * it with cass_cluster_free().
 */
CassCluster* create_cluster(const char* hosts) {
  CassCluster* new_cluster = cass_cluster_new();

  cass_cluster_set_contact_points(new_cluster, hosts);
  return new_cluster;
}
/*
 * Connect `session` using `cluster` as its configuration.
 *
 * Returns the error code of the connect future; on failure the error
 * message is printed to stderr. The connect future is always freed here.
 */
CassError connect_session(CassSession* session, const CassCluster* cluster) {
  CassFuture* connect_future = cass_session_connect(session, cluster);
  const CassError status = cass_future_error_code(connect_future);

  if (status != CASS_OK) {
    print_error(connect_future);
  }

  cass_future_free(connect_future);
  return status;
}
/*
 * Execute `query` as a simple statement with no bound parameters.
 *
 * Returns the error code of the request; on failure the error message is
 * printed to stderr. Both the statement and the future are freed before
 * returning, so no result rows are made available to the caller.
 */
CassError execute_query(CassSession* session, const char* query) {
  CassStatement* simple_statement = cass_statement_new(query, 0);
  CassFuture* query_future = cass_session_execute(session, simple_statement);
  const CassError status = cass_future_error_code(query_future);

  if (status != CASS_OK) {
    print_error(query_future);
  }

  cass_future_free(query_future);
  cass_statement_free(simple_statement);
  return status;
}
/*
 * Consume a prepare future and return the prepared statement it produced.
 *
 * Takes ownership of `future`: it is freed on every path (the original
 * leaked it when the request failed). Returns NULL if `future` is NULL or
 * the request failed; in the failure case the error message is printed to
 * stderr. A non-NULL return value is owned by the caller, which releases it
 * with cass_prepared_free().
 */
const CassPrepared* handle_prepared(CassFuture* future) {
  const CassPrepared* prepared = NULL;

  if (!future) {
    return NULL;
  }

  if (cass_future_error_code(future) == CASS_OK) {
    prepared = cass_future_get_prepared(future);
  } else {
    print_error(future);
  }

  cass_future_free(future);
  return prepared;
}
/*
 * Consume a request future and return the result it produced.
 *
 * Takes ownership of `future`: it is freed on every path (the original
 * leaked it when the request failed). Returns NULL if `future` is NULL or
 * the request failed; in the failure case the error message is printed to
 * stderr. A non-NULL return value is owned by the caller, which releases it
 * with cass_result_free().
 */
const CassResult* handle_result(CassFuture* future) {
  const CassResult* result = NULL;

  if (!future) {
    return NULL;
  }

  if (cass_future_error_code(future) == CASS_OK) {
    result = cass_future_get_result(future);
  } else {
    print_error(future);
  }

  cass_future_free(future);
  return result;
}
/*
 * Register an execution profile named `profile_name` on `cluster` that
 * targets `datacenter`.
 *
 * The profile enables token-aware routing, uses LOCAL_QUORUM consistency
 * and DC-aware load balancing on `datacenter` with the remaining arguments
 * set to 0 and cass_false (presumably "no remote-DC hosts" -- confirm
 * against the driver documentation). The temporary profile object is freed
 * once it has been handed to the cluster.
 */
void add_local_dc_profile(CassCluster* cluster, const char* profile_name, const char* datacenter) {
  CassExecProfile* dc_profile = cass_execution_profile_new();

  cass_execution_profile_set_token_aware_routing(dc_profile, cass_true);
  cass_execution_profile_set_consistency(dc_profile, CASS_CONSISTENCY_LOCAL_QUORUM);
  cass_execution_profile_set_load_balance_dc_aware(dc_profile, datacenter, 0, cass_false);

  cass_cluster_set_execution_profile(cluster, profile_name, dc_profile);
  cass_execution_profile_free(dc_profile);
}
/* Per-request context passed as the `data` argument of the on_result() callback. */
typedef struct {
/* Session on which the request is re-executed. */
CassSession* session;
/* Statement that is switched to the "remote" profile and re-executed. */
CassStatement* statement;
/* Future of the re-executed request; left NULL when no retry was started. */
CassFuture* retry_future;
} RequestState;
/*
 * Future callback: when the request failed with
 * CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, switch the statement to the "remote"
 * execution profile, execute it again and store the new future in the
 * RequestState passed through `data`. Any other outcome is left untouched.
 */
void on_result(CassFuture* future, void* data) {
  RequestState* request = (RequestState*)data;

  if (cass_future_error_code(future) != CASS_ERROR_LIB_NO_HOSTS_AVAILABLE) {
    return;
  }

  /* No more local hosts: retry the same statement on the other profile. */
  cass_statement_set_execution_profile(request->statement, "remote");
  request->retry_future = cass_session_execute(request->session, request->statement);
}
/*
 * Execute `statement` with the "local" execution profile and, if that
 * request fails with CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, execute it once more
 * with the "remote" profile.
 *
 * Returns the result of whichever request was made last, or NULL on
 * failure (see handle_result()). The statement is left set to the last
 * profile used; the caller still owns and frees it.
 *
 * The retry decision is made on the calling thread after the first request
 * has completed. The previous version made it inside a future callback that
 * wrote to a stack-allocated state object, which relied on the callback
 * having finished before cass_future_wait() returned -- an ordering the
 * calling code had no way to enforce.
 */
const CassResult* execute_with_dc_failover(CassSession* session, CassStatement* statement) {
  CassFuture* future = NULL;

  cass_statement_set_execution_profile(statement, "local");
  future = cass_session_execute(session, statement);

  /* Reading the error code waits for the request to finish. */
  if (cass_future_error_code(future) == CASS_ERROR_LIB_NO_HOSTS_AVAILABLE) {
    /* No local hosts were available: drop the failed future and retry remotely. */
    cass_future_free(future);
    cass_statement_set_execution_profile(statement, "remote");
    future = cass_session_execute(session, statement);
  }

  return handle_result(future);
}
/* State shared between the request thread and the metrics timer. */
typedef struct {
/* Session used to execute requests and to read metrics from. */
CassSession* session;
/* Prepared SELECT that the request thread binds for every request. */
const CassPrepared* select_prepared;
} SessionState;
void on_request_thread(void* args) {
SessionState* session_state = (SessionState*)args;
while (1) {
CassStatement* statement = cass_prepared_bind(session_state->select_prepared);
char buf[64];
sprintf(buf, "%d", rand() % 100);
cass_statement_bind_string_by_name(statement, "key", buf);
#ifdef USE_EXEC_PROFILE
const CassResult* result = execute_with_dc_failover(session_state->session, statement);
#else
const CassResult* result = handle_result(cass_session_execute(session_state->session, statement));
#endif
if (result) cass_result_free(result);
cass_statement_free(statement);
}
}
/*
 * Run a private libuv loop on the calling thread with one repeating timer.
 *
 * `cb` is first invoked after `timeout_ms` and then again every
 * `timeout_ms`; `session_state` is reachable from the callback through the
 * timer handle's `data` field. The call blocks inside uv_run() for as long
 * as the loop runs.
 */
void run_timer(SessionState* session_state, uv_timer_cb cb, uint64_t timeout_ms) {
  uv_loop_t event_loop;
  uv_timer_t repeat_timer;

  uv_loop_init(&event_loop);

  /* Stash the shared state on the handle so the callback can find it. */
  repeat_timer.data = session_state;
  uv_timer_init(&event_loop, &repeat_timer);
  uv_timer_start(&repeat_timer, cb, timeout_ms, timeout_ms);

  uv_run(&event_loop, UV_RUN_DEFAULT);
  uv_loop_close(&event_loop);
}
/*
 * Timer callback: read the session's metrics and print the request rates
 * to stdout.
 *
 * The SessionState is taken from the timer handle's `data` field. The last
 * value printed is `fifteen_minute_rate`, so it is labelled "15m" (the
 * original label said "10m").
 */
void on_metrics_timer(uv_timer_t* timer) {
  SessionState* session_state = (SessionState*)timer->data;
  CassMetrics metrics;

  cass_session_get_metrics(session_state->session, &metrics);
  printf("rate stats (requests/second): mean %f 1m %f 5m %f 15m %f\n", metrics.requests.mean_rate,
         metrics.requests.one_minute_rate, metrics.requests.five_minute_rate,
         metrics.requests.fifteen_minute_rate);
}
int main(int argc, char* argv[]) {
CassCluster* cluster = NULL;
CassSession* session = cass_session_new();
const char* hosts = "127.0.0.1,127.0.0.2,127.0.0.3";
if (argc > 1) {
hosts = argv[1];
}
cluster = create_cluster(hosts);
cass_log_set_level(CASS_LOG_ERROR);
#ifdef USE_EXEC_PROFILE
/* Add two different exceution profiles one for each datacenter */
printf("Using execution profiles for DC failover:\n");
add_local_dc_profile(cluster, "local", "dc1");
add_local_dc_profile(cluster, "remote", "dc2");
#else
/* Use DC-aware allowing remote DCs */
printf("Using DC-aware for DC failover:\n");
cass_cluster_set_load_balance_dc_aware(cluster, "dc1", 3, cass_true);
#endif
cass_cluster_set_consistency(cluster, CASS_CONSISTENCY_LOCAL_QUORUM);
if (connect_session(session, cluster) != CASS_OK) {
cass_cluster_free(cluster);
cass_session_free(session);
return -1;
}
/* Create schema */
execute_query(session, "CREATE KEYSPACE IF NOT EXISTS failover WITH REPLICATION = "
"{ 'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 3 }");
execute_query(session, "CREATE TABLE IF NOT EXISTS failover.test (key text PRIMARY KEY, value text)");
/* Create prepred statements */
const CassPrepared* insert_prepared = handle_prepared(cass_session_prepare(session, "INSERT INTO failover.test (key, value) VALUES (?, ?)"));
const CassPrepared* select_prepared = handle_prepared(cass_session_prepare(session, "SELECT * FROM failover.test WHERE key = ?"));
if (!insert_prepared || !select_prepared) {
return -1;
}
/* Prime the table with data */
for (int i = 0; i < 100; ++i) {
CassStatement* statement = cass_prepared_bind(insert_prepared);
char buf[64];
sprintf(buf, "%d", i);
cass_statement_bind_string_by_name(statement, "key", buf);
cass_statement_bind_string_by_name(statement, "value", buf);
const CassResult* result = handle_result(cass_session_execute(session, statement));
if (result) cass_result_free(result);
cass_statement_free(statement);
}
SessionState session_state;
session_state.session = session;
session_state.select_prepared = select_prepared;
/* Run requests on another thread */
uv_thread_t request_thread;
uv_thread_create(&request_thread, on_request_thread, &session_state);
/* Run metrics output on this thread */
run_timer(&session_state, on_metrics_timer, 2000);
cass_cluster_free(cluster);
cass_session_free(session);
cass_prepared_free(insert_prepared);
cass_prepared_free(select_prepared);
return 0;
}

Handling DC failover using execution profiles

Add execution profiles for the different DCs in the cluster:

/*
 * Register an execution profile named `profile_name` on `cluster` that uses
 * token-aware routing, LOCAL_QUORUM consistency and DC-aware load balancing
 * on `datacenter`. The temporary profile object is freed after it has been
 * handed to the cluster.
 */
void add_local_dc_profile(CassCluster* cluster, const char* profile_name, const char* datacenter) {
  CassExecProfile* dc_profile = cass_execution_profile_new();

  cass_execution_profile_set_token_aware_routing(dc_profile, cass_true);
  cass_execution_profile_set_consistency(dc_profile, CASS_CONSISTENCY_LOCAL_QUORUM);
  cass_execution_profile_set_load_balance_dc_aware(dc_profile, datacenter, 0, cass_false);

  cass_cluster_set_execution_profile(cluster, profile_name, dc_profile);
  cass_execution_profile_free(dc_profile);
}

/* Add two different execution profiles, one for each datacenter */
add_local_dc_profile(cluster, "local", "dc1");
add_local_dc_profile(cluster, "remote", "dc2");

Execute the statement with a future callback that re-executes the request if CASS_ERROR_LIB_NO_HOSTS_AVAILABLE is returned by the first request to the local DC:

/* Per-request context passed as the `data` argument of the on_result() callback. */
typedef struct {
  /* Session on which the request is re-executed. */
  CassSession* session;
  /* Statement that is switched to the "remote" profile and re-executed. */
  CassStatement* statement;
  /* Future of the re-executed request; left NULL when no retry was started. */
  CassFuture* retry_future;
} RequestState;

/*
 * Consume a request future and return the result it produced.
 *
 * Takes ownership of `future`: it is freed on every path (the original
 * leaked it when the request failed). Returns NULL if `future` is NULL or
 * the request failed. A non-NULL return value is owned by the caller, which
 * releases it with cass_result_free().
 */
const CassResult* handle_result(CassFuture* future) {
  const CassResult* result = NULL;

  if (!future) {
    return NULL;
  }

  if (cass_future_error_code(future) == CASS_OK) {
    result = cass_future_get_result(future);
  } else {
    /* Handle error */
  }

  cass_future_free(future);
  return result;
}


/*
 * Future callback: when the request failed with
 * CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, switch the statement to the "remote"
 * execution profile, execute it again and store the new future in the
 * RequestState passed through `data`. Any other outcome is left untouched.
 */
void on_result(CassFuture* future, void* data) {
  RequestState* request = (RequestState*)data;

  if (cass_future_error_code(future) != CASS_ERROR_LIB_NO_HOSTS_AVAILABLE) {
    return;
  }

  /* No more local hosts: retry the same statement on the other profile. */
  cass_statement_set_execution_profile(request->statement, "remote");
  request->retry_future = cass_session_execute(request->session, request->statement);
}

/*
 * Execute `statement` with the "local" execution profile and fall back to
 * the "remote" profile when on_result() sees
 * CASS_ERROR_LIB_NO_HOSTS_AVAILABLE on the first request.
 *
 * Returns whatever handle_result() returns for the last future: the result
 * on success, NULL on failure. The caller keeps ownership of `statement`.
 */
const CassResult* execute_with_dc_failover(CassSession* session, CassStatement* statement) {
  cass_statement_set_execution_profile(statement, "local");

  CassFuture* future = cass_session_execute(session, statement);

  /* State shared with the callback; it lives on this function's stack. */
  RequestState request_state;
  request_state.session = session;
  request_state.statement = statement;
  request_state.retry_future = NULL;

  cass_future_set_callback(future, on_result, &request_state);

  /* NOTE(review): this reads request_state.retry_future right after the
   * wait. That assumes on_result() has finished by the time
   * cass_future_wait() returns -- confirm against the driver's callback
   * ordering guarantees.
   */
  cass_future_wait(future);

  /* If we had a "No Hosts Available" error on the first request then wait for the
   * request on the other DC.
   */
  if (request_state.retry_future) {
    cass_future_free(future);
    future = request_state.retry_future;
  }

  return handle_result(future);
}

/* Execute a statement, failing over to the remote DC if no local DC hosts are
 * available. The result is NULL when the request failed; a non-NULL result
 * is released with cass_result_free().
 */
const CassResult* result = execute_with_dc_failover(session, statement);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment