Last active July 15, 2023 20:27
A StreamSets SDK for Python script for Control Hub 3.x to capture CPU and memory metrics as well as running pipelines from Data Collector
#!/usr/bin/env python3
'''
This script writes a rolling log file that contains CPU usage and JVM heap memory metrics
for a given Data Collector registered with Control Hub 3.x, at a user-definable refresh
interval, along with the number and names of the Jobs running on the Data Collector at the
time of metrics collection.

Prerequisites:

  - Python 3.6 - 3.8

  - StreamSets SDK for Python v3.x
    See: https://docs.streamsets.com/sdk/latest/installation.html

  - Control Hub login and password for a user with the Organization Administrator role

  - Set the following variables in the script:

      output_dir - a pre-existing directory to write the Data Collector metrics to

      metrics_capture_interval_seconds - how frequently to capture metrics

  - To avoid including credentials in the script, export these environment variables
    before running the script:

      export SCH_USER=<the Control Hub user>
      export SCH_PASSWORD=<the Control Hub password>

  - Run the script with a single argument, the URL of the Data Collector to monitor:

      $ python3 get-sdc-memory-and-cpu-metrics.py <SDC_URL>

    For example:

      $ python3 get-sdc-memory-and-cpu-metrics.py https://sequoia.onefoursix.com:18631

  - To run the script as a background process, set the variable print_metrics_to_console in the
    script to False and launch the script with a command like this:

      $ nohup python3 get-sdc-memory-and-cpu-metrics.py https://sequoia.onefoursix.com:18631 > /dev/null 2>&1 &

  - Sample output looks like this:

      $ python3 get-sdc-memory-and-cpu-metrics.py https://sequoia.onefoursix.com:18631
      Getting resource metrics for Data Collector at https://sequoia.onefoursix.com:18631
      {"sdc_url": "https://sequoia.onefoursix.com:18631", "metric_timestamp": "2023-07-14 21:21:57",
      "heap_memory_used": 335584696, "heap_memory_max": 1073741824, "heap_memory_percentage": 31,
      "cpu_load_percentage": 9, "number_of_running_jobs": 3, "running_jobs": ["Job 2", "Job 1", "Job 3"]}
      {"sdc_url": "https://sequoia.onefoursix.com:18631", "metric_timestamp": "2023-07-14 21:22:13",
      "heap_memory_used": 450928056, "heap_memory_max": 1073741824, "heap_memory_percentage": 41,
      "cpu_load_percentage": 7, "number_of_running_jobs": 3, "running_jobs": ["Job 2", "Job 1", "Job 3"]}
      ...
'''
# Imports
import json
import logging
import os
import sys
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler

from streamsets.sdk import ControlHub, DataCollector
# Control Hub URL
sch_url = 'https://cloud.streamsets.com'

# Control Hub credentials read from the environment
sch_user = os.getenv('SCH_USER')
sch_password = os.getenv('SCH_PASSWORD')

# The directory and name for the rolling log file
output_dir = '/path/to/your-output-dir'
log_file_name = 'sdc-resource-metrics.log'

# How often to capture SDC metrics
metrics_capture_interval_seconds = 15

# Whether or not to print metrics to the console
print_metrics_to_console = True
# Rolling log file config
log_file = os.path.join(output_dir, log_file_name)
max_bytes_per_log_file = 100 * 1024 * 1024  # 100MB
number_of_rolling_logfiles = 5

# Create a logger that writes to a size-based rolling log file
def create_rotating_log():
    logger = logging.getLogger('Rotating Log')
    logger.setLevel(logging.INFO)
    handler = RotatingFileHandler(log_file, maxBytes=max_bytes_per_log_file,
                                  backupCount=number_of_rolling_logfiles)
    logger.addHandler(handler)
    return logger
# Return the names of the Jobs currently running on the Data Collector.
# Uses the module-level sch and sdc_url, which are set before this is first called.
def get_running_jobs():
    sch_sdc = sch.data_collectors.get(url=sdc_url)
    jobs_list = []
    for job in sch_sdc.jobs:
        jobs_list.append(job.job_name)
    return jobs_list
# Validate command line args
if len(sys.argv) != 2:
    print('Incorrect number of arguments')
    print('Usage: python3 get-sdc-memory-and-cpu-metrics.py <SDC_URL>')
    sys.exit(-1)
# Confirm the logging directory exists
if not os.path.isdir(output_dir):
    print('Error: the directory \'' + output_dir + '\' does not exist')
    print('Please create that directory in advance')
    sys.exit(-1)
# Get the SDC URL from the command line
sdc_url = sys.argv[1]

# Create the log file
logger = create_rotating_log()
# Connect to Control Hub
sch = None
try:
    sch = ControlHub(sch_url, username=sch_user, password=sch_password)
except Exception as e:
    print('Error: Could not connect to Control Hub.')
    print('Check your credentials and the Control Hub URL;')
    print('you must also have the Organization Administrator role.')
    print('Exception: ' + str(e))
    sys.exit(-1)
# Connect to the Data Collector
sdc = None
try:
    sdc = DataCollector(sdc_url, control_hub=sch)
except Exception as e:
    print('Error: Could not connect to Data Collector')
    print('Error: ' + str(e))
    sys.exit(-1)

print('Getting resource metrics for Data Collector at ' + sdc_url)

# Get Data Collector metrics in an endless loop until this script is stopped
while True:
    try:
        jmx_metrics = sdc.get_jmx_metrics()
        heap_metrics = jmx_metrics.get('java.lang:type=Memory')['HeapMemoryUsage']
        cpu_metrics = jmx_metrics.get('java.lang:type=OperatingSystem')

        metrics = {}
        metrics['sdc_url'] = sdc_url
        metrics['metric_timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        metrics['heap_memory_used'] = heap_metrics['used']
        metrics['heap_memory_max'] = heap_metrics['max']
        metrics['heap_memory_percentage'] = int((heap_metrics['used'] / heap_metrics['max']) * 100)
        metrics['cpu_load_percentage'] = int(cpu_metrics['CpuLoad'] * 100)

        running_jobs = get_running_jobs()
        metrics['number_of_running_jobs'] = len(running_jobs)
        if metrics['number_of_running_jobs'] > 0:
            metrics['running_jobs'] = running_jobs

        # Convert the metrics to JSON
        data = json.dumps(metrics)

        # Print messages to the console if needed
        if print_metrics_to_console:
            print(data)

        # Write metrics to the rolling logfile
        logger.info(data)

    except Exception as e:
        print('Exception occurred while reading SDC metrics: ' + str(e))

    # Sleep until the next capture
    time.sleep(metrics_capture_interval_seconds)
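
The per-iteration work in the main loop (reading the heap and CPU entries from the JMX metrics and building one JSON-ready record) can be pulled out into a pure function so it can be checked without a live Data Collector. The sketch below is a hypothetical refactoring, not part of the script: `build_metrics_record` is an illustrative name, the sample JMX dict is made up to match the keys the script reads, and returning `None` for the heap percentage when the JVM reports no maximum (a `max` of -1) is an assumption about how you might want to handle that case.

```python
from datetime import datetime


def build_metrics_record(sdc_url, jmx_metrics, running_jobs, now=None):
    """Build one metrics record from a JMX metrics dict.

    Hypothetical helper: it reads the same keys the script reads from
    sdc.get_jmx_metrics(), but here the dict is passed in rather than
    fetched from a real Data Collector.
    """
    heap = jmx_metrics['java.lang:type=Memory']['HeapMemoryUsage']
    os_metrics = jmx_metrics['java.lang:type=OperatingSystem']
    if now is None:
        now = datetime.now()

    # The JVM reports a heap max of -1 when it is undefined; only compute a
    # percentage when max is positive (assumption: None otherwise).
    heap_pct = int(heap['used'] / heap['max'] * 100) if heap['max'] > 0 else None

    record = {
        'sdc_url': sdc_url,
        'metric_timestamp': now.strftime('%Y-%m-%d %H:%M:%S'),
        'heap_memory_used': heap['used'],
        'heap_memory_max': heap['max'],
        'heap_memory_percentage': heap_pct,
        'cpu_load_percentage': int(os_metrics['CpuLoad'] * 100),
        'number_of_running_jobs': len(running_jobs),
    }
    # Like the script, only include job names when at least one Job is running.
    if running_jobs:
        record['running_jobs'] = list(running_jobs)
    return record


# Made-up sample input shaped like the JMX entries the script uses.
sample_jmx = {
    'java.lang:type=Memory': {'HeapMemoryUsage': {'used': 335584696, 'max': 1073741824}},
    'java.lang:type=OperatingSystem': {'CpuLoad': 0.25},
}
record = build_metrics_record('https://sequoia.onefoursix.com:18631', sample_jmx,
                              ['Job 2', 'Job 1', 'Job 3'],
                              now=datetime(2023, 7, 14, 21, 21, 57))
print(record)
```

Passing `now` explicitly keeps the timestamp deterministic in tests; in the loop you would omit it and let the function use the current time.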
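
To see how the rolling log behaves without waiting for 100MB of metrics, you can exercise the same `RotatingFileHandler` setup with a tiny size limit. This is a standalone sketch using only the standard library; the 200-byte `maxBytes`, the backup count of 2, the logger name, and the temporary directory are demonstration values, not the script's settings.

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler

# Same handler type as create_rotating_log(), but with a tiny maxBytes
# (demonstration value only) so that rotation happens after a few records.
log_dir = tempfile.mkdtemp()
log_file = os.path.join(log_dir, 'sdc-resource-metrics.log')

logger = logging.getLogger('Rotating Log Demo')
logger.setLevel(logging.INFO)
logger.propagate = False  # keep demo records away from any root handlers
handler = RotatingFileHandler(log_file, maxBytes=200, backupCount=2)
logger.addHandler(handler)

# Write 20 short JSON-like lines; each is roughly 50 bytes.
for i in range(20):
    logger.info('{"sample": %d, "padding": "xxxxxxxxxxxxxxxxxxxx"}', i)

handler.close()
files = sorted(os.listdir(log_dir))
print(files)
```

With `backupCount=2`, the handler keeps the active file plus `.1` and `.2`, and the oldest records are discarded on each further rollover. By the same logic, the script's 100MB limit with five backups caps disk use at roughly 600MB (the active file plus five backups).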
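
Because the script's logger has no formatter configured, each line it writes is just the JSON record, so the log can be loaded back one line at a time with `json.loads`. The following sketch is a hypothetical post-processing helper, not part of the script: `load_metrics` and `peak_heap` are illustrative names, the default of five backups mirrors `number_of_rolling_logfiles`, and the two sample records are made up.

```python
import json
import os
import tempfile


def load_metrics(log_file, backup_count=5):
    """Read JSON metric records from a rolling log and its numbered backups.

    Assumes one JSON object per line. Backups are read oldest first
    (.5 down to .1), then the active file, so records come out in time order.
    """
    paths = [f'{log_file}.{n}' for n in range(backup_count, 0, -1)] + [log_file]
    records = []
    for path in paths:
        if not os.path.exists(path):
            continue  # fewer backups exist until the log has rotated enough times
        with open(path) as f:
            for line in f:
                line = line.strip()
                if line:
                    records.append(json.loads(line))
    return records


def peak_heap(records):
    """Return the record with the highest heap_memory_percentage, or None."""
    return max(records, key=lambda r: r['heap_memory_percentage'], default=None)


# Example: two hypothetical records written to a temporary log file.
demo_dir = tempfile.mkdtemp()
demo_log = os.path.join(demo_dir, 'sdc-resource-metrics.log')
with open(demo_log, 'w') as f:
    f.write(json.dumps({'metric_timestamp': '2023-07-14 21:21:57', 'heap_memory_percentage': 31}) + '\n')
    f.write(json.dumps({'metric_timestamp': '2023-07-14 21:22:13', 'heap_memory_percentage': 41}) + '\n')

records = load_metrics(demo_log)
print(len(records), peak_heap(records)['metric_timestamp'])
```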