Last active
November 14, 2017 12:13
-
-
Save beltran/3feac7b3c08929653b93b2d4ff793996 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from cassandra.cluster import Cluster | |
from cassandra.concurrent import execute_concurrent, query_by_keys | |
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy | |
from itertools import repeat, cycle | |
from collections import defaultdict | |
import time | |
import six | |
from bokeh.plotting import figure, output_file, show | |
# Token-aware routing with replica shuffling disabled, so each request can be
# steered directly to a replica owning the partition key.
cluster = Cluster(load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy(), shuffle_replicas=False))
session = cluster.connect()

# Benchmark configuration.
KEYSPACE = "query_per_range"
TIMES = 1  # repetitions averaged per measurement
ROW_SIZE_IN_KB = 10  # size of the blob payload stored per row
NUMBER_OF_BYTES = int(ROW_SIZE_IN_KB * 1024)
def create_schema():
    """Create the benchmark keyspace and table; both statements are idempotent."""
    keyspace_ddl = """
    CREATE KEYSPACE IF NOT EXISTS %s
    WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }
    """ % KEYSPACE
    table_ddl = """
    CREATE TABLE IF NOT EXISTS query_per_range.one_single_key (
        key_one text,
        col1 text,
        col2 text,
        col3 text,
        col4 text,
        col5 blob,
        PRIMARY KEY (key_one, col1)
    )
    """
    session.execute(keyspace_ddl)
    session.execute(table_ddl)
def load_some_data():
    """Concurrently insert 20000 rows, each carrying a NUMBER_OF_BYTES blob in col5."""
    insert_cql = "INSERT INTO query_per_range.one_single_key(key_one, col1, col2, col3, col4, col5) VALUES (%s, %s, %s, %s, %s, %s)"
    payload = b'x' * NUMBER_OF_BYTES
    statements_and_params = [
        (insert_cql, (str(i), str(i), str(i), str(i), str(i), payload))
        for i in range(20000)
    ]
    execute_concurrent(session, statements_and_params)
def random_query_by_keys(session, keyspace, table, select_fields, keys):
    """Deal *keys* round-robin across all hosts and issue one IN-clause SELECT per host.

    Unlike token-aware routing, keys are assigned to hosts blindly
    (``i % number_of_hosts``), so most rows are fetched from non-replicas;
    this is the "random" baseline for the benchmark.

    :param session: driver Session (uses the private ``_create_response_future`` API)
    :param keyspace: keyspace name
    :param table: table name; its first partition-key column drives the IN clause
    :param select_fields: iterable of column names to select
    :param keys: partition-key values to fetch
    :yields: result rows from all per-host queries
    """
    select_query = "SELECT " + ",".join(select_fields) + " FROM {}.{} WHERE ".format(keyspace, table)
    cluster = session.cluster
    partition_keys = cluster.metadata.keyspaces[keyspace].tables[table].partition_key
    partition_key_name = partition_keys[0].name

    # Deal the keys out round-robin over the known hosts. Only hosts that
    # actually receive at least one key get an entry here.
    # BUG FIX: the original pre-seeded *every* host with a key list, so when
    # len(keys) < number of hosts, hosts with an empty list produced an
    # invalid empty "IN ()" query. It also carried a dead `no_valid_replica`
    # sentinel branch that could never fire (hosts always came from
    # all_hosts()); both are removed.
    all_hosts = list(cluster.metadata.all_hosts())
    keys_per_host = defaultdict(list)
    for i, key in enumerate(keys):
        keys_per_host[all_hosts[i % len(all_hosts)]].append(key)

    response_futures = []
    for host, keys_in_host in six.iteritems(keys_per_host):
        placeholders = "(" + ",".join(["%s"] * len(keys_in_host)) + ")"
        statement = select_query + partition_key_name + " IN " + placeholders
        response_future = session._create_response_future(
            statement, keys_in_host, trace=False,
            custom_payload=None, timeout=session.default_timeout)
        # Send the request straight to the chosen host, bypassing the
        # load-balancing query plan.
        response_future._query(host)
        response_futures.append(response_future)

    for response_future in response_futures:
        for row in response_future.result():
            yield row
