Scylla and Elasticsearch
cqlsh> desc SCHEMA

CREATE KEYSPACE catalog WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;

CREATE TABLE catalog.apparel (
    sku text,
    color text,
    size text,
    brand text,
    gender text,
    group text,
    sub_group text,
    PRIMARY KEY (sku, color, size)
) WITH CLUSTERING ORDER BY (color ASC, size ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';
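
The PRIMARY KEY (sku, color, size) means Scylla serves lookups by sku, optionally narrowed by color and then size, but it cannot efficiently filter on non-key columns such as brand, group, or gender. Those ad-hoc filters are what Elasticsearch handles in this setup. A minimal sketch of the query patterns the key does support, assuming a local Scylla node (the literal values are placeholders, not rows from the catalog):

from cassandra.cluster import Cluster

session = Cluster(["127.0.0.1"]).connect()  # assumed local Scylla node

# Served directly by the primary key: partition key first, then clustering columns in order.
by_sku = session.prepare("SELECT * FROM catalog.apparel WHERE sku=?")
by_sku_color = session.prepare("SELECT * FROM catalog.apparel WHERE sku=? AND color=?")

for row in session.execute(by_sku_color, ("100", "red")):  # placeholder sku/color
    print(row)

# Not served by this key without a secondary index or ALLOW FILTERING:
#   SELECT * FROM catalog.apparel WHERE brand = '...';
# Filters like that are delegated to the Elasticsearch index shown next.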
$ curl http://127.0.0.1:9200/catalog/?pretty
{
  "catalog" : {
    "aliases" : { },
    "mappings" : {
      "apparel" : {
        "properties" : {
          "brand" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "color" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "gender" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "group" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "size" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "sku" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "sub_group" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1524638529727",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "FLpCxzUlSDWIkfXvCGcfUQ",
        "version" : {
          "created" : "6020399"
        },
        "provided_name" : "catalog"
      }
    }
  }
}
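
Since the index was created without an explicit mapping (es.indices.create in the script further down), Elasticsearch's dynamic mapping typed every field as text with a .keyword sub-field (ignore_above: 256). The keyword sub-fields allow exact matches and aggregations alongside the analyzed full-text queries. A hedged example, with an assumed local node and an illustrative filter value:

from elasticsearch import Elasticsearch

es = Elasticsearch(["127.0.0.1"])  # assumed local Elasticsearch node

# Exact (non-analyzed) match on the keyword sub-field, plus a bucket count per color.
res = es.search(index="catalog", doc_type="apparel", body={
    "query": {"term": {"group.keyword": "pants"}},                 # exact match
    "aggs": {"by_color": {"terms": {"field": "color.keyword"}}},   # facet-style counts
    "size": 10
})
print(res["hits"]["total"])
print(res["aggregations"]["by_color"]["buckets"])

The first script below loads a catalog csv into both stores: each row becomes a Scylla row and an Elasticsearch document keyed by its sku.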
#! /usr/bin/env python
# -*- coding: latin-1 -*-
#
### Using elasticsearch-py ###
import csv
from cassandra.cluster import Cluster
from elasticsearch import Elasticsearch
import random
import argparse
import concurrent.futures
from cassandra import ConsistencyLevel
from cassandra.concurrent import execute_concurrent_with_args

## Script args and Help
parser = argparse.ArgumentParser(add_help=True)
parser.add_argument('-s', action="store", dest="SCYLLA_IP", default="127.0.0.1")
parser.add_argument('-e', action="store", dest="ES_IP", default="127.0.0.1")

opts = parser.parse_args()
SCYLLA_IP = opts.SCYLLA_IP.split(',')
ES_IP = opts.ES_IP.split(',')

## Define KS + Table
create_ks = "CREATE KEYSPACE IF NOT EXISTS catalog WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};"
create_t1 = "CREATE TABLE IF NOT EXISTS catalog.apparel (sku text, brand text, group text, sub_group text, color text, size text, gender text, PRIMARY KEY ((sku),color,size));"

## Loading the data
def load_data(filename):
    data = []
    headers = []
    with open(filename, "r") as f:
        reader = csv.reader(f)
        headers = next(reader)  # read the headers line
        for l in reader:
            doc = {}
            for i in range(0, len(l)):
                doc[headers[i].lower()] = l[i]
            data.append(doc)
    return headers, data

## Insert the data
def insert_data(headers, data):
    ## Connect to Scylla cluster and create schema
    # session = cassandra.cluster.Cluster(SCYLLA_IP).connect()
    print("")
    print("## Connecting to Scylla cluster -> Creating schema")
    session = Cluster(SCYLLA_IP).connect()
    session.execute(create_ks)
    session.execute(create_t1)

    ## Connect to Elasticsearch
    print("")
    print("## Connecting to Elasticsearch -> Creating 'Catalog' index")
    es = Elasticsearch(ES_IP)
    ## Create Elasticsearch index. Ignore 400 = IF NOT EXIST
    es.indices.create(index="catalog", ignore=400)

    ## Non-prepared CQL statement (kept for reference)
    #cql = "INSERT INTO catalog.apparel(sku,brand,group,sub_group,color,size,gender) VALUES(%(sku)s,%(brand)s,%(group)s,%(sub_group)s,%(color)s,%(size)s,%(gender)s)"

    ## Prepared CQL statement. Each row dict from the csv is bound by column name,
    ## so the statement binds only the seven table columns and the dict supplies every marker.
    print("")
    print("## Preparing CQL statement")
    cql = "INSERT INTO catalog.apparel (sku,brand,group,sub_group,color,size,gender) VALUES (?,?,?,?,?,?,?)"
    cql_prepared = session.prepare(cql)
    cql_prepared.consistency_level = ConsistencyLevel.ONE if random.random() < 0.2 else ConsistencyLevel.QUORUM

    print("")
    print("## Insert csv content into Scylla and Elasticsearch")
    for d in data:
        # session.execute() is synchronous, so the Scylla write is acknowledged
        # (at the statement's consistency level) before the document is indexed in Elasticsearch.
        session.execute(cql_prepared, d)
        res = es.index(index="catalog", doc_type="apparel", id=d["sku"], body=d)

    ## After all the inserts, make a refresh, just in case
    print("")
    print("## Inserts completed, refreshing index")
    es.indices.refresh(index="catalog")
    print("")


if __name__ == "__main__":
    headers, data = load_data("./catalog.csv")
    insert_data(headers, data)
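
The loader writes one row at a time to each store, which keeps the flow easy to follow but is slow for large catalogs. The script already imports execute_concurrent_with_args, and the elasticsearch package ships a bulk helper; the following is only a sketch of how the same load could be batched, reusing the data list, session, es client, and prepared statement from above (it is not part of the original gist):

from cassandra.concurrent import execute_concurrent_with_args
from elasticsearch import helpers

def insert_data_bulk(session, es, cql_prepared, data):
    # Run the prepared INSERT for every row with up to 50 requests in flight.
    params = [(d["sku"], d["brand"], d["group"], d["sub_group"],
               d["color"], d["size"], d["gender"]) for d in data]
    execute_concurrent_with_args(session, cql_prepared, params, concurrency=50)

    # Index all documents in one bulk request instead of per-document es.index() calls.
    actions = [{"_index": "catalog", "_type": "apparel", "_id": d["sku"], "_source": d}
               for d in data]
    helpers.bulk(es, actions)
    es.indices.refresh(index="catalog")

The next script goes the other way: it filters in Elasticsearch first, then reads the matching rows back from Scylla by sku.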
#! /usr/bin/env python
# -*- coding: latin-1 -*-
#
### Using elasticsearch-py ###
from cassandra.cluster import Cluster
from elasticsearch import Elasticsearch
import random
import argparse
import concurrent.futures
from cassandra import ConsistencyLevel
from cassandra.concurrent import execute_concurrent_with_args

## Script args and help
parser = argparse.ArgumentParser(add_help=True)
parser.add_argument('-s', action="store", dest="SCYLLA_IP", default="127.0.0.1")
parser.add_argument('-e', action="store", dest="ES_IP", default="127.0.0.1")
parser.add_argument('-n', action="store", dest="NUM_FILTERS", default="multiple")

opts = parser.parse_args()
SCYLLA_IP = opts.SCYLLA_IP.split(',')
ES_IP = opts.ES_IP.split(',')


def query_es(NUM_FILTERS):
    # Connect to Elasticsearch
    print("")
    print("## Connecting to Elasticsearch")
    es = Elasticsearch(ES_IP)

    if NUM_FILTERS == 'single':
        # Search using single field filter (group: 'pants')
        print("")
        print("## Searching for 'pants' in Elasticsearch (filter by group)")
        res = es.search(index="catalog", doc_type="apparel", body={"query": {"match": {"group": "pants"}}, "size": 1000})

    if NUM_FILTERS == 'multiple':
        # Search using multiple fields filter (color: 'white' AND sub_group: 'softshell')
        print("")
        print("## Searching for 'white softshell' in Elasticsearch (filter by color + sub_group)")
        res = es.search(index="catalog", doc_type="apparel", body={"query": {"bool": {"must": [{"match": {"color": "white"}}, {"match": {"sub_group": "softshell"}}]}}, "size": 1000})

    if NUM_FILTERS == 'none':
        # Search with NO filters (match_all)
        print("")
        print("## Searching with NO filter = 'match_all' in Elasticsearch")
        res = es.search(index="catalog", doc_type="apparel", body={"query": {"match_all": {}}, "size": "1000"})

    print("")
    print("## %d documents returned" % res['hits']['total'])
    es_results = [doc['_id'] for doc in res['hits']['hits']]

    # Connect to Scylla
    print("")
    print("## Connecting to Scylla")
    session = Cluster(SCYLLA_IP).connect()

    # Prepared CQL statement
    print("")
    print("## Preparing CQL statement")
    cql = "SELECT * FROM catalog.apparel WHERE sku=?"
    cql_prepared = session.prepare(cql)
    cql_prepared.consistency_level = ConsistencyLevel.ONE if random.random() < 0.2 else ConsistencyLevel.QUORUM

    # Query Scylla
    print("")
    print("## Query Scylla using SKU/s returned from Elasticsearch")
    print("")
    print("## Final results from Scylla:")
    print("")
    for r in es_results:
        scylla_res = session.execute(cql_prepared, (r,))
        print("%s" % ([list(row) for row in scylla_res]))

    #for doc in res['hits']['hits']:
        # Print all columns in Elasticsearch result set
        #print("SKU: %s | Color: %s | Size: %s | Brand: %s | Gender: %s | Group: %s | Sub_Group: %s" % (doc['_id'], doc['_source']['color'], doc['_source']['size'], doc['_source']['brand'], doc['_source']['gender'], doc['_source']['group'], doc['_source']['sub_group']))
        # Print only the id (sku) in the result set
        #print("SKU: %s" % (doc['_id']))

    print("")


if __name__ == '__main__':
    query_es(opts.NUM_FILTERS)
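
All three search branches cap the result set at 1,000 hits via "size". If a filter can match more documents than that, one common approach (an assumption here, not something the gist does) is the Elasticsearch scroll API, which pages through the full result set before the per-SKU reads from Scylla:

from elasticsearch import Elasticsearch

es = Elasticsearch(["127.0.0.1"])  # assumed local Elasticsearch node

# Open a scroll context and keep fetching pages until one comes back empty.
res = es.search(index="catalog", doc_type="apparel", scroll="1m",
                body={"query": {"match": {"group": "pants"}}, "size": 1000})
sku_list = [doc["_id"] for doc in res["hits"]["hits"]]
while res["hits"]["hits"]:
    res = es.scroll(scroll_id=res["_scroll_id"], scroll="1m")
    sku_list.extend(doc["_id"] for doc in res["hits"]["hits"])
print("%d SKUs collected" % len(sku_list))

The last script is a more generic loader: it indexes any csv file into Elasticsearch, deriving the document id from the table's primary key when a .cql schema file is supplied.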
#!/usr/bin/env python
# -*- coding: latin-1 -*-
#
### Using elasticsearch-py ###
import os
import re
import csv
from datetime import datetime
from cassandra.cluster import Cluster
from elasticsearch import Elasticsearch
import random
import argparse
import concurrent.futures
from cassandra import ConsistencyLevel
from cassandra.concurrent import execute_concurrent_with_args

## Script args and Help
parser = argparse.ArgumentParser(add_help=True)
parser.add_argument('-e', action="store", dest="ES_IP", default="127.0.0.1")
parser.add_argument('-c', action="store", dest="csv_file_name", default=None)
parser.add_argument('-s', action="store", dest="cql_schema_file_name", default=None)
parser.add_argument('-i', action="store", dest="ignore_cql_schema", default=True)

opts = parser.parse_args()
ES_IP = opts.ES_IP.split(',')


def get_primary_key_fields(schema_filename, csv_filename, ignore_cql_schema):
    if schema_filename is None and csv_filename is None:
        print("")
        raise ValueError("Both schema file (.cql) and data file (.csv) are missing - Exit script")

    # If we didn't get a schema file but did get a csv file, or ignoring cql schema (default),
    # then select the first column from the csv as the key to be used as the ES id field
    if schema_filename is None or ignore_cql_schema is True:
        print("")
        print("## No schema provided / Ignoring schema -> using column1 from csv as 'id' for Elasticsearch index")
        with open(csv_filename, "r") as f:
            reader = csv.reader(f)
            headers_row = next(reader)
            return [headers_row[0]]
    else:
        with open(schema_filename, "r") as f:
            schema_file = f.read()

        # Check for compound PK i.e. PRIMARY KEY ((col1,col2),col3)
        print("")
        print("## Check schema ({0}) for compound primary key to be used as index id".format(schema_filename))
        m = re.search(r"PRIMARY KEY \(\((.+?)\)", schema_file, re.I)
        if m:
            keys = m.group(1).split(",")
            return [k.strip() for k in keys]

        # We didn't find a compound PK, try checking for a regular PK i.e. PRIMARY KEY (col1,col2,col3)
        print("")
        print("## Did not find a compound primary key, checking for regular primary key to be used as index id")
        m = re.search(r"PRIMARY KEY \((.+)\)", schema_file, re.I)
        if m:
            keys = m.group(1).split(",")
            return [keys[0]]

        return []


def get_index_name(filename):
    return os.path.splitext(os.path.basename(filename))[0]


def get_headers(reader):
    headers = next(reader)
    headers_index_mapping = {}
    for i in range(0, len(headers)):
        headers_index_mapping[headers[i]] = i
    return headers, headers_index_mapping


def get_es_id(row, headers, headers_index_mapping, keys):
    es_id = []
    for k in keys:
        i = headers_index_mapping[k]
        es_id.append(row[i])
    return "-".join(es_id)


def get_row_data(row, headers):
    data = {}
    for i in range(0, len(row)):
        val = None
        try:
            # Store numeric csv values as ints, everything else as the original string
            val = int(row[i])
        except ValueError:
            val = row[i]
        data[headers[i]] = val
    return data


## Insert the data
def insert_data(csv_filename, keys):
    ## Connect to ES
    index_name = get_index_name(csv_filename)
    # Connecting to ES -> Creating index, if not exist
    print("")
    print("## Connecting to ES -> Creating '{0}' index, if not exist".format(index_name))
    es = Elasticsearch(ES_IP)
    # Create ES index. Ignore 400 = IF NOT EXIST
    es.indices.create(index=index_name, ignore=400)

    print("")
    print("## Write csv file ({0}) content into Elasticsearch".format(csv_filename))
    print("")
    print("## Update every 1000 rows processed ##")
    rows_counter = 0
    with open(csv_filename, "r") as f:
        reader = csv.reader(f, skipinitialspace=True, quoting=csv.QUOTE_ALL, escapechar='\\')
        headers, headers_index_mapping = get_headers(reader)
        doc_type = "by_{0}".format("-".join(keys))
        for row in reader:
            es_id = get_es_id(row, headers, headers_index_mapping, keys)
            doc_data = get_row_data(row, headers)
            res = es.index(index=index_name, doc_type=doc_type, id=es_id, body=doc_data)
            rows_counter += 1
            # Print update every 1000 rows
            if rows_counter % 1000 == 0:
                print("Rows processed: {0} ".format(rows_counter))

    # After all the inserts, make a refresh, just in case
    print("")
    print("## After all inserts, refresh index (just in case)")
    es.indices.refresh(index=index_name)

    # Total processed rows
    print("")
    print("")
    print("### Total Rows Processed: {0} ###".format(rows_counter))
    print("")
    print("")


if __name__ == "__main__":
    keys = get_primary_key_fields(opts.cql_schema_file_name, opts.csv_file_name, opts.ignore_cql_schema)
    insert_data(opts.csv_file_name, keys)
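
To make the id derivation concrete, here is a small standalone check of the two regular expressions above, run against hypothetical schema strings (not files from this gist). A compound partition key yields a multi-column key list, otherwise only the first primary-key column is kept; get_es_id then joins the corresponding row values with '-', and the doc_type becomes 'by_' plus the key columns:

import re

compound = "CREATE TABLE t (sku text, store text, color text, PRIMARY KEY ((sku, store), color));"
simple = "CREATE TABLE t (sku text, color text, PRIMARY KEY (sku, color));"

# Compound partition key -> ['sku', 'store'] (ES id would be e.g. "<sku>-<store>")
m = re.search(r"PRIMARY KEY \(\((.+?)\)", compound, re.I)
print([k.strip() for k in m.group(1).split(",")])

# The compound pattern does not match the simple schema...
print(re.search(r"PRIMARY KEY \(\((.+?)\)", simple, re.I))

# ...so the fallback pattern keeps only the first column -> ['sku']
m = re.search(r"PRIMARY KEY \((.+)\)", simple, re.I)
print([m.group(1).split(",")[0].strip()])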