Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Last active March 12, 2025 04:01
Show Gist options
  • Save onefoursix/5b7aa33e67d4295889f8c75f2c498b5e to your computer and use it in GitHub Desktop.
Save onefoursix/5b7aa33e67d4295889f8c75f2c498b5e to your computer and use it in GitHub Desktop.
A Python script to get StreamSets Job Metrics using the StreamSets Python SDK
#!/usr/bin/env python
'''
A simple script to get Job run history and metrics for the latest run of each Job.
Example output looks like this:
-------------------------------------
$ python3 job_metrics.py
{"job_id": "f2275440-be70-444f-b25e-2c54619ad507:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "d2t", "create_time": 1726509431556, "last_modified": 1736300865377, "pipeline_name": "d2t", "pipeline_commit_label": "v3", "run_count": 2, "start_time": 1736300867197, "finish_time": 1736300888095, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": 1, "output_records": 1, "error_records": 0}
{"job_id": "a8f45d57-49c7-463d-b48d-982bd967d725:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "DEV Job for P1", "create_time": 1723151084016, "last_modified": 1723151088514, "pipeline_name": "d 2 t", "pipeline_commit_label": "v3", "run_count": 4, "start_time": 1741046427629, "finish_time": 1741048987802, "error_message": "JOBRUNNER_65 - A Data Collectors engine didn't respond to the stop command, for more details check the Job Status History. Error waiting manual acknowledge.", "color": "RED", "status": "INACTIVE_ERROR", "input_records": 1, "output_records": 1, "error_records": 0}
{"job_id": "208e479f-c34c-4379-8529-c03d5c6d3f60:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "Get Weather Events", "create_time": 1696984398077, "last_modified": 1739573088348, "pipeline_name": "Get Weather Events", "pipeline_commit_label": "v50", "run_count": 172, "start_time": 1736874582141, "finish_time": 1736894120082, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": 14122, "output_records": 14121, "error_records": 1}
Error finding metrics for run #9 for Job JDBC to Snowflake Bulk Load
{"job_id": "49905e44-bbee-400d-87c7-074f8347657a:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "JDBC to Snowflake Bulk Load", "create_time": 1722284641338, "last_modified": 1722957453633, "pipeline_name": "JDBC to Snowflake Bulk Load", "pipeline_commit_label": "v4", "run_count": 9, "start_time": 1722964123497, "finish_time": 1722964154169, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": -1, "output_records": -1, "error_records": -1}
{"job_id": "f3040233-23bc-4bbc-929b-ab8bbeeaf12d:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "Job 1", "create_time": 1741283416690, "last_modified": 1741283615427, "pipeline_name": "new_pipeline", "pipeline_commit_label": "v5", "run_count": 2, "start_time": 1741283947184, "finish_time": 1741284269072, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": 308000, "output_records": 308000, "error_records": 0}
{"job_id": "e0d66c92-c9b4-4c36-9596-cd5732bd1fac:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "Job 1", "create_time": 1741748663498, "last_modified": 1741748710021, "pipeline_name": "d2t", "pipeline_commit_label": "v5", "run_count": 2, "start_time": 1741748711527, "finish_time": 0, "error_message": null, "color": "GREEN", "status": "ACTIVE", "input_records": 32630000, "output_records": 32630000, "error_records": 0}
{"job_id": "dd3f09d6-fe13-4e76-97fb-9c36a780312f:8030c2e9-1a39-11ec-a5fe-97c8d4369386", "job_name": "Job 2", "create_time": 1728313744790, "last_modified": 1728332039099, "pipeline_name": "ross-test", "pipeline_commit_label": "v1", "run_count": 4, "start_time": 1728335780813, "finish_time": 1728335824685, "error_message": null, "color": "GRAY", "status": "INACTIVE", "input_records": 320000, "output_records": 320000, "error_records": 0}
-------------------------------------
'''
# Imports
import os, sys, json, time
from datetime import datetime
from streamsets.sdk import ControlHub
# Control Hub API credentials are taken from the CRED_ID / CRED_TOKEN
# environment variables; os.getenv yields None for any variable that is unset.
cred_id, cred_token = map(os.getenv, ('CRED_ID', 'CRED_TOKEN'))
# Get metrics for a specific Job run
def get_run_metrics(job_name, run_count, the_metrics):
    """Return the metrics entry whose ``run_count`` matches the requested run.

    Args:
        job_name: Job name; used only in the diagnostic message.
        run_count: The run number to look up.
        the_metrics: Iterable of metrics objects exposing a ``run_count``
            attribute (e.g. ``job.metrics``). ``None`` is treated as empty.

    Returns:
        The first matching metrics object, or ``None`` when no entry matches.
        In the no-match case a diagnostic is written to stderr so it does not
        mix with the JSON lines the script prints to stdout.
    """
    # Match on the caller-supplied run_count argument rather than any
    # module-level variable, so the function works wherever it is called.
    for m in the_metrics or ():
        if m.run_count == run_count:
            return m
    print('Error finding metrics for run #{} for Job {}'.format(run_count, job_name),
          file=sys.stderr)
    return None
# Connect to Control Hub. On failure, report to stderr (stdout is reserved for
# the JSON records this script emits) and exit with a non-zero status.
try:
    sch = ControlHub(credential_id=cred_id, token=cred_token)
except Exception as e:
    print('Error: Could not connect to Control Hub.', file=sys.stderr)
    print('Check your API credentials and the Control Hub URL', file=sys.stderr)
    print('Exception: ' + str(e), file=sys.stderr)
    sys.exit(-1)
# Loop through all Jobs and print one JSON line describing the most recent run
# of each. Diagnostics go to stderr so stdout stays valid JSON lines.
for job in sch.jobs:
    # Ignore Job Templates
    if job.job_template:
        continue
    # Get the Job History; Jobs with no run history are skipped
    history = job.history
    if not history:
        continue
    try:
        # Get the Job Metrics
        metrics = job.metrics
        # Get the most recent run (taken to be history[0])
        job_run = history[0]
        # Job attributes followed by the latest run's history fields;
        # key order here is the key order of the emitted JSON.
        run = {
            'job_id': job.job_id,
            'job_name': job.job_name,
            'create_time': job.created_on,
            'last_modified': job.last_modified_on,
            'pipeline_name': job.pipeline_name,
            'pipeline_commit_label': job.commit_label,
            'run_count': job_run.run_count,
            'start_time': job_run.start_time,
            'finish_time': job_run.finish_time,
            'error_message': job_run.error_message,
            'color': job_run.color,
            'status': job_run.status,
        }
        # Get the Job Run's metrics
        run_metrics = get_run_metrics(job.job_name, job_run.run_count, metrics)
        if run_metrics is not None:
            run['input_records'] = run_metrics.input_count
            run['output_records'] = run_metrics.output_count
            run['error_records'] = run_metrics.error_count
        else:
            # No metrics exist for this run: flag all row counts with -1
            run['input_records'] = -1
            run['output_records'] = -1
            run['error_records'] = -1
    except KeyError as ke:
        print('-------------------------------------', file=sys.stderr)
        print('KeyError Exception getting metrics for Job \'{}\''.format(job.job_name),
              file=sys.stderr)
        print('Metrics collection for this Job will be skipped', file=sys.stderr)
        print('Exception message is: ' + str(ke), file=sys.stderr)
        print('-------------------------------------', file=sys.stderr)
    else:
        # Only fully built records are printed, so a KeyError part-way through
        # the try block never produces a truncated JSON line.
        print(json.dumps(run))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment