Last active
July 25, 2022 14:33
-
-
Save JacobCallahan/86fd2e7b5d8fc5d4c0beef1b83f16ed7 to your computer and use it in GitHub Desktop.
Quick script to scrape robottelo worker logs for time spent deploying
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import json | |
from datetime import datetime, timedelta | |
from pathlib import Path | |
class Tracker: | |
def __init__(self): | |
self._td_total = timedelta() | |
self._count = 0 | |
self._td_max = timedelta() | |
def _better_timedelta(self, time_delta=None): | |
if not time_delta: | |
time_delta = self._td_total | |
hours = int(time_delta.total_seconds() // 3600) | |
secs = time_delta.total_seconds() % 3600 | |
mins = int(secs // 60) | |
secs = int(secs % 60) | |
return f"{hours}h {mins}m {secs}s" | |
@property | |
def total(self): | |
return self._better_timedelta() | |
@property | |
def average(self): | |
return self._better_timedelta(self._td_total / (self._count or 1)) | |
@property | |
def max(self): | |
return self._better_timedelta(self._td_max) | |
def __add__(self, new_dt): | |
if isinstance(new_dt, Tracker): | |
self._td_total += new_dt._td_total | |
self._count += new_dt._count | |
if new_dt._td_max > self._td_max: | |
self._td_max = new_dt._td_max | |
else: | |
self._td_total += new_dt | |
self._count += 1 | |
if new_dt > self._td_max: | |
self._td_max = new_dt | |
return self | |
def __str__(self): | |
return f"total: {self.total:10}, count: {self._count:2}, avg: {self.average:10}, max: {self.max:10}" | |
def get_workflow(payload_str): | |
"""get workflow name from the payload log line""" | |
cleaned = payload_str.strip(" payload=").replace('"', "").replace("'", '"') | |
return json.loads(cleaned)["extra_vars"]["workflow"] | |
def get_dt_object(log_line): | |
_, day, time, *rest = log_line.split() | |
return datetime.strptime(f"{day[2:]} {time}", "%m%d %H:%M:%S") | |
def main(log_file): | |
times = {} | |
last_line = None | |
investigate, triggers = [], [] | |
with log_file.open() as lf: | |
for line in lf: | |
if line.startswith(" payload={'extra_vars':"): | |
triggers.append(line) | |
elif "] Waiting for job:" in line: | |
trigger = triggers.pop(0) | |
try: | |
workflow = get_workflow(trigger) | |
except: | |
print(f"Bad prev: {trigger}\nAt line: {line}") | |
continue | |
if "deploy" in workflow: | |
investigate.append((workflow, get_dt_object(line))) | |
if "] Found artifacts:" in line: | |
end = get_dt_object(line) | |
curr = investigate.pop(0) | |
time_diff = end - curr[1] | |
times[curr[0]] = ( | |
times.get(curr[0], Tracker()) + time_diff | |
) | |
# last_line = line | |
total = Tracker() | |
for k, v in times.items(): | |
total += v | |
print(f"{k:30}{v}") | |
print(f"{'Total:':30}{total}") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="Let's go through some logs!") | |
parser.add_argument("log_file", type=str, help="path to the log file") | |
parsed_args = parser.parse_args() | |
log_path = Path(parsed_args.log_file) | |
if not log_path.exists(): | |
print(f"You might want to check that path.. {log_path.absolute()}") | |
else: | |
main(log_path) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Just save the worker log(s) locally and run the script against it