|
/* |
|
This is free and unencumbered software released into the public domain. |
|
|
|
Anyone is free to copy, modify, publish, use, compile, sell, or |
|
distribute this software, either in source code form or as a compiled |
|
binary, for any purpose, commercial or non-commercial, and by any |
|
means. |
|
|
|
In jurisdictions that recognize copyright laws, the author or authors |
|
of this software dedicate any and all copyright interest in the |
|
software to the public domain. We make this dedication for the benefit |
|
of the public at large and to the detriment of our heirs and |
|
successors. We intend this dedication to be an overt act of |
|
relinquishment in perpetuity of all present and future rights to this |
|
software under copyright law. |
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
|
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
|
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
|
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR |
|
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, |
|
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR |
|
OTHER DEALINGS IN THE SOFTWARE. |
|
|
|
For more information, please refer to <http://unlicense.org/> |
|
*/ |
|
|
|
#include <assert.h> |
|
#include <stdio.h> |
|
#include <stdlib.h> |
|
#include <string.h> |
|
#include <uv.h> |
|
|
|
#include "cassandra.h" |
|
|
|
#define USE_EXEC_PROFILE |
|
|
|
void print_error(CassFuture* future) { |
|
const char* message; |
|
size_t message_length; |
|
cass_future_error_message(future, &message, &message_length); |
|
fprintf(stderr, "Error: %.*s\n", (int)message_length, message); |
|
} |
|
|
|
CassCluster* create_cluster(const char* hosts) { |
|
CassCluster* cluster = cass_cluster_new(); |
|
cass_cluster_set_contact_points(cluster, hosts); |
|
return cluster; |
|
} |
|
|
|
CassError connect_session(CassSession* session, const CassCluster* cluster) { |
|
/* Provide the cluster object as configuration to connect the session */ |
|
CassError rc = CASS_OK; |
|
CassFuture* future = cass_session_connect(session, cluster); |
|
|
|
rc = cass_future_error_code(future); |
|
if (rc != CASS_OK) { |
|
/* Handle error */ |
|
print_error(future); |
|
} |
|
cass_future_free(future); |
|
|
|
return rc; |
|
} |
|
|
|
CassError execute_query(CassSession* session, const char* query) { |
|
/* Build statement and execute query */ |
|
CassError rc = CASS_OK; |
|
CassFuture* future = NULL; |
|
CassStatement* statement = cass_statement_new(query, 0); |
|
future = cass_session_execute(session, statement); |
|
|
|
rc = cass_future_error_code(future); |
|
if (rc != CASS_OK) { |
|
/* Handle error */ |
|
print_error(future); |
|
} |
|
|
|
cass_future_free(future); |
|
cass_statement_free(statement); |
|
|
|
return rc; |
|
} |
|
|
|
const CassPrepared* handle_prepared(CassFuture* future) { |
|
if (!future) { |
|
return NULL; |
|
} |
|
CassError rc = cass_future_error_code(future); |
|
if (rc != CASS_OK) { |
|
print_error(future); |
|
return NULL; |
|
} |
|
const CassPrepared* prepared = cass_future_get_prepared(future); |
|
cass_future_free(future); |
|
return prepared; |
|
} |
|
|
|
const CassResult* handle_result(CassFuture* future) { |
|
if (!future) { |
|
return NULL; |
|
} |
|
CassError rc = cass_future_error_code(future); |
|
if (rc != CASS_OK) { |
|
print_error(future); |
|
return NULL; |
|
} |
|
const CassResult* result = cass_future_get_result(future); |
|
cass_future_free(future); |
|
return result; |
|
} |
|
|
|
void add_local_dc_profile(CassCluster* cluster, const char* profile_name, const char* datacenter) { |
|
CassExecProfile* profile = cass_execution_profile_new(); |
|
cass_execution_profile_set_token_aware_routing(profile, cass_true); |
|
cass_execution_profile_set_consistency(profile, CASS_CONSISTENCY_LOCAL_QUORUM); |
|
cass_execution_profile_set_load_balance_dc_aware(profile, datacenter, 0, cass_false); |
|
cass_cluster_set_execution_profile(cluster, profile_name, profile); |
|
cass_execution_profile_free(profile); |
|
} |
|
|
|
typedef struct { |
|
CassSession* session; |
|
CassStatement* statement; |
|
CassFuture* retry_future; |
|
} RequestState; |
|
|
|
void on_result(CassFuture* future, void* data) { |
|
RequestState* state = (RequestState*)data; |
|
|
|
CassError rc = cass_future_error_code(future); |
|
if (rc == CASS_ERROR_LIB_NO_HOSTS_AVAILABLE) { /* Retry the query if there are no more local hosts */ |
|
cass_statement_set_execution_profile(state->statement, "remote"); |
|
state->retry_future = cass_session_execute(state->session, state->statement); |
|
} |
|
} |
|
|
|
const CassResult* execute_with_dc_failover(CassSession* session, CassStatement* statement) { |
|
cass_statement_set_execution_profile(statement, "local"); |
|
|
|
CassFuture* future = cass_session_execute(session, statement); |
|
|
|
RequestState request_state; |
|
request_state.session = session; |
|
request_state.statement = statement; |
|
request_state.retry_future = NULL; |
|
|
|
cass_future_set_callback(future, on_result, &request_state); |
|
|
|
cass_future_wait(future); |
|
|
|
/* If we had a "No Hosts Available" error on the first request then wait for the |
|
* request on the other DC. |
|
*/ |
|
if (request_state.retry_future) { |
|
cass_future_free(future); |
|
future = request_state.retry_future; |
|
} |
|
|
|
return handle_result(future); |
|
} |
|
|
|
typedef struct { |
|
CassSession* session; |
|
const CassPrepared* select_prepared; |
|
} SessionState; |
|
|
|
void on_request_thread(void* args) { |
|
SessionState* session_state = (SessionState*)args; |
|
|
|
while (1) { |
|
CassStatement* statement = cass_prepared_bind(session_state->select_prepared); |
|
|
|
char buf[64]; |
|
sprintf(buf, "%d", rand() % 100); |
|
cass_statement_bind_string_by_name(statement, "key", buf); |
|
|
|
#ifdef USE_EXEC_PROFILE |
|
const CassResult* result = execute_with_dc_failover(session_state->session, statement); |
|
#else |
|
const CassResult* result = handle_result(cass_session_execute(session_state->session, statement)); |
|
#endif |
|
if (result) cass_result_free(result); |
|
cass_statement_free(statement); |
|
} |
|
} |
|
|
|
void run_timer(SessionState* session_state, uv_timer_cb cb, uint64_t timeout_ms) { |
|
uv_loop_t loop; |
|
uv_loop_init(&loop); |
|
|
|
uv_timer_t timer; |
|
timer.data = session_state; |
|
|
|
uv_timer_init(&loop, &timer); |
|
uv_timer_start(&timer, cb, timeout_ms, timeout_ms); |
|
|
|
uv_run(&loop, UV_RUN_DEFAULT); |
|
|
|
uv_loop_close(&loop); |
|
} |
|
|
|
void on_metrics_timer(uv_timer_t* timer) { |
|
SessionState* session_state = (SessionState*)timer->data; |
|
CassMetrics metrics; |
|
cass_session_get_metrics(session_state->session, &metrics); |
|
printf("rate stats (requests/second): mean %f 1m %f 5m %f 10m %f\n", metrics.requests.mean_rate, |
|
metrics.requests.one_minute_rate, metrics.requests.five_minute_rate, |
|
metrics.requests.fifteen_minute_rate); |
|
} |
|
|
|
int main(int argc, char* argv[]) { |
|
CassCluster* cluster = NULL; |
|
CassSession* session = cass_session_new(); |
|
const char* hosts = "127.0.0.1,127.0.0.2,127.0.0.3"; |
|
|
|
if (argc > 1) { |
|
hosts = argv[1]; |
|
} |
|
cluster = create_cluster(hosts); |
|
|
|
cass_log_set_level(CASS_LOG_ERROR); |
|
|
|
#ifdef USE_EXEC_PROFILE |
|
/* Add two different exceution profiles one for each datacenter */ |
|
printf("Using execution profiles for DC failover:\n"); |
|
add_local_dc_profile(cluster, "local", "dc1"); |
|
add_local_dc_profile(cluster, "remote", "dc2"); |
|
#else |
|
/* Use DC-aware allowing remote DCs */ |
|
printf("Using DC-aware for DC failover:\n"); |
|
cass_cluster_set_load_balance_dc_aware(cluster, "dc1", 3, cass_true); |
|
#endif |
|
|
|
cass_cluster_set_consistency(cluster, CASS_CONSISTENCY_LOCAL_QUORUM); |
|
|
|
if (connect_session(session, cluster) != CASS_OK) { |
|
cass_cluster_free(cluster); |
|
cass_session_free(session); |
|
return -1; |
|
} |
|
|
|
/* Create schema */ |
|
execute_query(session, "CREATE KEYSPACE IF NOT EXISTS failover WITH REPLICATION = " |
|
"{ 'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 3 }"); |
|
execute_query(session, "CREATE TABLE IF NOT EXISTS failover.test (key text PRIMARY KEY, value text)"); |
|
|
|
/* Create prepred statements */ |
|
const CassPrepared* insert_prepared = handle_prepared(cass_session_prepare(session, "INSERT INTO failover.test (key, value) VALUES (?, ?)")); |
|
const CassPrepared* select_prepared = handle_prepared(cass_session_prepare(session, "SELECT * FROM failover.test WHERE key = ?")); |
|
|
|
if (!insert_prepared || !select_prepared) { |
|
return -1; |
|
} |
|
|
|
/* Prime the table with data */ |
|
for (int i = 0; i < 100; ++i) { |
|
CassStatement* statement = cass_prepared_bind(insert_prepared); |
|
|
|
char buf[64]; |
|
sprintf(buf, "%d", i); |
|
cass_statement_bind_string_by_name(statement, "key", buf); |
|
cass_statement_bind_string_by_name(statement, "value", buf); |
|
|
|
const CassResult* result = handle_result(cass_session_execute(session, statement)); |
|
if (result) cass_result_free(result); |
|
|
|
cass_statement_free(statement); |
|
} |
|
|
|
SessionState session_state; |
|
session_state.session = session; |
|
session_state.select_prepared = select_prepared; |
|
|
|
/* Run requests on another thread */ |
|
uv_thread_t request_thread; |
|
uv_thread_create(&request_thread, on_request_thread, &session_state); |
|
|
|
/* Run metrics output on this thread */ |
|
run_timer(&session_state, on_metrics_timer, 2000); |
|
|
|
cass_cluster_free(cluster); |
|
cass_session_free(session); |
|
cass_prepared_free(insert_prepared); |
|
cass_prepared_free(select_prepared); |
|
|
|
return 0; |
|
} |