#!/usr/bin/env python
# pylint: skip-file
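"""
Prototype/benchmark script: prepares base node-state documents from the Kinesis message
index in Elasticsearch, then computes per-session "QC query" funnel stats (initiated /
completed / abandoned / bot_break / restarted / diverted, with drop-off nodes) and
bulk-inserts them into a processed index.
"""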

import json
from reporting.const import BREAKED_MESSAGE_TYPE
from timeit import default_timer as timer
# from mogambo.lib.teja.utils import execute_es_query
from mogambo.lib.teja.utils import execute_es_request
from api.lib.elastic_search import bulk_insert
# from haptik_api.settings.base import ELASTIC_ANALYTICS_MESSAGE_INDEX, ELASTIC_ANALYTICS_MESSAGE_TYPE
from mogambo.lib.teja.constants import KINESIS_ES_URL
# , ELASTIC_MOGAMBO_ANALYTICS_AUTH, ELASTIC_MOGAMBO_ANALYTICS_USER,
# ELASTIC_MOGAMBO_ANALYTICS_PASSWORD, ELASTIC_MOGAMBO_ANALYTICS_HOST,
# ELASTIC_MOGAMBO_ANALYTICS_PORT)

BASE_INDEX = "base-node-states-data"
PROCESSED_INDEX = "preprocessed-nodes-data"
BUSINESS_ID = 1594
ES_BASE_START_TS = "2020-04-23T00:00:00.000Z"
ES_BASE_END_TS = "2020-04-25T23:59:59.000Z"
KINESIS_MESSAGE_INDEX = "kinesis-message-2020.04.24"
SCROLL_SIZE = 10000

START_NODE_STATE_TYPE = 'start'
END_NODE_STATE_TYPE = 'end'
SESSION_LAST_NODE_STATE_TYPE = 'cmplt'
BOT_BREAK_NODE_STATE_TYPE = 'break'

# TODO: constants for stat names, used thrice for every row over and over

QC_QUERIES = {
    "q1": {
        "business_id": 1594,
        "query_id": "q1",
        "start": ["overhaul::phase_2::ACCOUNT_REL_INFO"],
        "end": ["overhaul::phase_2::ACCOUNT_DATA_USAGE"],
        "mandatory_to_be_collected": True
    },
    "q2": {
        "business_id": 1594,
        "query_id": "q2",
        "start": ["overhaul::phase_2::SLOW_SPEED_JIO"],
        "end": ["overhaul::phase_2::MULTIPLE_DEVICES"],
        "mandatory_to_be_collected": True
    },
    "q3": {
        "business_id": 1594,
        "query_id": "q3",
        "start": ["overhaul::phase_2::UNABLE_TO_OPEN_APP"],
        "end": ["overhaul::phase_2::ACTIVE_PLAN", "overhaul::phase_2::ACCESS_TO_APPLICATION"],
        "mandatory_to_be_collected": True
    },
    "q4": {
        "business_id": 1594,
        "query_id": "q4",
        "start": ["overhaul::phase_2::UNABLE_TO_CONNECT", "overhaul::phase_2::GREEN_LED"],
        "end": ["overhaul::phase_2::ISSUE_WIFI"],
        "mandatory_to_be_collected": True
    },
}
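
# Each QC query describes a funnel: a session "initiates" the query when one of its
# "start" nodes is hit, and "completes" it when one of its "end" nodes is hit afterwards
# (see get_session_wise_query_stats below). "mandatory_to_be_collected" is not used yet
# (see the TODOs further down).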


####################################################################
####################################################################

#######
# execute_es_query is quite good; it has scrolling and result handling built in,
# BUT it also has some customizations for building the index name from dates.
# A fix has been started in a PR, but for now execute_es_request is used directly.
#######

def get_base_data(business_id, start_ts, end_ts, scroll_id=None):
    # es_data = execute_es_query(
    #     query=query,
    #     index=ELASTIC_ANALYTICS_MESSAGE_INDEX,
    #     doc_type=ELASTIC_ANALYTICS_MESSAGE_TYPE,
    #     date_text='',
    #     filter_path='',
    #     start_timestamp=start_ts,
    #     end_timestamp=end_ts
    # )
    # prepared_base_data = get_prepared_base_data(es_hits)
    # return prepared_base_data
    if not scroll_id:
        query = {
            "query": {"match": {"data.business_id": business_id}},
            "sort": [{"data.created_at_with_seconds": {"order": "asc"}}],
            "size": SCROLL_SIZE
        }
        url = KINESIS_ES_URL + KINESIS_MESSAGE_INDEX + "/_search" + '?scroll=2m'
    else:
        query = {
            "scroll": "2m",
            "scroll_id": scroll_id
        }
        url = KINESIS_ES_URL + "_search/scroll"
    # perform request
    es_result = execute_es_request(url, query)
    es_result = json.loads(es_result.content)
    es_hits = [hit["_source"]["data"] for hit in es_result["hits"]["hits"]]
    if "_scroll_id" in es_result:
        scroll_id = es_result["_scroll_id"]
    prepared_base_data = get_prepared_base_data(es_hits)
    return prepared_base_data, scroll_id
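
# NOTE (assumption): execute_es_request is assumed to POST the query body to the given URL
# and return a requests-style response. With the scroll API, each call returns up to
# SCROLL_SIZE hits plus a "_scroll_id" for the next page, so callers loop until an empty
# page comes back, e.g.:
#
#     scroll_id = None
#     while True:
#         page, scroll_id = get_base_data(BUSINESS_ID, ES_BASE_START_TS, ES_BASE_END_TS, scroll_id)
#         if not page:
#             break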

def get_index_count(index):
    query = {
        "query": {
            "match_all": {}
        }
    }
    es_result = execute_es_request(KINESIS_ES_URL + index + "/_count", query)
    es_result = json.loads(es_result.content)
    return es_result["count"]
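
# The _count endpoint responds with a body like {"count": <n>, "_shards": {...}};
# only "count" is read here.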

def get_filtered_data(index, qc_queries, scroll_id=None):
    # result = execute_es_query(
    #     query=es_query_for_filtered_data,
    #     index=index,
    #     doc_type="_doc",
    #     date_text='',
    #     filter_path=''
    # )
    # return result['hits'], result['total'], result['aggregations']
    if not scroll_id:
        query = get_es_query_for_filtered_data(qc_queries)
        query["size"] = SCROLL_SIZE
        url = KINESIS_ES_URL + index + "/_search" + '?scroll=2m'
    else:
        query = {
            "scroll": "2m",
            "scroll_id": scroll_id
        }
        url = KINESIS_ES_URL + "_search/scroll"
    # perform request
    es_result = execute_es_request(url, query)
    es_result = json.loads(es_result.content)
    data = [hit["_source"] for hit in es_result["hits"]["hits"]]
    if "_scroll_id" in es_result:
        scroll_id = es_result["_scroll_id"]
    return data, scroll_id

def bulk_insert_data(data, index, doc_type):
    for datum in data:
        datum["_index"] = index
        datum["_type"] = doc_type
    bulk_insert(data)
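
# NOTE (assumption): api.lib.elastic_search.bulk_insert is assumed to accept a list of
# documents that carry their own "_index"/"_type" metadata (elasticsearch.helpers.bulk-style
# actions), which is why bulk_insert_data stamps them onto each doc above.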


########################################################################
########################################################################

def get_session_wise_query_stats(filtered_data, qc_queries):
    """
    From
    {"session_id": 2, "node_name": "C", "timestamp": 107, "prev_node_name": "C", "bot_break": false, "session_last_node": false, "missing_entities": [], "collected_entities": ["otp"]}
    To (TS=timestamp)
    # TS Sess q_id prev_q_id node_state_type prev_node_state_type base_prev_node_name
    # 5, 1, 1 start,
    # 11, 1, 1 end, start
    # 13, 1, 1 end, end
    # 14, 1, 1 end, end
    # 14, 1, 1 session_last, end
    # 18, 2, 1 start,
    # 21, 2, 1 1 start, start
    # 22, 2, 1 1 start, start
    # 25, 2, 1 session_last, start
    # 28, 2, 1 start,
    # 31, 2, 2 1 start, start
    # 31, 2, 2 2 start, start
    # 37, 2, 2 break, start
    # 31, 2, 4 start, start
    # 37, 2, 4 4 start, start
    base_prev_node_name is the actual `prev_node_name` retained from the base data,
    for noting it down as the `drop off` node.
    """
    session_wise_query_stats = []
    intermediate_data = []
    prev_session_id = ""
    current_session_id = ""
    queries_in_current_session = {}
    prev_query_id = ""
    prev_node_state_type = ""
    for row in filtered_data:
        current_session_id = row["session_id"]
        node_name = row["node_name"]
        node_state_type, query_id = get_node_state_type_and_query_id(row, qc_queries, prev_query_id)
        if node_state_type:
            # print(start_node_to_query_map)
            # Check if end of session: flush and empty the current-session objects, update the current session
            if current_session_id != prev_session_id:
                for key, value in queries_in_current_session.items():
                    session_wise_query_stats.append(value)
                queries_in_current_session = {}
                prev_query_id = ""
                prev_node_state_type = ""
            # This will also create records for queries that are only hit at the end without starting,
            # but no stats will be added to those records
            if query_id not in queries_in_current_session:
                queries_in_current_session[query_id] = {
                    "session_id": current_session_id,
                    "query_id": query_id,
                    "initiated": 0,   # ... total
                    "completed": 0,   # ... reached end node
                    "abandoned": 0,   # ... nothing special hit after starting
                    "bot_break": 0,   # ... bot break occurred
                    "restarted": 0,   # ... instances of same query started again (not including self connections)
                    "diverted": 0,    # ... different query started
                    "drop_offs": []
                }
            base_prev_node_name = row["prev_node_name"]
            # initiate a query of its query_id if the node is a start node
            # (the check above ensures it is already present, in case we have to add to it)
            # initiated: q_id = 1, node_state_type = `start`
            # restarted: q_id = 1, node_state_type = `start`, prev_q_id = 1, prev_node_state_type = `start`
            # diverted: q_id != 1, node_state_type = `start`, prev_q_id = 1, prev_node_state_type = `start`
            if node_state_type == START_NODE_STATE_TYPE:
                query_id_to_update = query_id
                query_to_update = queries_in_current_session[query_id_to_update]
                # No brainer: this is an instance of initiation
                query_to_update["initiated"] += 1
                if prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE and query_id == prev_query_id:
                    query_to_update["restarted"] += 1
                    query_to_update['drop_offs'].append(
                        {
                            "query_id": query_id_to_update,
                            "session_id": current_session_id,
                            "drop_off_node": base_prev_node_name,
                            "reason": "restarted",
                        },
                    )
                elif prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE and query_id != prev_query_id:
                    query_to_update["diverted"] += 1
                    query_that_was_going_on = queries_in_current_session[prev_query_id]
                    query_that_was_going_on['drop_offs'].append(
                        {
                            "query_id": query_id_to_update,
                            "session_id": current_session_id,
                            "drop_off_node": base_prev_node_name,
                            "reason": "diverted",
                            "diverting_query_id": query_id
                        },
                    )
            # add a completed query of its query_id if the node is an end node
            # completed: q_id = 1, node_state_type = `end`, prev_q_id = 1, prev_node_state_type = `start`
            # TODO: nested queries
            # TODO: multiple queries can get completed with the same end node
            if node_state_type == END_NODE_STATE_TYPE:
                if prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE and query_id == prev_query_id:
                    # TODO: not just prev_query_id, can be current query too if current node=start of some q2=end of this q1
                    query_id_to_update = query_id
                    query_to_update = queries_in_current_session[query_id_to_update]
                    query_to_update["completed"] += 1
                elif query_id != prev_query_id:
                    # TODO
                    pass
            # nothing special hit after starting:
            # prev_q_id = 1, node_state_type = `session_last`, prev_node_state_type = `start`
            if node_state_type == SESSION_LAST_NODE_STATE_TYPE:
                if prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE:
                    # TODO: not just prev_query_id, can be current query too if current node=start of some q2=session_last
                    # i.e. query_id of this node comes into play then: similar to above query_id == prev_query_id
                    query_id_to_update = prev_query_id
                    query_to_update = queries_in_current_session[query_id_to_update]
                    query_to_update["abandoned"] += 1
                    query_to_update['drop_offs'].append(
                        {
                            "query_id": query_id_to_update,
                            "session_id": current_session_id,
                            "drop_off_node": base_prev_node_name,
                            "reason": "abandoned",
                        },
                    )
            # bot break occurred:
            # prev_q_id = 1, node_state_type = `break`, prev_node_state_type = `start`
            if node_state_type == BOT_BREAK_NODE_STATE_TYPE:
                if prev_node_state_type and prev_node_state_type == START_NODE_STATE_TYPE:  # Not end type or break type
                    # TODO: not just prev_query_id, can be current query too if current node=start of some q2=break
                    # i.e. query_id of this node comes into play then: similar to above query_id == prev_query_id
                    query_id_to_update = prev_query_id
                    query_to_update = queries_in_current_session[query_id_to_update]
                    query_to_update["bot_break"] += 1
                    query_to_update['drop_offs'].append(
                        {
                            "query_id": query_id_to_update,
                            "session_id": current_session_id,
                            "drop_off_node": base_prev_node_name,
                            "reason": "bot_break",
                        },
                    )
            # Only used for debugging/inspection via intermediate_data; not part of the final stats
            processed_row = {
                "timestamp": row["timestamp"],
                "session_id": current_session_id,
                "node_state_type": node_state_type,
                "prev_node_state_type": prev_node_state_type,
                "query_id": query_id,
                "prev_query_id": prev_query_id,
                "node_name": node_name,
                "base_prev_node_name": base_prev_node_name  # ... for the drop-off node
            }
            prev_node_state_type = node_state_type
            if query_id:
                prev_query_id = query_id
            intermediate_data.append(processed_row)
        prev_session_id = current_session_id
    for key, value in queries_in_current_session.items():
        session_wise_query_stats.append(value)
    return session_wise_query_stats, intermediate_data
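
# A resulting per-session stats record looks roughly like this (illustrative values):
#     {"session_id": "<conversation_identifier>", "query_id": "q1",
#      "initiated": 2, "completed": 1, "abandoned": 0, "bot_break": 0,
#      "restarted": 1, "diverted": 0,
#      "drop_offs": [{"query_id": "q1", "session_id": "...",
#                     "drop_off_node": "overhaul::phase_2::...", "reason": "restarted"}]}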

def get_node_state_type_and_query_id(row, qc_queries, prev_query_id):
    node_state_type = ""
    query_id = ""
    start_node_to_query_map, end_node_to_query_map = get_start_and_end_node_to_query_maps(qc_queries)
    # print(start_node_to_query_map)
    node_name = row["node_name"]
    if node_name in start_node_to_query_map:
        node_state_type = START_NODE_STATE_TYPE
        query_id = start_node_to_query_map[node_name]
        # TODO: get only mandatory entities completed ones, or not
    # TODO: verify: if a node is both start and end, OR EVEN ALL START AND BREAK AND LAST, start is considered
    elif node_name in end_node_to_query_map:
        node_state_type = END_NODE_STATE_TYPE
        query_ids = end_node_to_query_map[node_name]
        if prev_query_id in query_ids:
            query_id = prev_query_id
    # TODO: These nodes can also be start or end nodes
    elif row["bot_break"]:
        node_state_type = BOT_BREAK_NODE_STATE_TYPE
    elif row["session_last_node"]:
        node_state_type = SESSION_LAST_NODE_STATE_TYPE
    return node_state_type, query_id
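
# For example, with the QC_QUERIES above: a row whose node_name is
# "overhaul::phase_2::ACCOUNT_DATA_USAGE" yields ("end", "q1") when prev_query_id is "q1"
# and ("end", "") otherwise, while a row hitting "overhaul::phase_2::SLOW_SPEED_JIO"
# yields ("start", "q2").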

def get_start_and_end_node_to_query_maps(qc_queries):
    """
    q_id is only tracked for start nodes, and prev_q_id gets carried over until a new query is hit.
    Not to save state, but because end nodes, or any node other than a start node, can belong to multiple queries.
    Since q_id is only needed for start nodes anyway, we don't track it for others, and prev_q_id can just be carried over.
    """
    start_node_to_query_map = {}
    end_node_to_query_map = {}
    for q_id, qc_query in qc_queries.items():
        for node_name in qc_query["start"]:
            start_node_to_query_map[node_name] = q_id  # start nodes will all be distinct
        for node_name in qc_query["end"]:
            if node_name in end_node_to_query_map:
                end_node_to_query_map[node_name].append(q_id)
            else:
                end_node_to_query_map[node_name] = [q_id]
    return start_node_to_query_map, end_node_to_query_map
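
# For the QC_QUERIES above this produces, for example:
#     start_node_to_query_map = {"overhaul::phase_2::ACCOUNT_REL_INFO": "q1",
#                                "overhaul::phase_2::SLOW_SPEED_JIO": "q2", ...}
#     end_node_to_query_map   = {"overhaul::phase_2::ACCOUNT_DATA_USAGE": ["q1"],
#                                "overhaul::phase_2::MULTIPLE_DEVICES": ["q2"], ...}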

def get_prepared_base_data(hits):
    """
    In order to get these properties:
    {
        "session_id": 2,
        "node_name": "C",
        "timestamp": 107,
        "prev_node_name": "C",
        "bot_break": false,
        "session_last_node": false,
        "missing_entities": [],
        "collected_entities": ["otp"]
    }
    """
    prepped_data = []
    prev_session_id = ""
    for hit in hits:
        detected_nodes = hit["nodes"]
        nodes_chain = hit["node_chain"].split("#")  # TODO: ... is this correct?
        message_cache = json.loads(hit["message_cache"])
        stop_logic_data = hit["stop_logic_data"]  # ... OR hit["stop_flag"]
        session_id = hit["conversation_identifier"]  # OR hit["conversation_no"]
        prepped_doc = {
            # "business_id": hit["business_id"],
            "timestamp": hit["created_at_with_seconds"],  # OR hit["created_at"]
            "session_id": session_id,
            "node_name": detected_nodes[0] if detected_nodes else nodes_chain[-1],
            "prev_node_name": nodes_chain[-2] if len(nodes_chain) > 1 else "",
            "session_last_node": False,
            "bot_break": stop_logic_data in BREAKED_MESSAGE_TYPE
            # # TODO: Get mandatory entities info
            # missing_entities: [entity for entity in message_cache["entities"].keys() if entity_value not in entity]
            # collected_entities: [entity for entity in message_cache["entities"].keys() if entity_value in entity]
        }
        if prepped_data:
            prepped_data[-1]["session_last_node"] = session_id != prev_session_id
        prev_session_id = session_id
        prepped_data.append(prepped_doc)
    return prepped_data
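
# Note: session_last_node is set on the *previous* document once the session id changes.
# Because prepped_data is rebuilt for every scroll page, a session boundary that falls
# exactly on a page boundary is not flagged; a limitation of processing page by page.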

def get_es_query_for_filtered_data(qc_queries):
    """
    Example QC query: start[B, F] => end[S, T, O]
    ES query:
    query = {
        "query": {
            "bool": {
                "should": [
                    {
                        "bool": {
                            "should": [
                                {"match": {"node_name": "B"}},
                                {"match": {"node_name": "F"}}
                            ],
                            "must_not": [
                                {"match": {"prev_node_name": "B"}},
                                {"match": {"prev_node_name": "F"}}
                            ]
                        }
                    },
                    {
                        "bool": {
                            "should": [
                                {"match": {"node_name": "S"}},
                                {"match": {"node_name": "T"}},
                                {"match": {"node_name": "O"}}
                            ]
                        }
                    }
                ]
            }
        },
        "sort": [
            {"timestamp": {"order": "asc"}}
        ]
    }
    """
    query = {
        "query": {
            "bool": {
                "should": get_all_should_conditions(qc_queries)
            }
        },
        "sort": [
            {"timestamp": {"order": "asc"}}
        ]
    }
    return query

def get_all_should_conditions(qc_queries):
    all_should_conditions = []
    end_node_matchers = []
    for name, qc_query in qc_queries.items():
        start_node_shoulds, start_node_must_nots = [], []
        for node_name in qc_query["start"]:
            # TODO: Case where the query is consecutively repeated, but with a different start node
            start_node_shoulds.append({"match": {"node_name": node_name}})
            start_node_must_nots.append({"match": {"prev_node_name": node_name}})
        all_should_conditions.append({
            "bool": {
                "should": start_node_shoulds,
                "must_not": start_node_must_nots
            }
        })
        for node_name in qc_query["end"]:
            end_node_matchers.append({"match": {"node_name": node_name}})
        # TODO: take into consideration qc_query["mandatory_to_be_collected"]
        # for matching 'collected_entities' in base data
    all_should_conditions += [
        {
            "bool": {
                "should": {"match": {"session_last_node": True}}
            }
        },
        {
            "bool": {
                "should": {"match": {"bot_break": True}}
            }
        },
        {
            "bool": {
                "should": end_node_matchers
            }
        }
    ]
    return all_should_conditions


################################################################
################################################################

def print_as_rows(data, node_name_keys, lastness_key, lastness_value):
    NODE_NAME_SLICE = slice(19, 30)
    TS_SLICE = slice(6, -1)
    print(" ".join([key[:10] for key in data[0].keys()]))
    for datum in data:
        line_to_print = ""
        session_last_node = False
        for key, value in datum.items():
            if key == "timestamp":
                value = value[TS_SLICE]
            elif key in node_name_keys:
                if value:
                    value = value[NODE_NAME_SLICE]
                    if len(value) < 10:
                        value += '_' * (10 - len(value))
                else:
                    value = "_" * 10
            line_to_print += str(value) + '\t'
            if key == lastness_key and value == lastness_value:
                session_last_node = True
        print(line_to_print)
        if session_last_node:
            print('\n')
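
# Debugging helper; used like the commented-out call in main(), e.g.:
#     print_as_rows(prepped_data[1500:1800], ["node_name", "prev_node_name"], "session_last_node", True)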

def main(task):
    if task == "prep_base_data":
        scroll_id = None
        while True:
            prepped_data, scroll_id = get_base_data(BUSINESS_ID, ES_BASE_START_TS, ES_BASE_END_TS, scroll_id)
            if not prepped_data:
                break
            # print_as_rows(prepped_data[1500:1800], ["node_name", "prev_node_name"], "session_last_node", True)
            print(len(prepped_data))
            bulk_insert_data(prepped_data, BASE_INDEX, '_doc')
    # Actual task, BENCHMARK this
    elif task == "make_session_wise_query_stats":
        base_index_count = get_index_count(BASE_INDEX)
        for i in range(len(QC_QUERIES)):
            qc_queries = {}
            for item in list(QC_QUERIES.values())[i:]:
                qc_queries[item["query_id"]] = item
            start = timer()
            total_records_count = 0
            scroll_id = None
            while True:
                filtered_data, scroll_id = get_filtered_data(BASE_INDEX, qc_queries, scroll_id)
                if not filtered_data:
                    break
                print(f'{len(filtered_data)} records filtered from base')
                session_wise_query_stats, intermediate_data = get_session_wise_query_stats(filtered_data, qc_queries)
                total_records_count += len(session_wise_query_stats)
                # print(session_wise_query_stats)
                # 3100 0.385 sec
                bulk_insert_data(session_wise_query_stats, PROCESSED_INDEX, '_doc')
            end = timer()
            print(
                f'{base_index_count} base records, ',
                f'{len(qc_queries)} queries, ',
                f'{total_records_count} result records, ',
                f'{str(round(end - start, 3))} sec'
            )
            print("-" * 50)
    # elif task == "preprocess_data":
    #     data, total, aggregations = get_filtered_data(BASE_INDEX, QC_QUERIES)
    #     processed_data = preprocess_data(data, QC_QUERIES)
    #     print_as_rows(processed_data[500:800], ["node_name", "base_prev_node_name"], "node_type", "cmplt")
    elif task == "get_sample_data":
        pass


if __name__ == "__main__":
    # import argparse
    # parser = argparse.ArgumentParser(description='Create an ArcHydro schema')
    # parser.add_argument('--task', metavar='task', help='whaddya wanna do?')
    # args = parser.parse_args()
    main("make_session_wise_query_stats")