Skip to content

Instantly share code, notes, and snippets.

@is
Created September 20, 2022 07:11
Show Gist options
  • Save is/1cd286d0c0d7812f6fcb6a1f8266408b to your computer and use it in GitHub Desktop.
Save is/1cd286d0c0d7812f6fcb6a1f8266408b to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import os
import sys
import glob
import csv
import json
__version__ = '0.0.1'
def find_event_log(path: str) -> str:
    """Resolve *path* to a Spark event log file.

    If *path* is already a file it is returned unchanged.  If it is a
    directory, the lexicographically first entry matching ``application*``
    is returned.  Returns ``None`` when nothing matches.
    """
    if os.path.isfile(path):
        return path
    if not os.path.isdir(path):
        return None
    # Sort so the pick is deterministic: glob order is filesystem-dependent.
    fns = sorted(glob.glob(f'{path}/application*'))
    if not fns:
        return None
    return fns[0]
def load_event_log(path: str) -> list:
    """Load a Spark event log (one JSON object per line) as a list of dicts.

    Each event dict is annotated with a 1-based ``lineno`` key recording
    its position in the file.

    Raises:
        FileNotFoundError: if *path* resolves to no event log file.
    """
    log_path = find_event_log(path)
    if log_path is None:
        # Previously open(None) raised an obscure TypeError; fail clearly.
        raise FileNotFoundError(f'no event log found at {path!r}')
    events = []
    with open(log_path, 'r') as fin:
        # Iterate the file object directly; readlines() would materialize
        # the entire (potentially large) log in memory first.
        for lineno, line in enumerate(fin, start=1):
            event = json.loads(line)
            event['lineno'] = lineno
            events.append(event)
    return events
def map_append(x, key1, key2, value):
    """Set ``x[key1][key2] = value``, creating the inner dict if needed."""
    # setdefault replaces the manual look-before-you-leap branch.
    x.setdefault(key1, {})[key2] = value
def event_ts(event, start, end):
    """Annotate *event* in place with begin/end timestamps and duration."""
    event.update(ts_begin=start, ts_end=end, during=end - start)
def collate_events(events) -> dict:
    """Collate raw Spark listener events into entity lists plus indexes.

    Pairs start/end events (jobs, SQL executions), and stamps
    ts_begin / ts_end / during onto each completed entity via event_ts().

    Returns:
        dict with keys 'tasks', 'jobs', 'stages', 'sqls' (lists in log
        order) and 'index': per-entity dicts keyed by id, plus
        'stage_tasks' (stage_id -> task index -> event) and
        'sql_jobs' (executionId -> job_id -> job-start event).
    """
    idx = {}
    tasks = []
    jobs = []
    stages = []
    sqls = []
    for i in ['tasks', 'jobs', 'stages',
              'stage_tasks', 'sqls', 'sql_jobs']:
        idx[i] = {}
    # Most recently seen SQL execution id; jobs that start before any
    # SQLExecutionStart are filed under -1.
    execution_id = -1
    for event in events:
        event_type = event['Event']
        if event_type == 'SparkListenerTaskEnd':
            tasks.append(event)
            event_ts(event, event['Task Info']['Launch Time'], event['Task Info']['Finish Time'])
            stage_id = event['Stage ID']
            task_id = event['Task Info']['Task ID']
            index = event['Task Info']['Index']
            idx['tasks'][task_id] = event
            map_append(idx['stage_tasks'], stage_id, index, event)
        elif event_type == 'SparkListenerStageCompleted':
            stages.append(event)
            event_ts(event, event['Stage Info']['Submission Time'], event['Stage Info']['Completion Time'])
            stage_id = event['Stage Info']['Stage ID']
            event['_id'] = stage_id
            idx['stages'][stage_id] = event
        elif event_type == 'SparkListenerJobStart':
            job_id = event['Job ID']
            # Attribute the job to the SQL execution currently in flight.
            event['Execution ID'] = execution_id
            event['_id'] = job_id
            idx['jobs'][job_id] = event
            map_append(idx['sql_jobs'], execution_id, job_id, event)
            jobs.append(event)
        elif event_type == 'SparkListenerJobEnd':
            # Attach the end event to the stored start event, then rebind
            # `event` to the start event so event_ts() annotates the object
            # already referenced by `jobs` and idx['jobs'].
            # NOTE(review): a JobEnd without a prior JobStart would raise
            # KeyError here — confirm logs are always well-formed.
            job_id = event['Job ID']
            idx['jobs'][job_id]['END'] = event
            event = idx['jobs'][job_id]
            event_ts(event, event['Submission Time'], event['END']['Completion Time'])
        elif event_type == 'org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart':
            execution_id = event['executionId']
            event['_id'] = execution_id
            sqls.append(event)
            idx['sqls'][execution_id] = event
        elif event_type == 'org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd':
            # Same rebinding pattern as JobEnd: annotate the start event.
            execution_id = event['executionId']
            idx['sqls'][execution_id]['end'] = event
            event = idx['sqls'][execution_id]
            event_ts(event, event['time'], event['end']['time'])
    x = {'tasks': tasks, 'jobs': jobs, 'stages': stages, 'sqls': sqls, 'index':idx}
    return x
def x__get_sql_jobids(x, eid):
    """Return the job ids of every job launched under SQL execution *eid*."""
    jobs_for_sql = x['index']['sql_jobs'][eid]
    return [job['_id'] for job in jobs_for_sql.values()]
class Pair:
    """One node in the alignment tree: a matched (left, right) event pair.

    Attributes:
        name:  level label ("batch", "sql", "job" or "stage").
        index: ordinal of this pair within its parent.
        left:  matched entity from the first event log.
        right: matched entity from the second event log.
        pair:  list of child Pair nodes (next level down).
    """
    def __init__(self, name, index, left, right):
        self.name = name
        self.index = index
        self.left = left
        self.right = right
        self.pair = []

    def __repr__(self):
        # Debug aid only; no existing caller relies on repr output.
        return (f'{type(self).__name__}(name={self.name!r}, '
                f'index={self.index!r}, children={len(self.pair)})')
class Align:
    """Positional aligner for two collated event batches.

    Builds a tree of Pair nodes, batch -> sql -> job -> stage, pairing
    the i-th entity of one run with the i-th entity of the other.  Any
    level whose entity counts differ is left without children.
    """
    def __init__(self, es0, es1):
        # es0 / es1: dicts produced by collate_events().
        self.es0 = es0
        self.es1 = es1
        self.root = Pair("batch", 0, es0, es1)
    def align(self):
        """Run the alignment and return the root Pair."""
        n_sqls0 = len(self.es0['index']['sqls'])
        n_sqls1 = len(self.es1['index']['sqls'])
        if n_sqls0 != n_sqls1:
            return self.root
        # NOTE(review): idx['sqls'] is keyed by executionId, but it is
        # indexed here by position i — this assumes execution ids are
        # exactly 0..n-1; confirm for logs whose ids do not start at 0.
        for i in range(n_sqls0):
            sql0 = self.es0['index']['sqls'][i]
            sql1 = self.es1['index']['sqls'][i]
            self.align_sql(self.root, i, sql0, sql1)
        return self.root
    def align_sql(self, parent, idx, sql0, sql1):
        """Pair the jobs under SQL execution *idx*; bail if counts differ."""
        root = Pair("sql", idx, sql0, sql1)
        parent.pair.append(root)
        jobs_0 = x__get_sql_jobids(self.es0, idx)
        jobs_1 = x__get_sql_jobids(self.es1, idx)
        if len(jobs_0) != len(jobs_1):
            return
        for i in range(len(jobs_0)):
            job_id0 = jobs_0[i]
            job_id1 = jobs_1[i]
            job_0 = self.es0['index']['jobs'][job_id0]
            job_1 = self.es1['index']['jobs'][job_id1]
            self.align_job(root, i, job_0, job_1)
    def align_job(self, parent, idx, job0, job1):
        """Pair the stages of two matched jobs, in sorted stage-id order."""
        root = Pair("job", idx, job0, job1)
        parent.pair.append(root)
        stages_0 = [s['Stage ID'] for s in job0['Stage Infos']]
        stages_1 = [s['Stage ID'] for s in job1['Stage Infos']]
        if len(stages_0) != len(stages_1):
            return
        stages_0.sort()
        stages_1.sort()
        for i in range(len(stages_0)):
            stage_id0 = stages_0[i]
            stage_id1 = stages_1[i]
            # Stages absent from the index never completed — skip them.
            if stage_id0 not in self.es0['index']['stages']:
                continue
            if stage_id1 not in self.es1['index']['stages']:
                continue
            stage0 = self.es0['index']['stages'][stage_id0]
            stage1 = self.es1['index']['stages'][stage_id1]
            self.align_stage(root, i, stage0, stage1)
    def align_stage(self, parent, idx, stage0, stage1):
        """Leaf level: record the matched stage pair."""
        root = Pair("stage", idx, stage0, stage1)
        parent.pair.append(root)
def do_align(es0, es1):
    """Align two collated event batches; returns the root Pair of the tree."""
    return Align(es0, es1).align()
def event_log_pair(x0: dict, x1: dict) -> list:
    """Pair the jobs of two collated batches positionally.

    Returns None when the job counts differ.

    NOTE(review): this references event_log_pair_job, which is not
    defined anywhere in this file — calling this with equal-length job
    lists raises NameError.  It appears to be dead code superseded by
    Align; confirm before removing.
    """
    jobs0: list[dict] = x0['jobs']
    jobs1: list[dict] = x1['jobs']
    if len(jobs0) != len(jobs1):
        return None
    job_number = len(jobs0)
    pairs = []
    for i in range(job_number):
        pairs.append(event_log_pair_job(x0, x1, jobs0[i], jobs1[i]))
    return pairs
def dump_align__during(p):
    """Format a pair's durations as 'left/right(right - left)'."""
    left_ms = p.left['during']
    right_ms = p.right['during']
    return f'{left_ms}/{right_ms}({right_ms - left_ms})'
def dump_align(a):
    """Print the alignment tree to stdout: SQL (E), job (J) and stage (S)
    rows, each with left:right id paths and the duration comparison."""
    for sql_pair in a.pair:
        sql_path_l = sql_pair.left['_id']
        sql_path_r = sql_pair.right['_id']
        print(f'E {sql_path_l}:{sql_path_r} {dump_align__during(sql_pair)} _ jobs:{len(sql_pair.pair)}')
        for job_pair in sql_pair.pair:
            job_path_l = f"{sql_path_l}_{job_pair.left['_id']}"
            job_path_r = f"{sql_path_r}_{job_pair.right['_id']}"
            print(f' J {job_path_l}:{job_path_r} {dump_align__during(job_pair)}')
            for stage_pair in job_pair.pair:
                stage_path_l = f"{job_path_l}x{stage_pair.left['_id']}"
                stage_path_r = f"{job_path_r}x{stage_pair.right['_id']}"
                print(f' S {stage_path_l}:{stage_path_r} {dump_align__during(stage_pair)}')
def main():
    """CLI entry point: compare the two Spark event logs named by
    sys.argv[1] and sys.argv[2] and print their alignment tree."""
    path_a, path_b = sys.argv[1], sys.argv[2]
    batch_a = collate_events(load_event_log(path_a))
    batch_b = collate_events(load_event_log(path_b))
    dump_align(do_align(batch_a, batch_b))
# Script entry point: run the comparison when invoked directly.
if __name__ == '__main__':
    main()
# vim: ts=4 sts=4 ai expandtab sw=4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment