Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Last active May 27, 2024 15:41
Show Gist options
  • Save onefoursix/45346d062ca91890ac71416395826939 to your computer and use it in GitHub Desktop.
Save onefoursix/45346d062ca91890ac71416395826939 to your computer and use it in GitHub Desktop.
Python script to spin up an SDC instance on K8s, run a Job on that SDC, wait for the Job to complete, and then tear down the deployed SDC
#!/usr/bin/env python
'''This Python script deploys an instance of SDC on Kubernetes using a Provisioning Agent,
starts a Job that runs on that instance, waits for the Job to complete, then deletes the
Control Hub Deployment, which tears down the deployed SDC.
This version of the script runs against Control Hub 3.x
Prerequisites:
- A Kubernetes Cluster with a registered Provisioning Agent
- An SDC Deployment manifest saved in a file, with the file's path assigned to the
script variable SDC_DEPLOYMENT_MANIFEST below. See the examples at
https://github.com/streamsets/tutorials/tree/master/tutorial-kubernetes-deployment
- A Python 3.4 or higher environment to run the script
- The StreamSets SDK for Python module installed in your Python environment, see
https://docs.streamsets.com/sdk/latest/installation.html
'''
# Imports
import datetime, sys, yaml
from time import sleep
from streamsets.sdk import ControlHub
## USER VARIABLES ##############################################
# Replace every '' placeholder below with a real value before running;
# the check at the end of this section stops the script if one is left blank.
CONTROL_HUB_URL = 'https://cloud.streamsets.com'
CONTROL_HUB_USER = ''
CONTROL_HUB_PASSWORD = ''
# The ID of the Job to run
JOB_ID = ''
# The LABEL to match the Job to the ephemeral SDC Instance; it is also used
# as the name of the Control Hub Deployment
LABEL = ''
# Provisioning Agent ID
PROVISIONING_AGENT_ID = ''
# SDC Deployment Manifest (a path to a file on the machine where this script is executed)
SDC_DEPLOYMENT_MANIFEST = ''
# How long to wait for the SDC Deployment to complete
MAX_WAIT_SECONDS_FOR_SDC_DEPLOYMENT = 60 * 3  # example for 3 minutes
# Frequency to poll Control Hub for status
POLLING_FREQUENCY_SECONDS = 30
# How long to wait for the Job to become Active
MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE = 20
# How long to wait for the Job to complete
MAX_WAIT_SECONDS_FOR_JOB_TO_COMPLETE = 60 * 10  # example for 10 minutes
##############################################

# Stop early with a clear message if a required setting was left blank, instead
# of failing later with a less obvious error from a Control Hub API call.
# A list of pairs (not a dict) keeps the reported order stable on old Pythons.
_REQUIRED_SETTINGS = [
    ('CONTROL_HUB_USER', CONTROL_HUB_USER),
    ('CONTROL_HUB_PASSWORD', CONTROL_HUB_PASSWORD),
    ('JOB_ID', JOB_ID),
    ('LABEL', LABEL),
    ('PROVISIONING_AGENT_ID', PROVISIONING_AGENT_ID),
    ('SDC_DEPLOYMENT_MANIFEST', SDC_DEPLOYMENT_MANIFEST),
]
_missing_settings = [name for name, value in _REQUIRED_SETTINGS if not value]
if _missing_settings:
    sys.exit('Error: set these user variables before running: ' + ', '.join(_missing_settings))
def print_message(message):
    """Write one log line to stdout: the local time, a space, then *message*.

    The timestamp uses the format 'YYYY-mm-dd HH:MM:SS'. *message* must be a str.
    """
    stamp_format = '%Y-%m-%d %H:%M:%S'
    stamp = datetime.datetime.now().strftime(stamp_format)
    print(' '.join((stamp, message)))
## Open an authenticated Control Hub session; every later API call goes through `sch`
print_message('Connecting to Control Hub')
sch = ControlHub(
    CONTROL_HUB_URL,
    username=CONTROL_HUB_USER,
    password=CONTROL_HUB_PASSWORD,
)
## Fetch the Job and retarget it at the ephemeral SDC
print_message('Getting Job with JOB_ID ' + JOB_ID)
job = sch.jobs.get(job_id=JOB_ID)

# These parameter names/values look like examples -- replace them with what
# the Job's pipeline actually expects.
print_message("Setting the Job's runtime parameters")
job.runtime_parameters = dict(PARAM_1='aaa', PARAM_2='bbb')

# Restrict the Job to SDCs carrying LABEL, the same label given to the
# Deployment created below.
print_message("Setting the Job's label")
job.data_collector_labels = [LABEL]

# Persist both changes in Control Hub
sch.update_job(job)
## Build a Kubernetes-based SDC Deployment and register it with Control Hub
print_message('Creating Control Hub Deployment')
deployment_builder = sch.get_deployment_builder()

# The Provisioning Agent that will manage this Deployment on the cluster
provisioning_agent = sch.provisioning_agents.get(id=PROVISIONING_AGENT_ID)

# Load the SDC Deployment manifest. PyYAML's BaseLoader keeps every scalar as a
# plain string and does not construct arbitrary Python objects.
with open(SDC_DEPLOYMENT_MANIFEST) as manifest_file:
    deployment_spec = yaml.load(manifest_file, Loader=yaml.BaseLoader)

# A single SDC instance, named and labelled with LABEL so the Job can target it
deployment = deployment_builder.build(
    name=LABEL,
    provisioning_agent=provisioning_agent,
    number_of_data_collector_instances=1,
    description='',
    data_collector_labels=[LABEL],
    spec=deployment_spec,
)
sch.add_deployment(deployment)
## Start the Deployment, run the Job on it, and always tear the Deployment down.
# Everything after add_deployment() runs inside try/finally: sys.exit() raises
# SystemExit, so a timeout (or any API error) still stops and deletes the
# Deployment instead of leaving the SDC running on Kubernetes.
job_completed = False
try:
    ## Start the deployment
    print_message('Starting Control Hub Deployment')
    sch.start_deployment(deployment)

    ## Wait for the new SDC instance to register with Control Hub
    wait_seconds = 0
    while True:
        print_message('Waiting for SDC to register with Control Hub')
        # The SDC counts as registered once any SDC reports LABEL among its labels
        if any(LABEL in sdc.reported_labels for sdc in sch.data_collectors):
            print_message('SDC has registered with Control Hub')
            break
        # >= so the total wait does not exceed the configured maximum
        if wait_seconds >= MAX_WAIT_SECONDS_FOR_SDC_DEPLOYMENT:
            sys.exit('Error: Timeout waiting for SDC to register with Control Hub. ')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS

    ## Start Job
    print_message('Starting Job with Job ID: ' + JOB_ID)
    sch.start_job(job)

    ## Wait for Job to transition to Active
    # Status is refreshed after every sleep and checked before the timeout test,
    # so the most recent status is always the one that decides.
    # NOTE(review): a very short Job could finish between polls and never be
    # observed as ACTIVE, in which case this loop times out -- confirm acceptable.
    wait_seconds = 0
    job.refresh()
    while job.status.status != 'ACTIVE':
        print_message('Waiting for Job to become ACTIVE')
        if wait_seconds >= MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE:
            sys.exit('Error: Timeout waiting for Job to become ACTIVE')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS
        job.refresh()
    print_message('Job status is ACTIVE')

    ## Wait for Job to complete or to timeout
    # NOTE(review): only 'INACTIVE' is treated as finished; if the Job can end
    # in an error status, this loop waits out the full timeout -- confirm.
    print_message('Waiting for Job to complete')
    wait_seconds = 0
    job.refresh()
    while job.status.status != 'INACTIVE':
        if wait_seconds >= MAX_WAIT_SECONDS_FOR_JOB_TO_COMPLETE:
            print_message('Error: Timeout waiting for Job to complete')
            break
        print_message('Waiting for Job to complete')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS
        job.refresh()

    # The status here comes from a refresh taken after the last sleep
    job_completed = job.status.status == 'INACTIVE'
    if job_completed:
        print_message('Job completed successfully')
    else:
        print_message('Error: Job did not complete successfully')
    print_message('Job status is ' + job.status.status)
finally:
    # NOTE(review): a Job that is still running is not stopped before the SDC is
    # removed; consider stopping it first if that matters for your pipeline.
    ## Stop the SDC Kubernetes-based Deployment
    print_message('Stopping SDC deployment')
    sch.stop_deployment(deployment)
    ## Delete the SDC Kubernetes-based Deployment
    print_message('Deleting SDC deployment')
    sch.delete_deployment(deployment)
print_message('Done')

# Report an unfinished Job with a non-zero exit status, like the other error paths
if not job_completed:
    sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment