Last active
May 27, 2024 15:41
-
-
Save onefoursix/45346d062ca91890ac71416395826939 to your computer and use it in GitHub Desktop.
Python script to spin up an SDC instance on K8s, run a Job on that SDC, wait for the Job to complete, and then tear down the deployed SDC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
'''This Python script deploys an instance of SDC on Kubernetes using a Control Agent,
starts a Job that runs on that instance, waits for the Job to complete, then deletes the
Control Hub Deployment, which tears down the deployed SDC.
This version of the script runs against Control Hub 3.x
Prerequisites:
 - A Kubernetes Cluster with a registered Provisioning Agent
 - An SDC Deployment manifest saved in a file, with the file location set in the
   script variable SDC_DEPLOYMENT_MANIFEST defined below. See the examples at
   https://github.com/streamsets/tutorials/tree/master/tutorial-kubernetes-deployment
 - A Python 3.4 or higher environment to run the script
 - The StreamSets SDK for Python module installed in your Python environment, see
   https://docs.streamsets.com/sdk/latest/installation.html
'''
# Imports | |
import datetime, sys, yaml | |
from time import sleep | |
from streamsets.sdk import ControlHub | |
## USER VARIABLES ##############################################
# Settings marked "fill in" are empty-string placeholders and must be set
# before the script is run.

# Base URL of the Control Hub instance to connect to
CONTROL_HUB_URL = 'https://cloud.streamsets.com'

# Control Hub credentials (fill in)
CONTROL_HUB_USER = ''
CONTROL_HUB_PASSWORD = ''

# The ID of the Job to run (fill in)
JOB_ID = ''

# The LABEL to match the Job to the ephemeral SDC Instance (fill in)
LABEL = ''

# Provisioning Agent ID (fill in)
PROVISIONING_AGENT_ID = ''

# SDC Deployment Manifest (a path to a file on the machine where this script
# is executed) (fill in)
SDC_DEPLOYMENT_MANIFEST = ''

# How long to wait for the SDC Deployment to complete
MAX_WAIT_SECONDS_FOR_SDC_DEPLOYMENT = 60 * 3  # example for 3 minutes

# Frequency to poll Control Hub for status
POLLING_FREQUENCY_SECONDS = 30

# How long to wait for the Job to become Active
# NOTE: this is smaller than POLLING_FREQUENCY_SECONDS, so the limit is already
# exceeded after a single polling interval.
MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE = 20

# How long to wait for the Job to complete
MAX_WAIT_SECONDS_FOR_JOB_TO_COMPLETE = 60 * 10  # example for 10 minutes

##############################################
def print_message(message):
    '''Print `message` to stdout, prefixed with the current timestamp.

    The timestamp comes from datetime.datetime.now() and is rendered as
    "YYYY-MM-DD HH:MM:SS", followed by a single space and the message.

    Args:
        message: The text to log. Non-string objects are accepted and are
            rendered through str.format().
    '''
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # format() rather than '+' so a non-string message does not raise TypeError.
    print('{} {}'.format(timestamp, message))
## Open the Control Hub session used by the rest of the script
print_message('Connecting to Control Hub')
sch = ControlHub(
    CONTROL_HUB_URL,
    username=CONTROL_HUB_USER,
    password=CONTROL_HUB_PASSWORD,
)

## Look up the Job that will be run
print_message('Getting Job with JOB_ID ' + JOB_ID)
job = sch.jobs.get(job_id=JOB_ID)

## Apply the runtime parameters and the label, then save the Job
print_message("Setting the Job's runtime parameters")
job.runtime_parameters = {
    'PARAM_1': 'aaa',
    'PARAM_2': 'bbb',
}
print_message("Setting the Job's label")
# The same LABEL is given to the SDC deployment so the Job is matched to it.
job.data_collector_labels = [LABEL]
sch.update_job(job)
## Polling helpers used by the wait steps below

def _wait_until(condition, timeout_seconds, waiting_message):
    '''Poll `condition` until it holds or the timeout is exceeded.

    `condition` is evaluated first; only when it is false is the timeout
    checked, so a condition that becomes true on the final poll is still
    reported as success.

    Args:
        condition: Zero-argument callable returning a truthy value once the
            awaited state has been reached.
        timeout_seconds: Give up once the accumulated polling delay is
            greater than this many seconds.
        waiting_message: Message logged before each sleep.

    Returns:
        True if `condition()` became truthy, False on timeout.
    '''
    waited_seconds = 0
    while not condition():
        if waited_seconds > timeout_seconds:
            return False
        print_message(waiting_message)
        sleep(POLLING_FREQUENCY_SECONDS)
        waited_seconds += POLLING_FREQUENCY_SECONDS
    return True


def _sdc_is_registered(control_hub, label):
    '''Return True if any Data Collector listed by Control Hub reports `label`.'''
    return any(label in sdc.reported_labels
               for sdc in control_hub.data_collectors)


def _job_has_status(target_job, expected_status):
    '''Refresh `target_job` and return True if its status is `expected_status`.'''
    target_job.refresh()
    return target_job.status.status == expected_status


## Start SDC Kubernetes-based Deployment
print_message('Creating Control Hub Deployment')
deployment_builder = sch.get_deployment_builder()

## Get the provisioning agent to be used to manage this deployment
provisioning_agent = sch.provisioning_agents.get(id=PROVISIONING_AGENT_ID)

## Load the SDC Deployment manifest
with open(SDC_DEPLOYMENT_MANIFEST) as f:
    deployment_spec = yaml.load(f, Loader=yaml.BaseLoader)

## Build the deployment
deployment = deployment_builder.build(name=LABEL,
                                      provisioning_agent=provisioning_agent,
                                      number_of_data_collector_instances=1,
                                      description='',
                                      data_collector_labels=[LABEL],
                                      spec=deployment_spec)

## Add the deployment to Control Hub
sch.add_deployment(deployment)

# From here on the deployment exists in Control Hub, so everything runs inside
# try/finally: a timeout (sys.exit), an SDK error or Ctrl-C must still tear the
# deployment down instead of leaving it behind.
try:
    ## Start the deployment
    print_message('Starting Control Hub Deployment')
    sch.start_deployment(deployment)

    ## Wait for the new SDC instance to register with Control Hub
    if not _wait_until(lambda: _sdc_is_registered(sch, LABEL),
                       MAX_WAIT_SECONDS_FOR_SDC_DEPLOYMENT,
                       'Waiting for SDC to register with Control Hub'):
        sys.exit('Error: Timeout waiting for SDC to register with Control Hub. ')
    print_message('SDC has registered with Control Hub')

    ## Start Job
    sch.start_job(job)
    print_message('Starting Job with Job ID: ' + JOB_ID)

    ## Wait for Job to transition to Active
    if not _wait_until(lambda: _job_has_status(job, 'ACTIVE'),
                       MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE,
                       'Waiting for Job to become ACTIVE'):
        sys.exit('Error: Timeout waiting for Job to become ACTIVE')
    print_message('Job status is ACTIVE')

    ## Wait for Job to complete or to timeout
    print_message('Waiting for Job to complete')
    if _wait_until(lambda: _job_has_status(job, 'INACTIVE'),
                   MAX_WAIT_SECONDS_FOR_JOB_TO_COMPLETE,
                   'Waiting for Job to complete'):
        print_message('Job completed successfully')
    else:
        print_message('Error: Timeout waiting for Job to complete')
        print_message('Error: Job did not complete successfully')
    print_message('Job status is ' + job.status.status)
finally:
    try:
        ## Stop the SDC Kubernetes-based Deployment
        print_message('Stopping SDC deployment')
        sch.stop_deployment(deployment)
    finally:
        # Still attempt the delete if the stop call raised.
        ## Delete the SDC Kubernetes-based Deployment
        print_message('Deleting SDC deployment')
        sch.delete_deployment(deployment)

print_message('Done')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment