Scylla and Elasticsearch
cqlsh> desc SCHEMA
CREATE KEYSPACE catalog WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;
CREATE TABLE catalog.apparel (
    sku text,
    color text,
    size text,
    brand text,
    gender text,
    group text,
    sub_group text,
    PRIMARY KEY (sku, color, size)
) WITH CLUSTERING ORDER BY (color ASC, size ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';
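The table is keyed by sku (partition key) with color and size as clustering columns, so Scylla alone serves lookups only when the key is supplied; filtering by non-key columns such as brand or group is what the Elasticsearch index below is for. A minimal lookup sketch, assuming a local node and hypothetical key values (not rows from the gist's dataset):

from cassandra.cluster import Cluster

# Sketch only: '1001', 'white' and 'M' are hypothetical values.
session = Cluster(['127.0.0.1']).connect()
row = session.execute(
    "SELECT * FROM catalog.apparel WHERE sku=%s AND color=%s AND size=%s",
    ('1001', 'white', 'M')
).one()
print(row)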
$ curl http://127.0.0.1:9200/catalog/?pretty
{
  "catalog" : {
    "aliases" : { },
    "mappings" : {
      "apparel" : {
        "properties" : {
          "brand" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "color" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "gender" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "group" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "size" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "sku" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "sub_group" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1524638529727",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "FLpCxzUlSDWIkfXvCGcfUQ",
        "version" : {
          "created" : "6020399"
        },
        "provided_name" : "catalog"
      }
    }
  }
}
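Elasticsearch's dynamic mapping gives every column a "text" field for analyzed full-text matching plus a ".keyword" sub-field for exact matches and aggregations. A minimal sketch of an exact-match query on the keyword sub-field, assuming elasticsearch-py 6.x and a local node (the brand value is hypothetical):

from elasticsearch import Elasticsearch

# Sketch only: "some-brand" is a hypothetical value, not from the gist's dataset.
es = Elasticsearch(['127.0.0.1'])
res = es.search(index="catalog", doc_type="apparel",
                body={"query": {"term": {"brand.keyword": "some-brand"}}, "size": 10})
print(res['hits']['total'])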
#! /usr/bin/env python
# -*- coding: latin-1 -*-
#
### Using elasticsearch-py ###
import csv
from cassandra.cluster import Cluster
from elasticsearch import Elasticsearch
import random
import argparse
import concurrent.futures
from cassandra import ConsistencyLevel
from cassandra.concurrent import execute_concurrent_with_args
## Script args and Help
parser = argparse.ArgumentParser(add_help=True)
parser.add_argument('-s', action="store", dest="SCYLLA_IP", default="127.0.0.1")
parser.add_argument('-e', action="store", dest="ES_IP", default="127.0.0.1")
opts = parser.parse_args()
SCYLLA_IP = opts.SCYLLA_IP.split(',')
ES_IP = opts.ES_IP.split(',')
## Define KS + Table
create_ks = "CREATE KEYSPACE IF NOT EXISTS catalog WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};"
create_t1 = "CREATE TABLE IF NOT EXISTS catalog.apparel (sku text, brand text, group text, sub_group text, color text, size text, gender text, PRIMARY KEY ((sku),color,size));"
## Loading the data
def load_data(filename):
    data = []
    headers = []
    with open(filename, "r") as f:
        reader = csv.reader(f)
        headers = next(reader)  # read the headers line
        for l in reader:
            doc = {}
            for i in range(0, len(l)):
                doc[headers[i].lower()] = l[i]
            data.append(doc)
    return headers, data

## Insert the data
def insert_data(headers, data):
    ## Connect to Scylla cluster and create schema
    # session = cassandra.cluster.Cluster(SCYLLA_IP).connect()
    print("")
    print("## Connecting to Scylla cluster -> Creating schema")
    session = Cluster(SCYLLA_IP).connect()
    session.execute(create_ks)
    session.execute(create_t1)

    ## Connect to Elasticsearch
    print("")
    print("## Connecting to Elasticsearch -> Creating 'catalog' index")
    es = Elasticsearch(ES_IP)
    ## Create Elasticsearch index. Ignore 400 = IF NOT EXIST
    es.indices.create(index="catalog", ignore=400)

    ## Non-prepared CQL statement
    #cql = "INSERT INTO catalog.apparel(sku,brand,group,sub_group,color,size,gender) VALUES(%(sku)s,%(brand)s,%(group)s,%(sub_group)s,%(color)s,%(size)s,%(gender)s)"

    ## Prepared CQL statement
    print("")
    print("## Preparing CQL statement")
    cql = "INSERT INTO catalog.apparel (sku,brand,group,sub_group,color,size,gender) VALUES (?,?,?,?,?,?,?)"
    cql_prepared = session.prepare(cql)
    cql_prepared.consistency_level = ConsistencyLevel.ONE if random.random() < 0.2 else ConsistencyLevel.QUORUM

    print("")
    print("## Insert csv content into Scylla and Elasticsearch")
    for d in data:
        # Synchronous writes: insert each row into Scylla, then index the same
        # document in Elasticsearch, using the SKU as the document id.
        session.execute(cql_prepared, d)
        res = es.index(index="catalog", doc_type="apparel", id=d["sku"], body=d)

    ## After all the inserts, make a refresh, just in case
    print("")
    print("## Inserts completed, refreshing index")
    es.indices.refresh(index="catalog")
    print("")

if __name__ == "__main__":
    headers, data = load_data("./catalog.csv")
    insert_data(headers, data)
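The loader above writes one row at a time. Since the script already imports execute_concurrent_with_args, a hedged variant (not part of the gist; the helper name and the concurrency value are assumptions) could batch the Scylla writes concurrently:

from cassandra.concurrent import execute_concurrent_with_args

def insert_data_concurrent(session, cql_prepared, data):
    # Bind values positionally in the same column order as the prepared INSERT above.
    params = [(d["sku"], d["brand"], d["group"], d["sub_group"],
               d["color"], d["size"], d["gender"]) for d in data]
    results = execute_concurrent_with_args(session, cql_prepared, params,
                                           concurrency=50, raise_on_first_error=False)
    for ok, result_or_exc in results:
        if not ok:
            print("Scylla insert failed: %s" % result_or_exc)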
#! /usr/bin/env python
# -*- coding: latin-1 -*-
#
### Using elasticsearch-py ###
from cassandra.cluster import Cluster
from elasticsearch import Elasticsearch
import random
import argparse
import concurrent.futures
from cassandra import ConsistencyLevel
from cassandra.concurrent import execute_concurrent_with_args
## Script args and help
parser = argparse.ArgumentParser(add_help=True)
parser.add_argument('-s', action="store", dest="SCYLLA_IP", default="127.0.0.1")
parser.add_argument('-e', action="store", dest="ES_IP", default="127.0.0.1")
parser.add_argument('-n', action="store", dest="NUM_FILTERS", default="multiple")
opts = parser.parse_args()
SCYLLA_IP = opts.SCYLLA_IP.split(',')
ES_IP = opts.ES_IP.split(',')
def query_es(NUM_FILTERS):
    # Connect to Elasticsearch
    print("")
    print("## Connecting to Elasticsearch")
    es = Elasticsearch(ES_IP)

    if NUM_FILTERS == 'single':
        # Search using single field filter (group: 'pants')
        print("")
        print("## Searching for 'pants' in Elasticsearch (filter by group)")
        res = es.search(index="catalog", doc_type="apparel", body={"query": {"match": {"group": "pants"}}, "size": 1000})
    if NUM_FILTERS == 'multiple':
        # Search using multiple fields filter (color: 'white' AND sub_group: 'softshell')
        print("")
        print("## Searching for 'white softshell' in Elasticsearch (filter by color + sub_group)")
        res = es.search(index="catalog", doc_type="apparel", body={"query": {"bool": {"must": [{"match": {"color": "white"}}, {"match": {"sub_group": "softshell"}}]}}, "size": 1000})
    if NUM_FILTERS == 'none':
        # Search with NO filters (match_all)
        print("")
        print("## Searching with NO filter = 'match_all' in Elasticsearch")
        res = es.search(index="catalog", doc_type="apparel", body={"query": {"match_all": {}}, "size": "1000"})

    print("")
    print("## %d documents returned" % res['hits']['total'])
    es_results = [doc['_id'] for doc in res['hits']['hits']]

    # Connect to Scylla
    print("")
    print("## Connecting to Scylla")
    session = Cluster(SCYLLA_IP).connect()

    # Prepared CQL statement
    print("")
    print("## Preparing CQL statement")
    cql = "SELECT * FROM catalog.apparel WHERE sku=?"
    cql_prepared = session.prepare(cql)
    cql_prepared.consistency_level = ConsistencyLevel.ONE if random.random() < 0.2 else ConsistencyLevel.QUORUM

    # Query Scylla
    print("")
    print("## Query Scylla using SKU/s returned from Elasticsearch")
    print("")
    print("## Final results from Scylla:")
    print("")
    for r in es_results:
        scylla_res = session.execute(cql_prepared, (r,))
        print("%s" % ([list(row) for row in scylla_res]))

    #for doc in res['hits']['hits']:
        # Print all columns in Elasticsearch result set
        #print("SKU: %s | Color: %s | Size: %s | Brand: %s | Gender: %s | Group: %s | Sub_Group: %s" % (doc['_id'], doc['_source']['color'], doc['_source']['size'], doc['_source']['brand'], doc['_source']['gender'], doc['_source']['group'], doc['_source']['sub_group']))
        # Print only the id (sku) in the result set
        #print("SKU: %s" % (doc['_id']))
    print("")

if __name__ == '__main__':
    query_es(opts.NUM_FILTERS)
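The SKU lookups against Scylla above run one at a time. As in the loader, the unused execute_concurrent_with_args import suggests they could be fanned out concurrently; a hedged sketch (not part of the gist; function name and concurrency value are assumptions):

from cassandra.concurrent import execute_concurrent_with_args

def fetch_from_scylla(session, cql_prepared, es_results):
    # One (sku,) tuple per Elasticsearch hit.
    results = execute_concurrent_with_args(session, cql_prepared,
                                           [(sku,) for sku in es_results],
                                           concurrency=50, raise_on_first_error=False)
    for ok, rows_or_exc in results:
        if ok:
            print([list(row) for row in rows_or_exc])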
#!/usr/bin/env python
# -*- coding: latin-1 -*-
#
### Using elasticsearch-py ###
import os
import re
import csv
from datetime import datetime
from cassandra.cluster import Cluster
from elasticsearch import Elasticsearch
import random
import argparse
import concurrent.futures
from cassandra import ConsistencyLevel
from cassandra.concurrent import execute_concurrent_with_args
## Script args and Help
parser = argparse.ArgumentParser(add_help=True)
parser.add_argument('-e', action="store", dest="ES_IP", default="127.0.0.1")
parser.add_argument('-c', action="store", dest="csv_file_name", default=None)
parser.add_argument('-s', action="store", dest="cql_schema_file_name", default=None)
parser.add_argument('-i', action="store", dest="ignore_cql_schema", default=True)
opts = parser.parse_args()
ES_IP = opts.ES_IP.split(',')
def get_primary_key_fields(schema_filename, csv_filename, ignore_cql_schema):
    if schema_filename is None and csv_filename is None:
        print("")
        raise ValueError("Both schema file (.cql) and data file (.csv) are missing - Exit script")

    # If we didn't get a schema file but did get a csv file, or ignoring cql schema (default),
    # then select the first column from the csv as the key to be used as the ES id field
    if schema_filename is None or ignore_cql_schema is True:
        print("")
        print("## No schema provided / Ignoring schema -> using column1 from csv as 'id' for Elasticsearch index")
        with open(csv_filename, "r") as f:
            reader = csv.reader(f)
            headers_row = next(reader)
            return [headers_row[0]]
    else:
        with open(schema_filename, "r") as f:
            schema_file = f.read()

        # Check for compound PK i.e. PRIMARY KEY ((col1,col2),col3)
        print("")
        print("## Check schema ({0}) for compound primary key to be used as index id".format(schema_filename))
        m = re.search(r"PRIMARY KEY \(\((.+?)\)", schema_file, re.I)
        if m:
            keys = m.group(1).split(",")
            return [k.strip() for k in keys]

        # We didn't find a compound PK, try checking for a regular PK i.e. PRIMARY KEY (col1,col2,col3)
        print("")
        print("## Did not find a compound primary key, checking for regular primary key to be used as index id")
        m = re.search(r"PRIMARY KEY \((.+)\)", schema_file, re.I)
        if m:
            keys = m.group(1).split(",")
            return [keys[0]]
    return []

def get_index_name(filename):
    return os.path.splitext(os.path.basename(filename))[0]

def get_headers(reader):
    headers = next(reader)
    headers_index_mapping = {}
    for i in range(0, len(headers)):
        headers_index_mapping[headers[i]] = i
    return headers, headers_index_mapping

def get_es_id(row, headers, headers_index_mapping, keys):
    es_id = []
    for k in keys:
        i = headers_index_mapping[k]
        es_id.append(row[i])
    return "-".join(es_id)

def get_row_data(row, headers):
    data = {}
    for i in range(0, len(row)):
        val = None
        try:
            val = int(row[i])
        except ValueError:
            val = row[i]
        data[headers[i]] = val
    return data

## Insert the data
def insert_data(csv_filename, keys):
    ## Connect to ES
    index_name = get_index_name(csv_filename)

    # Connecting to ES -> Creating index, if not exist
    print("")
    print("## Connecting to ES -> Creating '{0}' index, if not exist".format(index_name))
    es = Elasticsearch(ES_IP)
    # Create ES index. Ignore 400 = IF NOT EXIST
    es.indices.create(index=index_name, ignore=400)

    print("")
    print("## Write csv file ({0}) content into Elasticsearch".format(csv_filename))
    print("")
    print("## Update every 1000 rows processed ##")
    rows_counter = 0
    with open(csv_filename, "r") as f:
        reader = csv.reader(f, skipinitialspace=True, quoting=csv.QUOTE_ALL, escapechar='\\')
        headers, headers_index_mapping = get_headers(reader)
        doc_type = "by_{0}".format("-".join(keys))
        for row in reader:
            es_id = get_es_id(row, headers, headers_index_mapping, keys)
            doc_data = get_row_data(row, headers)
            res = es.index(index=index_name, doc_type=doc_type, id=es_id, body=doc_data)
            rows_counter += 1
            # Print update every 1000 rows
            if rows_counter % 1000 == 0:
                print("Rows processed: {0} ".format(rows_counter))

    # After all the inserts, make a refresh, just in case
    print("")
    print("## After all inserts, refresh index (just in case)")
    es.indices.refresh(index=index_name)

    # Total processed rows
    print("")
    print("")
    print("### Total Rows Processed: {0} ###".format(rows_counter))
    print("")
    print("")

if __name__ == "__main__":
    keys = get_primary_key_fields(opts.cql_schema_file_name, opts.csv_file_name, opts.ignore_cql_schema)
    insert_data(opts.csv_file_name, keys)
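For illustration only (the schema text below is hypothetical, not from the gist): given a compound primary key, the first regex in get_primary_key_fields extracts the partition-key columns, whose row values get joined with "-" to form the Elasticsearch document id.

import re

sample_schema = "CREATE TABLE ks.t1 (a text, b text, c text, PRIMARY KEY ((a, b), c));"
m = re.search(r"PRIMARY KEY \(\((.+?)\)", sample_schema, re.I)
print([k.strip() for k in m.group(1).split(",")])  # ['a', 'b'] -> document id "<valA>-<valB>"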