Created
February 4, 2023 07:36
-
-
Save onefoursix/639039b658c3d4453b9b97af10d19e2e to your computer and use it in GitHub Desktop.
A script that uses the StreamSets SDK for Python to explicitly place Jobs on specific SDCs even when HA is enabled
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
'''This Python script stops a Control Hub Job, removes a dynamic engine label from a "backup" | |
instance of SDC that the user does not want the Job placed on, starts the Job again, | |
and then restores the dynamic engine label to the same "backup" SDC. This process | |
allows the user to start or restart a Job while preventing it from starting up on the "backup"
SDC. | |
Prerequisites: | |
- A Python 3.4 or higher environment to run this script | |
- The StreamSets SDK for Python module installed in your Python environment, see | |
https://docs.streamsets.com/platform-sdk/latest/learn/installation.html | |
''' | |
# Imports | |
import datetime, os, sys | |
from time import sleep | |
from streamsets.sdk import ControlHub | |
## USER VARIABLES ##############################################
# CRED_ID -- Your API Credential CRED_ID.
# Read from the CRED_ID environment variable when it is set, so the secret does
# not have to be stored in this file; otherwise falls back to the literal below.
CRED_ID = os.environ.get('CRED_ID', '')
# CRED_TOKEN -- Your API Credential CRED_TOKEN.
# Read from the CRED_TOKEN environment variable when it is set; otherwise falls
# back to the literal below.
CRED_TOKEN = os.environ.get('CRED_TOKEN', '')
# "Backup" SDC URL (compared exactly against each engine's engine_url)
BACKUP_ENGINE_URL = ''
# Engine label to manage
ENGINE_LABEL = ''
# Job to start or restart
JOB_ID = ''
# How often to poll Control Hub for Job status updates
POLLING_FREQUENCY_SECONDS = 10
# How long to wait for the Job to become Active
MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE = 20
# How long to wait for the Job to become Inactive
MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_INACTIVE = 5 * 60  # 5 minutes
## END USER VARIABLES ######################################
## Console logging helper: every message gets a local-time timestamp prefix
def print_message(message):
    """Print *message* to stdout, prefixed by the local time as 'YYYY-MM-DD HH:MM:SS'."""
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(' '.join((timestamp, message)))
## Validate user variables, then init connection to Control Hub ############
# Every user variable above defaults to ''. Refuse to run with any of them
# unset rather than failing later with an authentication error, a
# "not found" exit, or (for ENGINE_LABEL) managing an empty label.
_REQUIRED_SETTINGS = (
    ('CRED_ID', CRED_ID),
    ('CRED_TOKEN', CRED_TOKEN),
    ('BACKUP_ENGINE_URL', BACKUP_ENGINE_URL),
    ('ENGINE_LABEL', ENGINE_LABEL),
    ('JOB_ID', JOB_ID),
)
_missing_settings = [name for name, value in _REQUIRED_SETTINGS if not value]
if _missing_settings:
    sys.exit('Error: The following user variables must be set: ' + ', '.join(_missing_settings))

sch = ControlHub(
    credential_id=CRED_ID,
    token=CRED_TOKEN)
## Locate the BACKUP engine by its URL ########################################
# Take the first engine known to Control Hub whose engine_url matches exactly;
# exit the script if none does.
engine = next(
    (candidate for candidate in sch.engines if candidate.engine_url == BACKUP_ENGINE_URL),
    None,
)
if engine is not None:
    print_message('BACKUP Engine with URL \'' + BACKUP_ENGINE_URL + '\' was found.')
else:
    sys.exit('Error: BACKUP Engine with URL ' + BACKUP_ENGINE_URL + ' not found.')
## Get the Job ##########################################
# Any error raised by the lookup is treated as "Job not found".
# NOTE(review): this assumes sch.jobs.get() raises when no Job matches; the
# explicit None check is kept in case it returns None instead. Catch Exception
# rather than using a bare except so Ctrl-C / SystemExit are not swallowed.
try:
    job = sch.jobs.get(job_id=JOB_ID)
except Exception:
    job = None
if job is None:
    sys.exit('Error: Job with ID ' + JOB_ID + ' not found.')
print_message('Job with name \'' + job.job_name + '\' was found')
print_message('Job ID is \'' + job.job_id + '\'')
## Get the Job status #######################################
# Refresh the Job before reading its status.
job.refresh()
job_status = job.status.status
# str() guards against a missing (None) status breaking the message.
print_message('Job status is \'' + str(job_status) + '\'')
# The stop/start logic below only handles these two states.
if job_status not in ('ACTIVE', 'INACTIVE'):
    sys.exit('Error: Job must have a status of either \'ACTIVE\' or \'INACTIVE\' for this script to run')
## Stop the Job if it is ACTIVE ###############################
if job_status == 'ACTIVE':
    print_message('Attempting to stop Job...')
    try:
        sch.stop_job(job)
    except Exception as err:
        # Report the actual failure instead of assuming it was a timeout.
        sys.exit('Error: Failed to stop Job: ' + str(err))

    # Poll Control Hub until the Job reports INACTIVE, giving up once the
    # accumulated wait exceeds MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_INACTIVE.
    # wait_seconds is initialized once, before the loop; resetting it on every
    # iteration would make the timeout unreachable.
    wait_seconds = 0
    job.refresh()
    job_status = job.status.status
    while job_status != 'INACTIVE':
        if wait_seconds > MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_INACTIVE:
            sys.exit('Error: Timeout waiting for Job to become INACTIVE')
        print_message('Waiting for Job to become INACTIVE')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS
        job.refresh()
        job_status = job.status.status
    print_message('Job status is INACTIVE')
## Remove the Engine Label from the BACKUP engine #############################
# Record whether this script actually removed the label, so that it only
# restores a label it took away and never adds one that was not there.
label_removed = False
print_message('Current dynamic engine labels on BACKUP engine are ' + str(engine.labels))
if ENGINE_LABEL in engine.labels:
    print_message('Removing label \'' + ENGINE_LABEL + '\' from BACKUP engine')
    engine.labels.remove(ENGINE_LABEL)
    sch.update_engine_labels(engine)
    label_removed = True
    print_message('Dynamic engine labels on BACKUP engine are now ' + str(engine.labels))
else:
    print_message('WARNING: Dynamic engine label \'' + ENGINE_LABEL + '\' not currently assigned to BACKUP engine')

# Everything between removing and restoring the label is wrapped in
# try/finally. If starting the Job fails or times out (sys.exit raises
# SystemExit), the BACKUP engine still gets its label back instead of being
# left permanently without it.
try:
    ## Optional: Set the Job's runtime parameters
    # print_message('Setting the Job\'s runtime parameters')
    # job.runtime_parameters = {"PARAM_1":"aaa","PARAM_2":"bbb"}
    # sch.update_job(job)

    ## Start the Job #########################################
    print_message('Starting Job on PRIMARY engine...')
    sch.start_job(job)

    ## Wait for Job to transition to Active ##########################
    # Poll until ACTIVE, giving up once the accumulated wait exceeds
    # MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE.
    wait_seconds = 0
    job.refresh()
    while job.status.status != 'ACTIVE':
        if wait_seconds > MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE:
            sys.exit('Error: Timeout waiting for Job to become ACTIVE')
        print_message('Waiting for Job to become ACTIVE')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS
        job.refresh()
    print_message('Job status is ACTIVE')
finally:
    ## Restore the label to the BACKUP SDC
    if label_removed:
        print_message('Restoring label \'' + ENGINE_LABEL + '\' to BACKUP engine')
        engine.labels.append(ENGINE_LABEL)
        sch.update_engine_labels(engine)
        print_message('Dynamic engine labels are now ' + str(engine.labels))

print_message('Done')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment