Created
September 20, 2022 07:11
-
-
Save is/1cd286d0c0d7812f6fcb6a1f8266408b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
import sys | |
import glob | |
import csv | |
import json | |
__version__ = '0.0.1' | |
def find_event_log(path:str) -> str:
    """Resolve *path* to a Spark event-log file.

    If *path* is already a regular file it is returned unchanged.  If it
    is a directory, the first entry matching ``application*`` is used
    (Spark names event logs after the application id).  Returns ``None``
    when nothing suitable can be found.
    """
    if os.path.isfile(path):
        return path
    if not os.path.isdir(path):
        return None
    candidates = glob.glob(f'{path}/application*')
    return candidates[0] if candidates else None
def load_event_log(path:str) -> list:
    """Load a Spark event log into a list of event dicts.

    Each line of the log is one JSON document; every parsed event is
    annotated in place with a 1-based ``lineno`` key recording its
    position in the file.

    Raises:
        FileNotFoundError: when *path* resolves to no event log.
            (The original code passed find_event_log's None straight
            into open(), raising an opaque TypeError instead.)
    """
    resolved = find_event_log(path)
    if resolved is None:
        raise FileNotFoundError(f'no event log found at {path!r}')
    events = []
    with open(resolved, 'r') as fin:
        # Iterate the file directly instead of readlines() so large logs
        # are streamed line by line; enumerate replaces the manual counter.
        for lineno, line in enumerate(fin, start=1):
            event = json.loads(line)
            event['lineno'] = lineno
            events.append(event)
    return events
def map_append(x, key1, key2, value):
    """Store *value* at x[key1][key2], creating the inner dict on demand."""
    x.setdefault(key1, {})[key2] = value
def event_ts(event, start, end):
    """Annotate *event* in place with begin/end timestamps and their span."""
    event.update(ts_begin=start, ts_end=end, during=end - start)
def collate_events(events) -> dict:
    """Group raw Spark listener events into tasks/jobs/stages/SQL executions.

    Returns a dict with the four event lists plus an 'index' dict of
    lookup tables keyed by the respective Spark ids.  Events are mutated
    in place: '_id', timing keys (via event_ts) and cross-links are added.
    """
    idx = {}
    tasks = []
    jobs = []
    stages = []
    sqls = []
    # Lookup tables: flat id->event maps plus two nested maps
    # (stage_tasks: stage id -> task index -> event;
    #  sql_jobs:    execution id -> job id -> event).
    for i in ['tasks', 'jobs', 'stages',
              'stage_tasks', 'sqls', 'sql_jobs']:
        idx[i] = {}
    # Tracks the SQL execution currently in flight; jobs started before
    # any SQLExecutionStart event are filed under execution id -1.
    execution_id = -1
    for event in events:
        event_type = event['Event']
        if event_type == 'SparkListenerTaskEnd':
            tasks.append(event)
            # Task timing comes from the embedded Task Info record.
            event_ts(event, event['Task Info']['Launch Time'], event['Task Info']['Finish Time'])
            stage_id = event['Stage ID']
            task_id = event['Task Info']['Task ID']
            index = event['Task Info']['Index']
            idx['tasks'][task_id] = event
            map_append(idx['stage_tasks'], stage_id, index, event)
        elif event_type == 'SparkListenerStageCompleted':
            stages.append(event)
            event_ts(event, event['Stage Info']['Submission Time'], event['Stage Info']['Completion Time'])
            stage_id = event['Stage Info']['Stage ID']
            event['_id'] = stage_id
            idx['stages'][stage_id] = event
        elif event_type == 'SparkListenerJobStart':
            job_id = event['Job ID']
            # Attribute the job to the SQL execution active at this point.
            event['Execution ID'] = execution_id
            event['_id'] = job_id
            idx['jobs'][job_id] = event
            map_append(idx['sql_jobs'], execution_id, job_id, event)
            jobs.append(event)
        elif event_type == 'SparkListenerJobEnd':
            job_id = event['Job ID']
            # Fold the end event into the matching JobStart event, then
            # stamp the job's timing from submission + completion.
            idx['jobs'][job_id]['END'] = event
            event = idx['jobs'][job_id]
            event_ts(event, event['Submission Time'], event['END']['Completion Time'])
        elif event_type == 'org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart':
            execution_id = event['executionId']
            event['_id'] = execution_id
            sqls.append(event)
            idx['sqls'][execution_id] = event
        elif event_type == 'org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd':
            execution_id = event['executionId']
            # As with jobs: merge the end event into the start event.
            idx['sqls'][execution_id]['end'] = event
            event = idx['sqls'][execution_id]
            event_ts(event, event['time'], event['end']['time'])
    x = {'tasks': tasks, 'jobs': jobs, 'stages': stages, 'sqls': sqls, 'index':idx}
    return x
def x__get_sql_jobids(x, eid):
    """Return the job ids recorded under SQL execution *eid* in *x*'s index."""
    return [job['_id'] for job in x['index']['sql_jobs'][eid].values()]
class Pair:
    """One node of the alignment tree: a left/right pairing of events.

    `name` labels the level ("batch", "sql", "job" or "stage"), `index`
    is the position within the parent, and `pair` collects child Pair
    nodes appended during alignment.
    """
    def __init__(self, name, index, left, right):
        self.name, self.index = name, index
        self.left, self.right = left, right
        self.pair = []
class Align:
    """Aligns two collated event sets level by level: SQLs -> jobs -> stages.

    Alignment is purely positional: the i-th SQL execution of one run is
    paired with the i-th of the other, and likewise for jobs and stages.
    A level whose counts differ is simply left unexpanded.
    """
    def __init__(self, es0, es1):
        self.es0 = es0
        self.es1 = es1
        self.root = Pair("batch", 0, es0, es1)

    def align(self):
        """Build and return the pairing tree rooted at the batch node."""
        sqls0 = self.es0['index']['sqls']
        sqls1 = self.es1['index']['sqls']
        if len(sqls0) != len(sqls1):
            # Different numbers of SQL executions: nothing to align.
            return self.root
        # NOTE: indexes the sqls map by position, assuming execution ids
        # run 0..n-1 (same assumption as the original).
        for i in range(len(sqls0)):
            self.align_sql(self.root, i, sqls0[i], sqls1[i])
        return self.root

    def align_sql(self, parent, idx, sql0, sql1):
        """Pair one SQL execution and descend into its jobs."""
        node = Pair("sql", idx, sql0, sql1)
        parent.pair.append(node)
        ids0 = x__get_sql_jobids(self.es0, idx)
        ids1 = x__get_sql_jobids(self.es1, idx)
        if len(ids0) != len(ids1):
            return
        for i, (jid0, jid1) in enumerate(zip(ids0, ids1)):
            self.align_job(node, i,
                           self.es0['index']['jobs'][jid0],
                           self.es1['index']['jobs'][jid1])

    def align_job(self, parent, idx, job0, job1):
        """Pair one job and descend into its stages (matched by sorted id)."""
        node = Pair("job", idx, job0, job1)
        parent.pair.append(node)
        ids0 = sorted(s['Stage ID'] for s in job0['Stage Infos'])
        ids1 = sorted(s['Stage ID'] for s in job1['Stage Infos'])
        if len(ids0) != len(ids1):
            return
        stage_idx0 = self.es0['index']['stages']
        stage_idx1 = self.es1['index']['stages']
        for i, (sid0, sid1) in enumerate(zip(ids0, ids1)):
            # Stages listed in a job's Stage Infos but absent from the
            # index (e.g. never completed) are skipped.
            if sid0 in stage_idx0 and sid1 in stage_idx1:
                self.align_stage(node, i, stage_idx0[sid0], stage_idx1[sid1])

    def align_stage(self, parent, idx, stage0, stage1):
        """Pair one stage; stages are leaves of the alignment tree."""
        parent.pair.append(Pair("stage", idx, stage0, stage1))
def do_align(es0, es1):
    """Convenience wrapper: align two collated event sets, return the root Pair."""
    return Align(es0, es1).align()
def event_log_pair(x0:dict, x1:dict) -> list:
    # Pair up the jobs of two collated event sets positionally.
    # NOTE(review): this calls event_log_pair_job, which is not defined
    # anywhere in this file -- invoking event_log_pair would raise
    # NameError.  It is also never called here (main() goes through
    # do_align instead), so this looks like dead code from an earlier
    # iteration; confirm before deleting.
    jobs0:list[dict] = x0['jobs']
    jobs1:list[dict] = x1['jobs']
    # Differing job counts make the runs incomparable; None signals that.
    if len(jobs0) != len(jobs1):
        return None
    job_number = len(jobs0)
    pairs = []
    for i in range(job_number):
        pairs.append(event_log_pair_job(x0, x1, jobs0[i], jobs1[i]))
    return pairs
def dump_align__during(p):
    """Format a pair's durations as 'left/right(right-left)'."""
    left = p.left['during']
    right = p.right['during']
    return f"{left}/{right}({right - left})"
def dump_align(a):
    """Print the alignment tree: one 'E' line per SQL execution, 'J' per
    job, 'S' per stage, each with left/right durations and their delta."""
    for sql_pair in a.pair:
        sql_l = sql_pair.left['_id']
        sql_r = sql_pair.right['_id']
        print(f'E {sql_l}:{sql_r} {dump_align__during(sql_pair)} _ jobs:{len(sql_pair.pair)}')
        for job_pair in sql_pair.pair:
            # Job labels chain the parent ids: sql_job.
            job_l = f"{sql_l}_{job_pair.left['_id']}"
            job_r = f"{sql_r}_{job_pair.right['_id']}"
            print(f' J {job_l}:{job_r} {dump_align__during(job_pair)}')
            for stage_pair in job_pair.pair:
                # Stage labels extend the job label with 'x<stage id>'.
                stage_l = f"{job_l}x{stage_pair.left['_id']}"
                stage_r = f"{job_r}x{stage_pair.right['_id']}"
                print(f' S {stage_l}:{stage_r} {dump_align__during(stage_pair)}')
def main():
    """CLI entry point: diff the two Spark event logs named on argv."""
    path0, path1 = sys.argv[1], sys.argv[2]
    collated0 = collate_events(load_event_log(path0))
    collated1 = collate_events(load_event_log(path1))
    dump_align(do_align(collated0, collated1))

if __name__ == '__main__':
    main()
# vim: ts=4 sts=4 ai expandtab sw=4 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment