Last active
March 12, 2025 04:01
-
-
Save onefoursix/5b7aa33e67d4295889f8c75f2c498b5e to your computer and use it in GitHub Desktop.
A Python script to get StreamSets Job Metrics using the StreamSets Python SDK
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
'''
A simple script to get Job run history and metrics for the latest run of each Job.
Example output looks like this:
-------------------------------------
$ python3 job_metrics.py
{"job_id": "f2275440-be70-444f-b25e-2c54619ad507:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "d2t", "create_time": 1726509431556, "last_modified": 1736300865377, "pipeline_name": "d2t", "pipeline_commit_label": "v3", "run_count": 2, "start_time": 1736300867197, "finish_time": 1736300888095, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": 1, "output_records": 1, "error_records": 0} | |
{"job_id": "a8f45d57-49c7-463d-b48d-982bd967d725:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "DEV Job for P1", "create_time": 1723151084016, "last_modified": 1723151088514, "pipeline_name": "d 2 t", "pipeline_commit_label": "v3", "run_count": 4, "start_time": 1741046427629, "finish_time": 1741048987802, "error_message": "JOBRUNNER_65 - A Data Collectors engine didn't respond to the stop command, for more details check the Job Status History. Error waiting manual acknowledge.", "color": "RED", "status": "INACTIVE_ERROR", "input_records": 1, "output_records": 1, "error_records": 0} | |
{"job_id": "208e479f-c34c-4379-8529-c03d5c6d3f60:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "Get Weather Events", "create_time": 1696984398077, "last_modified": 1739573088348, "pipeline_name": "Get Weather Events", "pipeline_commit_label": "v50", "run_count": 172, "start_time": 1736874582141, "finish_time": 1736894120082, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": 14122, "output_records": 14121, "error_records": 1} | |
Error finding metrics for run #9 for Job JDBC to Snowflake Bulk Load | |
{"job_id": "49905e44-bbee-400d-87c7-074f8347657a:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "JDBC to Snowflake Bulk Load", "create_time": 1722284641338, "last_modified": 1722957453633, "pipeline_name": "JDBC to Snowflake Bulk Load", "pipeline_commit_label": "v4", "run_count": 9, "start_time": 1722964123497, "finish_time": 1722964154169, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": -1, "output_records": -1, "error_records": -1} | |
{"job_id": "f3040233-23bc-4bbc-929b-ab8bbeeaf12d:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "Job 1", "create_time": 1741283416690, "last_modified": 1741283615427, "pipeline_name": "new_pipeline", "pipeline_commit_label": "v5", "run_count": 2, "start_time": 1741283947184, "finish_time": 1741284269072, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": 308000, "output_records": 308000, "error_records": 0} | |
{"job_id": "e0d66c92-c9b4-4c36-9596-cd5732bd1fac:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "Job 1", "create_time": 1741748663498, "last_modified": 1741748710021, "pipeline_name": "d2t", "pipeline_commit_label": "v5", "run_count": 2, "start_time": 1741748711527, "finish_time": 0, "error_message": null, "color": "GREEN", "status": "ACTIVE", "input_records": 32630000, "output_records": 32630000, "error_records": 0} | |
{"job_id": "dd3f09d6-fe13-4e76-97fb-9c36a780312f:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "Job 2", "create_time": 1728313744790, "last_modified": 1728332039099, "pipeline_name": "ross-test", "pipeline_commit_label": "v1", "run_count": 4, "start_time": 1728335780813, "finish_time": 1728335824685, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": 320000, "output_records": 320000, "error_records": 0} | |
-------------------------------------
'''
# Imports
import json
import os
import sys
import time
from datetime import datetime

from streamsets.sdk import ControlHub
# Read the Control Hub API credentials from environment variables
# (each value is None when its variable is not set)
cred_id = os.environ.get('CRED_ID')
cred_token = os.environ.get('CRED_TOKEN')
# Get metrics for a specific Job run
def get_run_metrics(job_name, run_count, the_metrics):
    """Return the metrics entry whose ``run_count`` matches the requested run.

    Args:
        job_name: Name of the Job; used only in the error message.
        run_count: Run number whose metrics are wanted.
        the_metrics: Iterable of objects exposing a ``run_count`` attribute.

    Returns:
        The first entry of ``the_metrics`` with a matching ``run_count``,
        or None (after printing an error line) when no entry matches.
    """
    for m in the_metrics:
        # Compare against the run_count argument, not a module-level
        # variable, so the result depends only on the values passed in.
        if m.run_count == run_count:
            return m
    print('Error finding metrics for run #{} for Job {}'.format(run_count, job_name))
    return None
# Open the Control Hub session; the script cannot continue without it
sch = None
try:
    sch = ControlHub(credential_id=cred_id, token=cred_token)
except Exception as err:
    # Any failure while creating the client is reported and ends the
    # script with exit status -1
    for line in (
        'Error: Could not connect to Control Hub.',
        'Check your API credentials and the Control Hub URL',
        'Exception: {}'.format(err),
    ):
        print(line)
    sys.exit(-1)
# (output key, source attribute) pairs, listed in the order the keys are
# written to each output record
_JOB_FIELDS = (
    ('job_id', 'job_id'),
    ('job_name', 'job_name'),
    ('create_time', 'created_on'),
    ('last_modified', 'last_modified_on'),
    ('pipeline_name', 'pipeline_name'),
    ('pipeline_commit_label', 'commit_label'),
)
_RUN_FIELDS = (
    ('run_count', 'run_count'),
    ('start_time', 'start_time'),
    ('finish_time', 'finish_time'),
    ('error_message', 'error_message'),
    ('color', 'color'),
    ('status', 'status'),
)
_METRIC_FIELDS = (
    ('input_records', 'input_count'),
    ('output_records', 'output_count'),
    ('error_records', 'error_count'),
)

# Visit every Job and print one JSON line describing its first history entry
for job in sch.jobs:
    # Collects the history and metrics values for this Job
    summary = {}

    # Job Templates are ignored
    if job.job_template:
        continue

    # Nothing to report for a Job without any history
    run_history = job.history
    if run_history is None or len(run_history) == 0:
        continue

    try:
        job_metrics = job.metrics
        # The first history entry is treated as the most recent run
        job_run = run_history[0]

        # Keys are filled one at a time so that, if a KeyError interrupts
        # the copy, whatever was gathered so far is still printed below
        for key, attr in _JOB_FIELDS:
            summary[key] = getattr(job, attr)
        for key, attr in _RUN_FIELDS:
            summary[key] = getattr(job_run, attr)

        run_metrics = get_run_metrics(job.job_name, job_run.run_count, job_metrics)
        if run_metrics is None:
            # No metrics for this run: flag every record count with -1
            for key, _ in _METRIC_FIELDS:
                summary[key] = -1
        else:
            for key, attr in _METRIC_FIELDS:
                summary[key] = getattr(run_metrics, attr)
    except KeyError as err:
        print('-------------------------------------')
        print('KeyError Exception getting metrics for Job \'{}\''.format(job.job_name))
        print('Metrics collection for this Job will be skipped')
        print('Exception message is: ' + str(err))
        print('-------------------------------------')

    if len(summary) > 0:
        print(json.dumps(summary))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment