Created
May 23, 2024 22:16
-
-
Save onefoursix/bd163ac4594d317881fc1f9362c2b3a2 to your computer and use it in GitHub Desktop.
streamsets_job_metrics.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from datetime import datetime | |
from time import time | |
import sys | |
from streamsets.sdk import ControlHub | |
import json | |
# Set to True to echo the metrics to stdout
print_metrics = True
# Destination file; the script writes one JSON object per Job run, one per line.
# Can be overridden with the STREAMSETS_JOB_METRICS_OUTPUT_FILE environment variable.
output_file = os.environ.get(
    'STREAMSETS_JOB_METRICS_OUTPUT_FILE',
    '/Users/mark/Desktop/streamsets_job_metrics.json')
# Control Hub Credentials.
# Prefer supplying these via STREAMSETS_CRED_ID / STREAMSETS_CRED_TOKEN rather than
# committing secrets to source; the literals below are only fallbacks.
cred_id = os.environ.get('STREAMSETS_CRED_ID', '')
cred_token = os.environ.get('STREAMSETS_CRED_TOKEN', '')
def print_usage_and_exit():
    """Print command-line usage to stdout and terminate with exit status 1.

    The script name is taken from ``sys.argv[0]``, so the message stays correct
    if the file is renamed. The previous hard-coded text named a different
    script (``streamsets_metrics_to_snowflake.py``).
    """
    # Fall back to the gist's file name when argv[0] is empty (e.g. interactive use)
    script = os.path.basename(sys.argv[0]) or 'streamsets_job_metrics.py'
    print('Usage: $ python3 {} <lookback_minutes>'.format(script))
    print('Usage Example: $ python3 {} 60'.format(script))
    sys.exit(1)
def convert_millis_to_datetime_string(millis):
    """Format an epoch timestamp as a local-time 'YYYY-MM-DD HH:MM:SS' string.

    Despite the function and parameter names, the value is passed straight to
    ``datetime.fromtimestamp``, so it must be epoch *seconds*, not milliseconds.
    Every caller in this script passes seconds.
    """
    moment = datetime.fromtimestamp(millis)
    return f"{moment:%Y-%m-%d %H:%M:%S}"
# Check the number of command line args
if len(sys.argv) != 2:
    print('Error: Wrong number of arguments')
    print_usage_and_exit()

# Validate the command line args: lookback_minutes must be a positive integer.
# A zero or negative value would put the window start at or after "now",
# silently matching no Job runs.
lookback_minutes = sys.argv[1]
try:
    lookback_minutes = int(lookback_minutes)
except ValueError:
    print('Error: lookback_minutes arg \'{}\' is not an integer'.format(lookback_minutes))
    print_usage_and_exit()
if lookback_minutes <= 0:
    print('Error: lookback_minutes arg \'{}\' must be greater than zero'.format(lookback_minutes))
    print_usage_and_exit()
# Work out the lookback window: "now" and the cut-off lookback_minutes earlier.
# Both are epoch seconds, which is what convert_millis_to_datetime_string
# formats. The cut-off is also kept in epoch milliseconds because that is what
# it is compared against when scanning Job run start times.
current_time_seconds = time()
lookback_seconds = lookback_minutes * 60
start_time_seconds = int(current_time_seconds - lookback_seconds)
start_time_millis = 1000 * start_time_seconds

# Echo the effective settings
banner = '-------------------------------------'
print(banner)
print(f'Current time is {convert_millis_to_datetime_string(current_time_seconds)}')
print(f'Lookback minutes is {lookback_minutes}')
print(f'Will get metrics for Jobs started after {convert_millis_to_datetime_string(start_time_seconds)}')
print(banner)
# Connect to Control Hub.
# Fail fast with a clear message when no credentials are configured, rather
# than letting the SDK raise an opaque authentication error.
if not cred_id or not cred_token:
    print('Error: Control Hub credentials are not set (cred_id / cred_token)')
    sys.exit(1)
try:
    sch = ControlHub(credential_id=cred_id, token=cred_token)
except Exception as e:
    # Script boundary: report any connection failure and exit non-zero
    print('Error connecting to Control Hub')
    print(str(e))
    sys.exit(1)
print('Connected to Control Hub')

# Job runs to get metrics for; filled in by the Job history scan
job_runs = []
def get_run_metrics(job, job_run, metrics):
    """Return the entry in ``metrics`` that belongs to ``job_run``.

    Scans ``metrics`` in order and returns the first element whose
    ``run_count`` equals ``job_run.run_count``. If none matches, prints an
    error naming the run number and ``job.job_name`` and returns None.
    """
    target_run = job_run.run_count
    found = next((entry for entry in metrics if entry.run_count == target_run), None)
    if found is None:
        print('Error finding metrics for run #{} for Job {}'.format(target_run, job.job_name))
    return found
def _build_run_record(job, job_run, metrics):
    """Flatten one Job run into the dict that is written out as one JSON line.

    Job-level fields come from ``job``, run-level fields from ``job_run``, and
    record counts from the entry that get_run_metrics() finds in ``metrics``.
    The counts are -1 when no matching metrics entry exists.
    """
    run = {
        'ID': job.job_id,
        'NAME': job.job_name,
        'CREATETIME': job.created_on,
        'LASTMODIFIEDON': job.last_modified_on,
        'PIPELINENAME': job.pipeline_name,
        'PIPELINECOMMITLABEL': job.commit_label,
        'RUNCOUNT': job_run.run_count,
        'STARTTIME': job_run.start_time,
        'FINISHTIME': job_run.finish_time,
        'ERRORMESSAGE': job_run.error_message,
        'COLOR': job_run.color,
        'STATUS': job_run.status,
    }
    run_metrics = get_run_metrics(job, job_run, metrics)
    if run_metrics is not None:
        run['INPUTRECORDS'] = run_metrics.input_count
        run['OUTPUTRECORDS'] = run_metrics.output_count
        run['ERRORRECORDS'] = run_metrics.error_count
    else:
        # Sentinel values so every record has the same columns even without metrics
        run['INPUTRECORDS'] = -1
        run['OUTPUTRECORDS'] = -1
        run['ERRORRECORDS'] = -1
    return run


# Get Job runs that were started after the lookback time
for job in sch.jobs:
    recent_runs = []
    for job_run in job.history:
        # NOTE(review): stopping at the first older run assumes job.history is
        # ordered newest-first; confirm against the StreamSets SDK.
        if job_run.start_time < start_time_millis:
            break
        recent_runs.append(job_run)
    if not recent_runs:
        # Nothing in the window, so skip reading job.metrics for this Job
        # (presumably a Control Hub request; TODO confirm)
        continue
    metrics = job.metrics
    job_runs.extend(_build_run_record(job, job_run, metrics) for job_run in recent_runs)
# Write one JSON object per Job run, one per line. When print_metrics is True,
# also echo each record to stdout; that flag was previously defined but never used.
with open(output_file, 'w', encoding='utf-8') as f:
    for run in job_runs:
        line = json.dumps(run)
        f.write(line + '\n')
        if print_metrics:
            print(line)
sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment