Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Created February 4, 2023 07:36
Show Gist options
  • Save onefoursix/639039b658c3d4453b9b97af10d19e2e to your computer and use it in GitHub Desktop.
Save onefoursix/639039b658c3d4453b9b97af10d19e2e to your computer and use it in GitHub Desktop.
A script that uses the StreamSets SDK for Python to explicitly place Jobs on specific SDCs even when HA is enabled
#!/usr/bin/env python
'''This Python script stops a Control Hub Job, removes a dynamic engine label from a "backup"
instance of SDC that the user does not want the Job placed on, starts the Job again,
and then restores the dynamic engine label to the same "backup" SDC. This process
allows the user to start or restart a Job while preventing it from starting up on the "backup"
SDC.
Prerequisites:
- A Python 3.4 or higher environment to run this script
- The StreamSets SDK for Python module installed in your Python environment, see
https://docs.streamsets.com/platform-sdk/latest/learn/installation.html
'''
# Imports
import datetime, os, sys
from time import sleep
from streamsets.sdk import ControlHub
## USER VARIABLES ##############################################
# Fill in every empty value below before running the script.

# --- Control Hub API credentials ---
CRED_ID = ''      # Your API Credential CRED_ID
CRED_TOKEN = ''   # Your API Credential CRED_TOKEN

# --- Targets ---
BACKUP_ENGINE_URL = ''   # URL of the "Backup" SDC
ENGINE_LABEL = ''        # Engine label to manage
JOB_ID = ''              # Job to start or restart

# --- Timing (all values in seconds) ---
POLLING_FREQUENCY_SECONDS = 10                     # Interval between Job status polls
MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE = 20     # Give up waiting for ACTIVE after this long
MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_INACTIVE = 300  # Give up waiting for INACTIVE after 5 minutes
## END USER VARIABLES ######################################
## Convenience method to prepend timestamps to console messages
def print_message(message):
    '''Print ``message`` to the console prefixed with the current local timestamp.

    The timestamp format is ``YYYY-MM-DD HH:MM:SS``, followed by a single space
    and the message text.

    Args:
        message: The text to print. Non-string values are accepted and are
            rendered with ``str()`` semantics instead of raising ``TypeError``.
    '''
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # str.format (rather than '+') tolerates non-string messages and stays
    # compatible with the Python 3.4 baseline stated in the module docstring.
    print('{} {}'.format(timestamp, message))
## Connect to Control Hub with the configured API credentials ##
sch = ControlHub(credential_id=CRED_ID, token=CRED_TOKEN)
## Locate the BACKUP engine by its URL ##############################
# Take the first registered engine whose URL matches, or None when no engine does.
engine = next(
    (candidate for candidate in sch.engines
     if candidate.engine_url == BACKUP_ENGINE_URL),
    None
)
if engine is None:
    # Nothing else in the script can run without the BACKUP engine.
    sys.exit('Error: BACKUP Engine with URL ' + BACKUP_ENGINE_URL + ' not found.')
print_message("BACKUP Engine with URL '" + BACKUP_ENGINE_URL + "' was found.")
## Get the Job ##########################################
# Any failure raised by the lookup is reported as "not found" below.
# 'except Exception' (instead of a bare 'except:') keeps that best-effort
# behaviour while letting Ctrl-C (KeyboardInterrupt) and SystemExit propagate.
try:
    job = sch.jobs.get(job_id=JOB_ID)
except Exception:
    job = None
if job is None:
    sys.exit('Error: Job with ID ' + JOB_ID + ' not found.')
print_message('Job with name \'' + job.job_name + '\' was found')
print_message('Job ID is \'' + job.job_id + '\'')
## Read the Job's current status ################################
# Refresh first so the status reflects Control Hub's latest view of the Job.
job.refresh()
job_status = job.status.status
print_message("Job status is '" + job_status + "'")
# Only ACTIVE and INACTIVE are handled by the steps that follow.
if job_status != 'ACTIVE' and job_status != 'INACTIVE':
    sys.exit("Error: Job must have status or either 'ACTIVE' or 'INACTIVE' for this script to run")
## Stop the Job if it is ACTIVE ###############################
if job_status == 'ACTIVE':
    print_message('Attempting to stop Job...')
    try:
        sch.stop_job(job)
    except Exception as err:
        # The stop request can fail for reasons other than a timeout, so report
        # the actual cause. Ctrl-C and SystemExit are intentionally not caught.
        sys.exit('Error: Unable to stop Job: ' + str(err))

    # Initialise the elapsed-time counter once, before polling. Resetting it
    # inside the loop would keep it at zero, so the timeout could never fire.
    wait_seconds = 0
    while job_status != 'INACTIVE':
        job.refresh()
        job_status = job.status.status
        if job_status == 'INACTIVE':
            # Already stopped: skip the needless extra sleep.
            break
        print_message('Waiting for Job to become INACTIVE')
        if wait_seconds > MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_INACTIVE:
            sys.exit('Error: Timeout waiting for Job to become INACTIVE')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS
    print_message('Job status is INACTIVE')
## Remove the Engine Label from the BACKUP engine #############################
# Per the module docstring, the Job must not be placed on the BACKUP engine,
# so the label is taken off that engine before the Job is started.
print_message('Current dynamic engine labels on BACKUP engine are ' + str(engine.labels))
if ENGINE_LABEL in engine.labels:
    print_message('Removing label \'' + ENGINE_LABEL + '\' from BACKUP engine' )
    engine.labels.remove(ENGINE_LABEL)
    sch.update_engine_labels(engine)
    print_message('Dynamic engine labels on BACKUP engine are now ' + str(engine.labels))
else:
    print_message('WARNING: Dynamic engine label \'' + ENGINE_LABEL + '\' not currently assigned to BACKUP engine')

# Everything between removing and restoring the label runs inside try/finally.
# A start failure or an ACTIVE timeout (sys.exit raises SystemExit) would
# otherwise end the script with the BACKUP engine permanently missing its label.
try:
    ## Optional: Set the Job's runtime parameters
    # print_message('Setting the Job\'s runtime parameters')
    # job.runtime_parameters = {"PARAM_1":"aaa","PARAM_2":"bbb"}
    # sch.update_job(job)

    ## Start the Job #########################################
    # Announce the start before issuing the call so the log reflects the real order.
    print_message('Starting Job on PRIMARY engine...')
    sch.start_job(job)

    ## Wait for Job to transition to Active ##########################
    job.refresh()
    wait_seconds = 0
    while job.status.status != 'ACTIVE':
        job.refresh()
        print_message('Waiting for Job to become ACTIVE')
        if wait_seconds > MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE:
            sys.exit('Error: Timeout waiting for Job to become ACTIVE')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS
    print_message('Job status is ACTIVE')
finally:
    ## Restore the label to the BACKUP SDC
    # As in the original flow, the label is applied even if it was not present
    # at the start (the WARNING case above).
    print_message('Restoring label \'' + ENGINE_LABEL + '\' to BACKUP engine' )
    engine.labels.append(ENGINE_LABEL)
    sch.update_engine_labels(engine)
    print_message('Dynamic engine labels are now ' + str(engine.labels))

# Reached only when the Job became ACTIVE and the label was restored.
print_message('Done' )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment