
@dogukancagatay
Created May 26, 2020 17:08
Simple Solr Query Benchmark
import pysolr
import time
import json
import os
import threading
from datetime import datetime, timedelta
##############################################################################
################################ CONFIG ######################################
##############################################################################
SOLR_HOST = "http://solr-1:8983"
COLLECTION_PREFIX = "solr_collection"
QUERY_BATCH = 1000
QUERY_FILE = "query.json"
START_TIME = datetime.fromisoformat("2020-05-01 00:00:00.000+00:00")
END_TIME = datetime.fromisoformat("2020-05-01 00:05:00.000+00:00")
ENABLE_BATCH_REPORT = False
NUM_THREADS = 4 # Defines how many queries will run concurrently
##############################################################################
##############################################################################
##############################################################################
# Global vars
NL = "\n"
def readQueryFile(filename="query.json"):
    queries = []
    if filename.endswith(".jsonl"):
        # JSON Lines: one query object per line.
        with open(filename, mode="r", encoding="utf-8") as f:
            for line in f:
                try:
                    jobj = json.loads(line)
                    queries.append(jobj)
                except json.JSONDecodeError:
                    print(f"Line couldn't be parsed as JSON: {line}")
    elif filename.endswith(".json"):
        # Plain JSON: a single array of query objects.
        with open(filename, mode="r", encoding="utf-8") as f:
            try:
                queries = json.load(f)
            except Exception as e:
                print(f"File couldn't be parsed as JSON: {filename}")
                print(e)
    print(f"Number of queries read from file: {len(queries)}")
    return queries
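# For reference, a .jsonl query file carries one JSON object per line, e.g.
#   {"q": "*:*", "fl": "id"}
# while a .json file carries a single array of such objects (see query.json below).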
def calculateCollections(start_time, end_time, collection_prefix):
    # Collect every YYYY-MM month touched by the [start_time, end_time] range;
    # collections are assumed to be partitioned monthly.
    month_list = set((start_time + timedelta(days=d)).strftime(r"%Y-%m")
                     for d in range((end_time - start_time).days))
    month_list.add(start_time.strftime(r"%Y-%m"))
    month_list.add(end_time.strftime(r"%Y-%m"))
    return [collection_prefix + "_" + x for x in month_list]
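# Example (values assumed): with the default CONFIG both timestamps fall in
# May 2020, so calculateCollections(START_TIME, END_TIME, "solr_collection")
# returns ["solr_collection_2020-05"].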
def querySolr(query_name, solr, collections, query=None, start_time=None, end_time=None, rows=1000):
    # Avoid a mutable default argument; fall back to a match-all query.
    if query is None:
        query = {"q": "*:*"}
    query_start_ts = None
    query_end_ts = None
    props = {
        "rows": str(rows),
        "cursorMark": "*",
        # Cursor-based deep paging requires a sort on the uniqueKey field.
        "sort": "id desc",
    }
    # Get start and end time as timestamps if provided.
    if start_time is not None and end_time is not None:
        query_start_ts = int(datetime.timestamp(start_time))
        query_end_ts = int(datetime.timestamp(end_time))
    # Substitute the timestamps into the query templates.
    for k, v in query.items():
        props[k] = v.format(query_start_ts=query_start_ts,
                            query_end_ts=query_end_ts)
    print(f"{query_name}: Query: {json.dumps(props, ensure_ascii=False).replace(NL, '')}")
    print(f"{query_name}: Collections: {','.join(collections)}")
    exec_start = time.time()
    query_start = exec_start
    results = solr.search(**props)
    print(f"{query_name}: Total number of documents: {results.hits}")
    # Page through the full result set; Solr signals the last page by
    # returning the same cursorMark that was sent.
    while results.nextCursorMark is not None and results.nextCursorMark != props["cursorMark"]:
        count = len(results.docs)
        query_time = time.time() - query_start
        exec_time = time.time() - exec_start
        # Advance the cursor and start the next query.
        props["cursorMark"] = results.nextCursorMark
        query_start = time.time()
        results = solr.search(**props)
        # Batch report
        if ENABLE_BATCH_REPORT:
            print(f"{query_name}: This batch gathered {count} items in {query_time:.3f} seconds. Time elapsed: {exec_time:.3f} seconds.")
    # Total report
    print(f"{query_name}: Gathered {results.hits} items in total in {time.time() - exec_start:.3f} seconds.")
class SolrRunThread(threading.Thread):
    def __init__(self, semaphore, query_name, solr, collections, query, start_time, end_time, rows):
        super().__init__()
        self.semaphore = semaphore
        self.query_name = query_name
        self.solr = solr
        self.collections = collections
        self.query = query
        self.start_time = start_time
        self.end_time = end_time
        self.rows = rows

    def run(self):
        # The semaphore caps how many queries run at once.
        with self.semaphore:
            querySolr(self.query_name, self.solr, self.collections,
                      self.query, self.start_time, self.end_time, self.rows)
def main():
    # Semaphore limiting concurrency to NUM_THREADS queries
    semaphore = threading.Semaphore(NUM_THREADS)

    # Initialize Solr client over all collections in the time range
    collections = calculateCollections(START_TIME, END_TIME, COLLECTION_PREFIX)
    solr = pysolr.Solr(SOLR_HOST + "/solr/" + ",".join(collections))

    # Get queries from file
    queries = readQueryFile(QUERY_FILE)

    # Run queries as tasks, one thread each
    tasks = []
    for index, query in enumerate(queries):
        print(f"Query #{index + 1} started")
        t = SolrRunThread(semaphore, f"Query #{index + 1}", solr,
                          collections, query, START_TIME, END_TIME, QUERY_BATCH)
        t.start()
        tasks.append(t)

    # Wait for all tasks to complete
    for t in tasks:
        t.join()


if __name__ == "__main__":
    main()
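The script's only third-party dependency is pysolr (`pip install pysolr`). To run it, point SOLR_HOST at a reachable Solr node, set COLLECTION_PREFIX and the time window to match your data, and place a query file like the query.json below next to the script.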
query.json
[
{
"q": "timestamp_s:[{query_start_ts} TO {query_end_ts}]",
"fl": "id"
},
{
"q": "timestamp_s:[{query_start_ts} TO {query_end_ts}]",
"fl": "id,user.name,user.description"
}
]
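Each query string may embed `{query_start_ts}` and `{query_end_ts}` placeholders, which `querySolr` fills via `str.format` with epoch seconds derived from START_TIME and END_TIME. A minimal sketch of that substitution, reusing the timestamps from the CONFIG block above:

from datetime import datetime

# Values copied from the CONFIG block above.
start = datetime.fromisoformat("2020-05-01 00:00:00.000+00:00")
end = datetime.fromisoformat("2020-05-01 00:05:00.000+00:00")

template = "timestamp_s:[{query_start_ts} TO {query_end_ts}]"
q = template.format(query_start_ts=int(datetime.timestamp(start)),
                    query_end_ts=int(datetime.timestamp(end)))
print(q)  # timestamp_s:[1588291200 TO 1588291500]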