# Row counts at which each query strategy is benchmarked.
x_axis = (10, 100, 1000, 2000, 4000, 8000, 16000, 20000)
def generate_results_query_by_keys(TIMES=TIMES):
    """Time the token-aware ``query_by_keys`` strategy for each size in x_axis.

    :param TIMES: number of repetitions averaged per measurement
    :returns: ``(x_axis, y_axis_of_mean_seconds, plot_label)``
    """
    y_axis = []
    for num_rows in x_axis:
        keys = [str(i) for i in range(num_rows)]
        this_set = set()
        before = time.time()
        for _ in range(TIMES):
            results = query_by_keys(session,
                "query_per_range", "one_single_key", ["key_one", "col1", "col2"], keys)
            for row in results:
                this_set.add(row[0])
        after = time.time()
        # Typo fix: "elpased" -> "elapsed".
        print("Time elapsed: {}".format(after - before))
        print("len(this_set): {}".format(len(this_set)))
        y_axis.append((after - before) / TIMES)
        # Sanity check: every requested key must have come back.
        assert len(this_set) == num_rows
    return x_axis, y_axis, "replica_in_clauses"
def generate_results_random_in_clauses(TIMES=TIMES):
    """Time the random (non-token-aware) IN-clause strategy for each size in x_axis.

    :param TIMES: number of repetitions averaged per measurement
    :returns: ``(x_axis, y_axis_of_mean_seconds, plot_label)``
    """
    y_axis = []
    for num_rows in x_axis:
        keys = [str(i) for i in range(num_rows)]
        this_set = set()
        before = time.time()
        for _ in range(TIMES):
            results = random_query_by_keys(session,
                "query_per_range", "one_single_key", ["key_one", "col1", "col2"], keys)
            for row in results:
                this_set.add(row[0])
        after = time.time()
        # Typo fix: "elpased" -> "elapsed".
        print("Time elapsed: {}".format(after - before))
        print("len(this_set): {}".format(len(this_set)))
        y_axis.append((after - before) / TIMES)
        # Sanity check: every requested key must have come back.
        assert len(this_set) == num_rows
    return x_axis, y_axis, "random_in_clauses"
def generate_results_execute_concurrent(TIMES=TIMES):
    """Time one-prepared-statement-per-key via ``execute_concurrent`` for each size in x_axis.

    :param TIMES: number of repetitions averaged per measurement
    :returns: ``(x_axis, y_axis_of_mean_seconds, plot_label)``
    """
    prepared = session.prepare("""SELECT key_one,col1,col2 FROM query_per_range.one_single_key WHERE key_one = ?""")
    y_axis = []
    for num_rows in x_axis:
        keys = [(str(i),) for i in range(num_rows)]
        new_set = set()
        before = time.time()
        for _ in range(TIMES):
            statements_and_params = zip(repeat(prepared), keys)
            results = execute_concurrent(session, statements_and_params)
            for execution_result in results:
                # Failed executions are silently skipped, matching the
                # original best-effort behavior.
                if execution_result.success:
                    for row in execution_result.result_or_exc:
                        new_set.add(row[0])
        after = time.time()
        # Typo fix: "elpased" -> "elapsed".
        print("Time elapsed: {}".format(after - before))
        print("len(new_set): {}".format(len(new_set)))
        y_axis.append((after - before) / TIMES)
        # Sanity check: every requested key must have come back.
        assert len(new_set) == num_rows
    return x_axis, y_axis, "execute_concurrent"
# Driver: load data, run both benchmarks, and render the comparison chart.
create_schema()
load_some_data()

output_file("comparison.html")
# Typo fix in the chart title: "execut_concurrent" -> "execute_concurrent".
p = figure(title="execute_concurrent vs IN clause", x_axis_label='Number of keys', y_axis_label='Seconds')

x, y, tag = generate_results_query_by_keys()
p.circle(x, y, legend=tag, fill_color="red", line_color="red", size=6)
p.line(x, y, legend=tag, line_color="red")

x, y, tag = generate_results_execute_concurrent()
#x, y, tag = generate_results_random_in_clauses()
p.circle(x, y, legend=tag, fill_color="blue", line_color="blue")
p.line(x, y, legend=tag, line_color="blue")

# Bug fix: the legend is only created when glyphs with a `legend` kwarg are
# added, so setting its location before any plotting (as the original did)
# was a no-op; configure it after the glyphs exist.
p.legend.location = "top_left"
show(p)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment