Python-based StreamSets REST API script that gets SDC CPU and memory metrics, as well as running pipeline record counts
#!/usr/bin/env python
'''
This script writes a continuous stream of CPU and memory metrics for a given SDC,
as well as counts of all running pipelines' input records, output records, and error records.

The script uses StreamSets Platform's REST API to pull metrics directly from SDCs; it does not connect to Control Hub.

On each refresh interval, the script will record CPU and memory metrics and then the record counts for each running pipeline.

Sample output looks like this:

{"timestamp": "2023-09-17 22:31:19", "sdc_url": "https://sequoia.onefoursix.com:11111", "system_cpu_load": 9,
"heap_max": 4216455168, "heap_used": 1132732984, "heap_percentage": 26}

{"timestamp": "2023-09-17 22:31:19", "sdc_url": "https://sequoia.onefoursix.com:11111",
"pipeline_id": "Weatherto__0dd0db12-7686-4974-bada-b51ade96c695__8030c2e9-1a39-11ec-a5fe-97c8d4369386",
"input_records": 24414, "output_records": 24414, "error_records": 0}

{"timestamp": "2023-09-17 22:31:19", "sdc_url": "https://sequoia.onefoursix.com:11111",
"pipeline_id": "GetWeathe__d18bc85d-659f-45b5-b8cb-27ea7c035689__8030c2e9-1a39-11ec-a5fe-97c8d4369386",
"input_records": 5881, "output_records": 5881, "error_records": 0}
Prerequisites:

- Python 3.6+
- DataOps Platform API Credentials for a user with permissions to read SDC metrics

Set the following variables in the script:

# Cred ID
cred_id = '<your cred id>'

# Cred Token
cred_token = '<your cred token>'

# How often to get metrics
refresh_seconds = 60

Execute the script by passing the name of the SDC to monitor and the name of the log file the script
should write to as arguments, like this:

$ python get-sdc-metrics.py https://sdc1.onefoursix.com:11111 /Users/mark/data/logs/sdc1.log

If you need to use a custom cacerts file for Python Requests, set and export the environment
variable REQUESTS_CA_BUNDLE before running this script, like this:

$ export REQUESTS_CA_BUNDLE=/path/to/ca-certificates.pem
You can launch multiple instances of this script to run asynchronously, to monitor multiple SDCs, with each SDC's
metrics written to its own log file, by creating a shell script named get-sdc-metrics.sh like this:

#!/usr/bin/env bash
nohup python get-sdc-metrics.py https://sdc1.onefoursix.com:11111 /logs/sdc1.log &
nohup python get-sdc-metrics.py https://sdc2.onefoursix.com:11119 /logs/sdc2.log &
nohup python get-sdc-metrics.py https://sdc3.onefoursix.com:11119 /logs/sdc3.log &

Launch that shell script as a background process:

$ nohup ./get-sdc-metrics.sh &

and you should see one log file being written for each line in the shell script.
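
If you later need to stop all of the monitors started this way, one option is to kill them by
command-line pattern, assuming no other processes on the machine match it:

$ pkill -f get-sdc-metrics.py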
'''
# Imports
import requests
import json
import datetime
import time
import sys
## User variables #############

# Cred ID
cred_id = ''

# Cred Token
cred_token = ''

# How often to get metrics
refresh_seconds = 60

## End of User variables #############
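
# Optional guard: fail fast if the credentials above have not been set
if cred_id == '' or cred_token == '':
    print('Error: please set cred_id and cred_token at the top of this script')
    sys.exit(1)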
# Check command line args
if len(sys.argv) != 3:
    print('Error: wrong number of arguments')
    print('Usage: $ python get-sdc-metrics.py <SDC_URL> </path/to/log_file>')
    print('For example: $ python get-sdc-metrics.py https://sdc1.onefoursix.com:11111 /logs/sdc1.log')
    sys.exit(1)

sdc_url = sys.argv[1]
log_file = sys.argv[2]
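
# Helper that appends a message to the log file and flushes it immediately so the
# output can be tailed in near real time; 'log' is the file handle opened in the
# 'with' block below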
def log_message(message):
    log.write(message + '\n')
    log.flush()

with open(log_file, "a") as log:

    # Ignore pipelines in these states as they are not running
    pipeline_states_to_exclude = ['EDITED', 'FINISHED', 'RUN_ERROR', 'STOPPED']

    # Create a session
    s = requests.Session()

    # Set session headers
    s.headers.update({
        'Content-Type': 'application/json',
        'X-Requested-By': 'sdc',
        'X-SS-REST-CALL': 'true',
        'X-SS-App-Component-Id': cred_id,
        'X-SS-App-Auth-Token': cred_token})
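
    # The X-SS-App-Component-Id and X-SS-App-Auth-Token headers carry the DataOps
    # Platform API credentials set in the User variables section above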

    # SDC JMX URL
    sdc_jmx_url = sdc_url + '/rest/v1/system/jmx'

    # URL to get pipeline status
    sdc_pipelines_status_url = sdc_url + '/rest/v1/pipelines/status'

    # Loop forever
    while True:

        start_loop_seconds = time.time()

        # Get SDC's CPU and Memory Metrics
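        # Note: SystemCpuLoad from the java.lang:type=OperatingSystem bean is reported
        # as a 0.0-1.0 fraction (scaled to a percentage below), and the HeapMemoryUsage
        # values from the java.lang:type=Memory bean are in bytes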
        try:
            system_cpu_load = None
            heap_percentage = None
            metrics = s.get(sdc_jmx_url).json()
            for bean in metrics['beans']:
                if bean['name'] == 'java.lang:type=OperatingSystem':
                    system_cpu_load = int(bean['SystemCpuLoad'] * 100)
                elif bean['name'] == 'java.lang:type=Memory':
                    heap_max = bean['HeapMemoryUsage']['max']
                    heap_used = bean['HeapMemoryUsage']['used']
                    heap_percentage = int((heap_used / heap_max) * 100)
                if system_cpu_load is not None and heap_percentage is not None:
                    break

            # Log the CPU and memory metrics
            if system_cpu_load is not None and heap_percentage is not None:
                data = {}
                data['timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                data['sdc_url'] = sdc_url
                data['system_cpu_load'] = system_cpu_load
                data['heap_max'] = heap_max
                data['heap_used'] = heap_used
                data['heap_percentage'] = heap_percentage
                log_message(json.dumps(data))
            else:
                log_message('Error getting CPU and memory metrics for SDC at ' + sdc_url)
        except Exception as e:
            log_message('Error getting CPU and memory metrics for SDC at ' + sdc_url)
            log_message(str(e))

        # List of pipelines to get metrics for
        pipelines = []

        try:
            # Get all pipelines
            all_pipelines = s.get(sdc_pipelines_status_url).json()
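
            # The status endpoint returns a map of pipeline entries; each value holds
            # the pipeline's 'pipelineId' and current 'status', as used below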

            # Filter out pipelines that are not running
            for key in all_pipelines.keys():
                pipeline = all_pipelines[key]
                if pipeline['status'] not in pipeline_states_to_exclude:
                    pipelines.append(pipeline)

            # For each running pipeline
            for pipeline in pipelines:
                try:
                    # Get metrics for pipeline
                    pipeline_id = pipeline['pipelineId']
                    pipeline_metrics_url = sdc_url + '/rest/v1/pipeline/' + pipeline_id + '/metrics?rev=0'
                    metrics = s.get(pipeline_metrics_url).json()

                    # Log the metrics for the pipeline
                    data = {}
                    data['timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    data['sdc_url'] = sdc_url
                    data['pipeline_id'] = pipeline_id
                    counters = metrics['counters']
                    data['input_records'] = counters['pipeline.batchInputRecords.counter']['count']
                    data['output_records'] = counters['pipeline.batchOutputRecords.counter']['count']
                    data['error_records'] = counters['pipeline.batchErrorRecords.counter']['count']
                    log_message(json.dumps(data))
                except Exception as e:
                    log_message('Error getting metrics for pipeline ' + pipeline['pipelineId'] + ' on SDC ' + sdc_url)
                    log_message(str(e))
        except Exception as e:
            log_message('Error connecting to SDC at ' + sdc_url)
            log_message(str(e))
        finally:
            # Sleep
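            # Subtract the time spent gathering metrics so that each iteration starts
            # roughly refresh_seconds after the previous one began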
            loop_time = time.time() - start_loop_seconds
            if loop_time < refresh_seconds:
                time.sleep(refresh_seconds - loop_time)