@davidkyle · Created July 21, 2020
Script for gathering timing stats comparing an aggregation run with and without an inference aggregation
events-agg.json:

{
  "size": 1,
  "aggs": {
    "session": {
      "composite": {
        "size": 10,
        "sources": [
          {
            "session": {
              "terms": {
                "field": "user_session"
              }
            }
          }
        ]
      },
      "aggs": {
        "metrics_count": {
          "value_count": {
            "field": "user_session"
          }
        },
        "metrics_product_id_cardinality": {
          "cardinality": {
            "field": "product_id"
          }
        },
        "metrics_category_id_cardinality": {
          "cardinality": {
            "field": "category_id"
          }
        },
        "metrics_category_code_cardinality": {
          "cardinality": {
            "field": "category_code"
          }
        },
        "metrics_brand_cardinality": {
          "cardinality": {
            "field": "brand"
          }
        },
        "metrics_price_min": {
          "min": {
            "field": "price"
          }
        },
        "metrics_price_max": {
          "max": {
            "field": "price"
          }
        },
        "metrics_event_type_view": {
          "filter": {
            "terms": {
              "event_type": ["view"]
            }
          },
          "aggregations": {
            "view_count": {
              "value_count": {
                "field": "user_session"
              }
            }
          }
        },
        "metrics_event_type_cart": {
          "filter": {
            "terms": {
              "event_type": ["cart"]
            }
          },
          "aggregations": {
            "cart_count": {
              "value_count": {
                "field": "user_session"
              }
            }
          }
        }
      }
    }
  }
}
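The composite aggregation above pages through user_session buckets ten at a time and computes the per-session feature metrics. A minimal sketch (not part of the original gist) of paging through all the sessions with the Python client, assuming a local cluster at http://localhost:9200 and the script's default events* index:

import json
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')  # placeholder URL
with open('events-agg.json') as f:
    query = json.load(f)

while True:
    res = es.search(index='events*', body=query)
    agg = res['aggregations']['session']
    if not agg['buckets']:
        break  # no more composite pages
    for bucket in agg['buckets']:
        print(bucket['key']['session'], bucket['metrics_count']['value'])
    # 'after_key' marks the last bucket of this page; feed it back
    # as 'after' to request the next page of sessions.
    query['aggs']['session']['composite']['after'] = agg['after_key']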
{
"size": 1,
"aggs": {
"session": {
"composite": {
"size": 10,
"sources": [
{
"session": {
"terms": {
"field": "user_session"
}
}
}
]
},
"aggs": {
"metrics_count": {
"value_count": {
"field": "user_session"
}
},
"metrics_product_id_cardinality": {
"cardinality": {
"field": "product_id"
}
},
"metrics_category_id_cardinality": {
"cardinality": {
"field": "category_id"
}
},
"metrics_category_code_cardinality": {
"cardinality": {
"field": "category_code"
}
},
"metrics_brand_cardinality": {
"cardinality": {
"field": "brand"
}
},
"metrics_price_min": {
"min": {
"field": "price"
}
},
"metrics_price_max": {
"max": {
"field": "price"
}
},
"metrics_event_type_view": {
"filter": {
"terms": {
"event_type": [
"view"
]
}
},
"aggregations": {
"view_count": {
"value_count": {
"field": "user_session"
}
}
}
},
"metrics_event_type_cart": {
"filter": {
"terms": {
"event_type": [
"cart"
]
}
},
"aggregations": {
"cart_count": {
"value_count": {
"field": "user_session"
}
}
}
},
"regression_agg": {
"inference": {
"model_id": "sessions-regression-1595229080285",
"buckets_path" : {
"metrics.brand.cardinality": "metrics_brand_cardinality",
"metrics.category_code.cardinality": "metrics_category_code_cardinality",
"metrics.category_id.cardinality": "metrics_category_id_cardinality",
"metrics.count": "metrics_count",
"metrics.event_type.cart.cart.count": "metrics_event_type_cart.cart_count",
"metrics.event_type.view.view.count": "metrics_event_type_view.view_count",
"metrics.price.max": "metrics_price_max",
"metrics.price.min": "metrics_price_min",
"metrics.product_id.cardinality": "metrics_product_id_cardinality"
}
}
}
}
}
}
}
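The only difference from events-agg.json is the regression_agg sibling aggregation: it feeds each session bucket's metric results into the trained model sessions-regression-1595229080285, with the buckets_path keys mapping the model's expected feature names (e.g. metrics.price.max) onto the aggregation names defined above. A hedged sketch of reading the per-session prediction back out, assuming the regression result is surfaced under a value field in each bucket:

import json
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')  # placeholder URL
with open('events-inference-agg.json') as f:
    query = json.load(f)

res = es.search(index='events*', body=query)
for bucket in res['aggregations']['session']['buckets']:
    # Assumption: the regression prediction lands under 'value';
    # check your cluster's response shape if this differs.
    print(bucket['key']['session'], bucket['regression_agg']['value'])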
warmup.py:

import argparse
import json
import statistics
from multiprocessing import Process
from time import sleep

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError

DEFAULT_INDEX = 'events*'


def load_json(filename):
    with open(filename, 'r') as f:
        return json.load(f)


def get_times(url, index, query, queryname):
    # Run the query a fixed number of times and collect the server-side
    # 'took' time (in milliseconds) from each response.
    num_queries = 10
    times = []
    es = Elasticsearch(url)
    for _ in range(num_queries):
        print("timing")
        try:
            res = es.search(index=index, body=query)
            times.append(res['took'])
        except RequestError as e:
            print(f"Error! {e}")
            break
        sleep(0.05)
    if not times:
        print(f"No timings collected for {queryname}")
        return
    print(f"{queryname}: min,max,mean,median")
    print("{}, {}, {}, {}".format(min(times), max(times), statistics.mean(times), statistics.median(times)))


def warmup(id, url, index):
    # Run one of the two queries repeatedly so caches are warm before timing.
    def pp(message):
        print(f"[{id}] {message}")

    query = load_json('events-agg.json') if id == 0 else load_json('events-inference-agg.json')
    es = Elasticsearch(url)
    for _ in range(50):
        pp("Searching")
        try:
            es.search(index=index, body=query)
        except RequestError as e:
            pp(f"Error! {e}")
        sleep(0.05)


def do_warm(url, index):
    # One warmup process per query file, run concurrently.
    num_procs = 2
    procs = [Process(target=warmup, args=(id, url, index)) for id in range(num_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


def main():
    parser = argparse.ArgumentParser(prog='warmup')
    parser.add_argument('--url', nargs='+', required=True, help="A connection URL, e.g. http://user:secret@localhost:9200")
    parser.add_argument('--index', default=DEFAULT_INDEX, help=f"Index to query. Default: {DEFAULT_INDEX}")
    args = parser.parse_args()
    do_warm(args.url, args.index)
    get_times(args.url, args.index, load_json('events-agg.json'), "plain agg")
    get_times(args.url, args.index, load_json('events-inference-agg.json'), "inference agg")


if __name__ == "__main__":
    main()
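To reproduce the comparison, save both query files alongside the script and run something like python warmup.py --url http://localhost:9200 (the URL is a placeholder; the index defaults to events*). The script first warms both queries in parallel, then prints the min, max, mean, and median of the server-reported took times for the plain aggregation and for the inference aggregation, so the two sets of numbers can be compared directly.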