#!/usr/bin/env python
# pylint: skip-file
import json
from reporting.const import BREAKED_MESSAGE_TYPE
from timeit import default_timer as timer
# from mogambo.lib.teja.utils import execute_es_query
from mogambo.lib.teja.utils import execute_es_request
from api.lib.elastic_search import bulk_insert
# from haptik_api.settings.base import ELASTIC_ANALYTICS_MESSAGE_INDEX, ELASTIC_ANALYTICS_MESSAGE_TYPE
from mogambo.lib.teja.constants import KINESIS_ES_URL
# , ELASTIC_MOGAMBO_ANALYTICS_AUTH, ELASTIC_MOGAMBO_ANALYTICS_USER,
# ELASTIC_MOGAMBO_ANALYTICS_PASSWORD, ELASTIC_MOGAMBO_ANALYTICS_HOST,
# ELASTIC_MOGAMBO_ANALYTICS_PORT)
BASE_INDEX = "base-node-states-data"
PROCESSED_INDEX = "preprocessed-nodes-data"
BUSINESS_ID = 1594
ES_BASE_START_TS = "2020-04-23T00:00:00.000Z"
ES_BASE_END_TS = "2020-04-25T23:59:59.000Z"
KINESIS_MESSAGE_INDEX = "kinesis-message-2020.04.24"
SCROLL_SIZE = 10000
START_NODE_STATE_TYPE = 'start'
END_NODE_STATE_TYPE = 'end'
SESSION_LAST_NODE_STATE_TYPE = 'cmplt'
BOT_BREAK_NODE_STATE_TYPE = 'break'
# TODO: constants for stat names, used thrice for every row over and over
QC_QUERIES = {
"q1": {
"business_id": 1594,
"query_id": "q1",
"start": ["overhaul::phase_2::ACCOUNT_REL_INFO"],
"end": ["overhaul::phase_2::ACCOUNT_DATA_USAGE"],
"mandatory_to_be_collected": True
},
"q2": {
"business_id": 1594,
"query_id": "q2",
"start": ["overhaul::phase_2::SLOW_SPEED_JIO"],
"end": ["overhaul::phase_2::MULTIPLE_DEVICES"],
"mandatory_to_be_collected": True
},
"q3": {
"business_id": 1594,
"query_id": "q3",
"start": ["overhaul::phase_2::UNABLE_TO_OPEN_APP"],
"end": ["overhaul::phase_2::ACTIVE_PLAN", "overhaul::phase_2::ACCESS_TO_APPLICATION"],
"mandatory_to_be_collected": True
},
"q4": {
"business_id": 1594,
"query_id": "q4",
"start": ["overhaul::phase_2::UNABLE_TO_CONNECT", "overhaul::phase_2::GREEN_LED"],
"end": ["overhaul::phase_2::ISSUE_WIFI"],
"mandatory_to_be_collected": True
},
}
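# Illustration only (not used by the code): each QC query describes a funnel.
# For example, "q1" above counts as initiated whenever a session hits its start node
# overhaul::phase_2::ACCOUNT_REL_INFO, and as completed when the session then reaches its
# end node overhaul::phase_2::ACCOUNT_DATA_USAGE; sessions that start a query but never
# reach an end node show up as abandoned, restarted, diverted or bot_break in the
# per-session stats computed below.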
####################################################################
####################################################################
#######
# execute_es_query is quite good; it has scrolling and result handling built in,
# BUT it also customizes the index name with dates.
# A fix is underway in a PR; for now we call execute_es_request directly.
#######
def get_base_data(business_id, start_ts, end_ts, scroll_id=None):
    # es_data = execute_es_query(
    #     query=query,
    #     index=ELASTIC_ANALYTICS_MESSAGE_INDEX,
    #     doc_type=ELASTIC_ANALYTICS_MESSAGE_TYPE,
    #     date_text='',
    #     filter_path='',
    #     start_timestamp=start_ts,
    #     end_timestamp=end_ts
    # )
    # prepared_base_data = get_prepared_base_data(es_hits)
    # return prepared_base_data
    if not scroll_id:
        query = {
            "query": {"match": {"data.business_id": business_id}},
            "sort": [{"data.created_at_with_seconds": {"order": "asc"}}],
            "size": SCROLL_SIZE
        }
        url = KINESIS_ES_URL + KINESIS_MESSAGE_INDEX + "/_search" + '?scroll=2m'
    else:
        query = {
            "scroll": "2m",
            "scroll_id": scroll_id
        }
        url = KINESIS_ES_URL + "_search/scroll"
    # perform request
    es_result = execute_es_request(url, query)
    es_result = json.loads(es_result.content)
    es_hits = [hit["_source"]["data"] for hit in es_result["hits"]["hits"]]
    if "_scroll_id" in es_result:
        scroll_id = es_result["_scroll_id"]
    prepared_base_data = get_prepared_base_data(es_hits)
    return prepared_base_data, scroll_id
def get_index_count(index):
    query = {
        "query": {
            "match_all": {}
        }
    }
    es_result = execute_es_request(KINESIS_ES_URL + index + "/_count", query)
    es_result = json.loads(es_result.content)
    return es_result["count"]
def get_filtered_data(index, qc_queries, scroll_id=None):
    # result = execute_es_query(
    #     query=es_query_for_filtered_data,
    #     index=index,
    #     doc_type="_doc",
    #     date_text='',
    #     filter_path=''
    # )
    # return result['hits'], result['total'], result['aggregations']
    if not scroll_id:
        query = get_es_query_for_filtered_data(qc_queries)
        query["size"] = SCROLL_SIZE
        url = KINESIS_ES_URL + index + "/_search" + '?scroll=2m'
    else:
        query = {
            "scroll": "2m",
            "scroll_id": scroll_id
        }
        url = KINESIS_ES_URL + "_search/scroll"
    # perform request
    es_result = execute_es_request(url, query)
    es_result = json.loads(es_result.content)
    data = [hit["_source"] for hit in es_result["hits"]["hits"]]
    if "_scroll_id" in es_result:
        scroll_id = es_result["_scroll_id"]
    return data, scroll_id
def bulk_insert_data(data, index, doc_type):
    for datum in data:
        datum["_index"] = index
        datum["_type"] = doc_type
    bulk_insert(data)
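# Illustration only (hypothetical values): after bulk_insert_data tags the documents,
# each one carries its own payload plus the routing metadata added above, e.g.
#   {"session_id": "abc", "query_id": "q1", ..., "_index": "preprocessed-nodes-data", "_type": "_doc"}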
########################################################################
########################################################################
def get_session_wise_query_stats(filtered_data, qc_queries):
"""
From
{"session_id": 2, "node_name": "C", "timestamp": 107, "prev_node_name": "C", "bot_break": false, "session_last_node": false, "missing_entities": [], "collected_entities": ["otp"]}
To (TS=timestamp)
# TS Sess q_id prev_q_id node_state_type prev_node_state_type base_prev_node_name
# 5, 1, 1 start,
# 11, 1, 1 end, start
# 13, 1, 1 end, end
# 14, 1, 1 end, end
# 14, 1, 1 session_last, end
# 18, 2, 1 start,
# 21, 2, 1 1 start, start
# 22, 2, 1 1 start, start
# 25, 2, 1 session_last, start
# 28, 2, 1 start,
# 31, 2, 2 1 start, start
# 31, 2, 2 2 start, start
# 37, 2, 2 break, start
# 31, 2, 4 start, start
# 37, 2, 4 4 start, start
base_prev_node_name is the actual `prev_node_name` retained from the base data,
for noting it down as the `drop off` node.
"""
    session_wise_query_stats = []
    intermediate_data = []
    prev_session_id = ""
    current_session_id = ""
    queries_in_current_session = {}
    prev_query_id = ""
    prev_node_state_type = ""
    for row in filtered_data:
        current_session_id = row["session_id"]
        node_name = row["node_name"]
        node_state_type, query_id = get_node_state_type_and_query_id(row, qc_queries, prev_query_id)
        if node_state_type:
            # print(start_node_to_query_map)
            # Check for end of session: flush the current-session objects and reset the trackers
            if current_session_id != prev_session_id:
                for key, value in queries_in_current_session.items():
                    session_wise_query_stats.append(value)
                queries_in_current_session = {}
                prev_query_id = ""
                prev_node_state_type = ""
            # This also records queries whose end node was hit without the query ever starting,
            # but no stats are added to those records.
            if query_id not in queries_in_current_session:
                queries_in_current_session[query_id] = {
                    "session_id": current_session_id,
                    "query_id": query_id,
                    "initiated": 0,   # ... total
                    "completed": 0,   # ... reached end node
                    "abandoned": 0,   # ... nothing special hit after starting
                    "bot_break": 0,   # ... bot break occurred
                    "restarted": 0,   # ... instances of the same query started again (not including self-connections)
                    "diverted": 0,    # ... different query started
                    "drop_offs": []
                }
            base_prev_node_name = row["prev_node_name"]
            # Initiate a query of its query_id if the node is a start node.
            # Check whether it is already present, in case we have to add:
            # initiated: q_id = 1, node_state_type = `start`
            # restarted: q_id = 1, node_state_type = `start`, prev_q_id = 1, prev_node_state_type = `start`
            # diverted: q_id != 1, node_state_type = `start`, prev_q_id = 1, prev_node_state_type = `start`
            if node_state_type == START_NODE_STATE_TYPE:
                query_id_to_update = query_id
                query_to_update = queries_in_current_session[query_id_to_update]
                # No-brainer: this is an instance of initiation
                query_to_update["initiated"] += 1
                if prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE and query_id == prev_query_id:
                    query_to_update["restarted"] += 1
                    query_to_update['drop_offs'].append(
                        {
                            "query_id": query_id_to_update,
                            "session_id": current_session_id,
                            "drop_off_node": base_prev_node_name,
                            "reason": "restarted",
                        },
                    )
                elif prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE and query_id != prev_query_id:
                    query_to_update["diverted"] += 1
                    query_that_was_going_on = queries_in_current_session[prev_query_id]
                    query_that_was_going_on['drop_offs'].append(
                        {
                            "query_id": query_id_to_update,
                            "session_id": current_session_id,
                            "drop_off_node": base_prev_node_name,
                            "reason": "diverted",
                            "diverting_query_id": query_id
                        },
                    )
            # Add a completed query of its query_id if the node is an end node.
            # completed: q_id = 1, node_state_type = `end`, prev_q_id = 1, prev_node_state_type = `start`
            # TODO: nested queries
            # TODO: multiple queries can get completed with the same end node
            if node_state_type == END_NODE_STATE_TYPE:
                if prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE and query_id == prev_query_id:
                    # TODO: not just prev_query_id, can be the current query too if the current node = start of some q2 = end of this q1
                    query_id_to_update = query_id
                    query_to_update = queries_in_current_session[query_id_to_update]
                    query_to_update["completed"] += 1
                elif query_id != prev_query_id:
                    # TODO
                    pass
            # Nothing special hit after starting:
            # prev_q_id = 1, node_state_type = `session_last`, prev_node_state_type = `start`
            if node_state_type == SESSION_LAST_NODE_STATE_TYPE:
                if prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE:
                    # TODO: not just prev_query_id, can be the current query too if the current node = start of some q2 = session_last
                    # i.e. the query_id of this node comes into play then: similar to query_id == prev_query_id above
                    query_id_to_update = prev_query_id
                    query_to_update = queries_in_current_session[query_id_to_update]
                    query_to_update["abandoned"] += 1
                    query_to_update['drop_offs'].append(
                        {
                            "query_id": query_id_to_update,
                            "session_id": current_session_id,
                            "drop_off_node": base_prev_node_name,
                            "reason": "abandoned",
                        },
                    )
            # Bot break occurred:
            # prev_q_id = 1, node_state_type = `break`, prev_node_state_type = `start`
            if node_state_type == BOT_BREAK_NODE_STATE_TYPE:
                if prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE:  # not end type or break type
                    # TODO: not just prev_query_id, can be the current query too if the current node = start of some q2 = break
                    # i.e. the query_id of this node comes into play then: similar to query_id == prev_query_id above
                    query_id_to_update = prev_query_id
                    query_to_update = queries_in_current_session[query_id_to_update]
                    query_to_update["bot_break"] += 1
                    query_to_update['drop_offs'].append(
                        {
                            "query_id": query_id_to_update,
                            "session_id": current_session_id,
                            "drop_off_node": base_prev_node_name,
                            "reason": "bot_break",
                        },
                    )
            # Debug only: intermediate rows are returned for inspection but never persisted.
            processed_row = {
                "timestamp": row["timestamp"],
                "session_id": current_session_id,
                "node_state_type": node_state_type,
                "prev_node_state_type": prev_node_state_type,
                "query_id": query_id,
                "prev_query_id": prev_query_id,
                "node_name": node_name,
                "base_prev_node_name": base_prev_node_name  # ... for drop-off node
            }
            prev_node_state_type = node_state_type
            if query_id:
                prev_query_id = query_id
            intermediate_data.append(processed_row)
        prev_session_id = current_session_id
    for key, value in queries_in_current_session.items():
        session_wise_query_stats.append(value)
    return session_wise_query_stats, intermediate_data
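# Illustration only (hypothetical values): a single record in session_wise_query_stats
# could look like this for a session that restarted q1 and then dropped off:
#   {
#       "session_id": "abc", "query_id": "q1",
#       "initiated": 2, "completed": 0, "abandoned": 1,
#       "bot_break": 0, "restarted": 1, "diverted": 0,
#       "drop_offs": [
#           {"query_id": "q1", "session_id": "abc",
#            "drop_off_node": "overhaul::phase_2::ACCOUNT_REL_INFO", "reason": "restarted"},
#           {"query_id": "q1", "session_id": "abc",
#            "drop_off_node": "overhaul::phase_2::ACCOUNT_REL_INFO", "reason": "abandoned"},
#       ],
#   }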
def get_node_state_type_and_query_id(row, qc_queries, prev_query_id):
node_state_type = ""
query_id = ""
start_node_to_query_map, end_node_to_query_map = get_start_and_end_node_to_query_maps(qc_queries)
# print(start_node_to_query_map)
node_name = row["node_name"]
if node_name in start_node_to_query_map:
node_state_type = START_NODE_STATE_TYPE
query_id = start_node_to_query_map[node_name]
# TODO: get only mandatory entities completed ones, or not
# TODO: verify: if a node is both start and end, OR EVEN ALL START AND BREAK AND LAST, start is considered
elif node_name in end_node_to_query_map:
node_state_type = END_NODE_STATE_TYPE
query_ids = end_node_to_query_map[node_name]
if prev_query_id in query_ids:
query_id = prev_query_id
# TODO: These nodes can also be start or end nodes
elif row["bot_break"]:
node_state_type = BOT_BREAK_NODE_STATE_TYPE
elif row["session_last_node"]:
node_state_type = SESSION_LAST_NODE_STATE_TYPE
return node_state_type, query_id
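# Illustration only: for a row whose node_name is overhaul::phase_2::SLOW_SPEED_JIO this
# returns ("start", "q2"); for a row on overhaul::phase_2::MULTIPLE_DEVICES it returns
# ("end", "q2") when prev_query_id is "q2", and ("end", "") otherwise.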
def get_start_and_end_node_to_query_maps(qc_queries):
"""
q_id is only tracked for start nodes, and prev_q_id gets carried over until a new query is hit
Not to save state, but because END NODES, or any node other than start node can belong to multiple queries.
So because q_id is only needed in case of start nodes anyway, we don't track it for others, and prev_id can just be carried over.
"""
start_node_to_query_map = {}
end_node_to_query_map = {}
for q_id, qc_query in qc_queries.items():
for node_name in qc_query["start"]:
start_node_to_query_map[node_name] = q_id # start nodes, they'll all be distinct
for node_name in qc_query["end"]:
if node_name in end_node_to_query_map:
end_node_to_query_map[node_name].append(q_id)
else:
end_node_to_query_map[node_name] = [q_id]
return start_node_to_query_map, end_node_to_query_map
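# Illustration only: with the QC_QUERIES above (q3 and q4 omitted for brevity), the maps are
#   start_node_to_query_map = {
#       "overhaul::phase_2::ACCOUNT_REL_INFO": "q1",
#       "overhaul::phase_2::SLOW_SPEED_JIO": "q2",
#       ...
#   }
#   end_node_to_query_map = {
#       "overhaul::phase_2::ACCOUNT_DATA_USAGE": ["q1"],
#       "overhaul::phase_2::MULTIPLE_DEVICES": ["q2"],
#       ...
#   }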
def get_prepared_base_data(hits):
"""
In order to get these properties:
{
"session_id": 2,
"node_name": "C",
"timestamp": 107,
"prev_node_name": "C",
"bot_break": false,
"session_last_node": false,
"missing_entities": [],
"collected_entities": ["otp"]
}
"""
prepped_data = []
prev_session_id = ""
for hit in hits:
detected_nodes = hit["nodes"]
nodes_chain = hit["node_chain"].split("#") # TODO: ... is this correct?
message_cache = json.loads(hit["message_cache"])
stop_logic_data = hit["stop_logic_data"] # ... OR hit["stop_flag"]
session_id = hit["conversation_identifier"] # OR hit["conversation_no"]
prepped_doc = {
# "business_id": hit["business_id"],
"timestamp": hit["created_at_with_seconds"], #OR hit["created_at"]
"session_id": session_id,
"node_name": detected_nodes[0] if detected_nodes else nodes_chain[-1],
"prev_node_name": nodes_chain[-2] if len(nodes_chain) > 1 else "",
"session_last_node": False,
"bot_break": stop_logic_data in BREAKED_MESSAGE_TYPE
# # TODO: Get mandatory entities info
# missing_entities: [entity for entity in message_cache["entities"].keys() if entity_value not in entity]
# collected_entities: [entity for entity in message_cache["entities"].keys() if entity_value in entity]
}
if prepped_data:
prepped_data[-1]["session_last_node"] = True if session_id != prev_session_id else False
prev_session_id = session_id
prepped_data.append(prepped_doc)
return prepped_data
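# Illustration only: for a hit with node_chain "A#B#C" and no detected nodes, the prepped doc
# gets node_name "C" and prev_node_name "B"; session_last_node is back-filled on the previous
# doc once the conversation_identifier changes.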
def get_es_query_for_filtered_data(qc_queries):
"""
Example QC Query: start[B, F] => end[S, T, O]
ES Query:
query = {
"query": {
"bool": {
"should": [
{
"bool": {
"should": [
{"match": {"node_name": "B"}},
{"match": {"node_name": "F"}}
],
"must_not": [
{"match": {"prev_node_name": "B"}},
{"match": {"prev_node_name": "F"}}
]
}
},
{
"bool": {
"should": [
{"match": {"node_name": "S"}},
{"match": {"node_name": "T"}},
{"match": {"node_name": "O"}}
]
}
}
]
}
},
"sort": [
{ "timestamp": {"order" : "asc"}}
]
}
"""
query = {
"query": {
"bool": {
"should": get_all_should_conditions(qc_queries)
}
},
"sort": [
{ "timestamp": {"order" : "asc"}}
]
}
return query
def get_all_should_conditions(qc_queries):
    all_should_conditions = []
    end_node_matchers = []
    for name, qc_query in qc_queries.items():
        start_node_shoulds, start_node_must_nots = [], []
        for node_name in qc_query["start"]:
            # TODO: case where a query is consecutively repeated, but with a different start node
            start_node_shoulds.append({"match": {"node_name": node_name}})
            start_node_must_nots.append({"match": {"prev_node_name": node_name}})
        all_should_conditions.append({
            "bool": {
                "should": start_node_shoulds,
                "must_not": start_node_must_nots
            }
        })
        for node_name in qc_query["end"]:
            end_node_matchers.append({"match": {"node_name": node_name}})
        # TODO: take into consideration qc_query["mandatory_to_be_collected"]
        # for matching 'collected_entities' in the base data
    all_should_conditions += [
        {
            "bool": {
                "should": {"match": {"session_last_node": True}}
            }
        },
        {
            "bool": {
                "should": {"match": {"bot_break": True}}
            }
        },
        {
            "bool": {
                "should": end_node_matchers
            }
        }
    ]
    return all_should_conditions
################################################################
################################################################
def print_as_rows(data, node_name_keys, lastness_key, lastness_value):
    NODE_NAME_SLICE = slice(19, 30)
    TS_SLICE = slice(6, -1)
    print(" ".join([key[:10] for key in data[0].keys()]))
    for datum in data:
        line_to_print = ""
        session_last_node = False
        for key, value in datum.items():
            if key == "timestamp":
                value = value[TS_SLICE]
            elif key in node_name_keys:
                if value:
                    value = value[NODE_NAME_SLICE]
                    if len(value) < 10:
                        value += '_' * (10 - len(value))
                else:
                    value = "_" * 10
            line_to_print += str(value) + '\t'
            if key == lastness_key and value == lastness_value:
                session_last_node = True
        print(line_to_print)
        if session_last_node:
            print('\n')
def main(task):
if task == "prep_base_data":
scroll_id = None
while True:
prepped_data, scroll_id = get_base_data(BUSINESS_ID, ES_BASE_START_TS, ES_BASE_END_TS, scroll_id)
if not prepped_data:
break
# print_as_rows(prepped_data[1500:1800], ["node_name", "prev_node_name"], "session_last_node", True)
print(len(prepped_data))
bulk_insert_data(prepped_data, BASE_INDEX, '_doc')
# Actual task, BENCHMARK this
elif task == "make_session_wise_query_stats":
base_index_count = get_index_count(BASE_INDEX)
for i in range(len(QC_QUERIES)):
qc_queries = {}
for item in list(QC_QUERIES.values())[i:]:
qc_queries[item["query_id"]] = item
start = timer()
total_records_count = 0
scroll_id = None
while True:
filtered_data, scroll_id = get_filtered_data(BASE_INDEX, qc_queries, scroll_id)
if not filtered_data:
break
print(f'{len(filtered_data)} records filtered from base')
session_wise_query_stats, intermediate_data = get_session_wise_query_stats(filtered_data, qc_queries)
total_records_count += len(session_wise_query_stats)
# print(session_wise_query_stats)
# 3100 0.385 sec
bulk_insert_data(session_wise_query_stats, PROCESSED_INDEX, '_doc')
end = timer()
print(
f'{base_index_count} base records, ',
f'{len(qc_queries)} queries, ',
f'{total_records_count} result records, ',
f'{str(round(end - start, 3))} sec'
)
print("-" * 50)
# elif task == "preprocess_data":
# data, total, aggregations = get_filtered_data(BASE_INDEX, QC_QUERIES)
# processed_data = preprocess_data(data, QC_QUERIES)
# print_as_rows(processed_data[500:800], ["node_name","base_prev_node_name"], "node_type", "cmplt")
elif task == "get_sample_data":
pass
if __name__ == "__main__":
    # import argparse
    # parser = argparse.ArgumentParser(description='Run one of the QC query stats tasks')
    # parser.add_argument('--task', metavar='task', help='whaddya wanna do?')
    # args = parser.parse_args()
    main("make_session_wise_query_stats")