Last active
March 22, 2025 01:35
-
-
Save onefoursix/21453cbabd47933b1a1993263e3a083f to your computer and use it in GitHub Desktop.
An example Python script that creates a StreamSets Kafka Connection using SASL
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
FILE: create_kafka_sasl_connection.py

DESCRIPTION: This script provides an example of how to create a
STREAMSETS_KAFKA Connection that uses SASL, using the StreamSets SDK.

USAGE: $ python3 create_kafka_sasl_connection.py

PREREQUISITES:

- Set your connection properties and engine_url in the variables at the top of the script.

- Set the group_name_list to a list of groups you want to share the Connection with. Members
  of the groups will be able to use the Connection but will not be able to view the
  Connection details.

- Python 3.9+

- StreamSets Platform SDK for Python v6.5+
  See: https://docs.streamsets.com/platform-sdk/latest/welcome/installation.html

- StreamSets Platform API Credentials for a user with the Organization Administrator role.

- Before running the script, export the environment variables CRED_ID and CRED_TOKEN
  with the StreamSets Platform API Credentials, like this:

    $ export CRED_ID="40af8..."
    $ export CRED_TOKEN="eyJ0..."
"""
import os
import sys

from streamsets.sdk import ControlHub
# ---------------------------------------------------------------------------
# User-editable settings. Everything below is read later in the script when
# the Connection is built, configured and shared.
# ---------------------------------------------------------------------------

# Connection properties
connection_name = 'Confluent Kafka (SDK)'
connection_type = 'STREAMSETS_KAFKA'
connection_tags = ['tag1', 'tag2']

# STREAMSETS_KAFKA specific Connection properties
security_config_security_option = 'CUSTOM'
metadata_broker_list = '<redacted>>:9092'

# Custom Security Properties
security_protocol = 'SASL_SSL'
sasl_mechanism = 'PLAIN'
# NOTE(review): the JAAS config embeds the Kafka username and password in
# plain text; consider loading them from the environment instead of
# committing real values to source control.
sasl_jaas_config = "org.apache.kafka.common.security.plain.PlainLoginModule required username='<redacted>' password='<redacted>';"
client_id = '<redacted>>'

# List of Groups to share the Connection with
group_name_list = ['developers', 'ops']

# Engine URL
engine_url = 'http://localhost:18630'
# Get Control Hub Credentials from the environment
cred_id = os.getenv('CRED_ID')
cred_token = os.getenv('CRED_TOKEN')

# Fail fast, naming the exact variable(s), when a credential is unset or
# empty; os.getenv returns None for an unset variable, which would otherwise
# be handed straight to ControlHub.
missing_vars = [
    name
    for name, value in (('CRED_ID', cred_id), ('CRED_TOKEN', cred_token))
    if not value
]
if missing_vars:
    print('Error: missing environment variable(s): ' + ', '.join(missing_vars))
    sys.exit(1)

# Connect to Control Hub. Any failure here is reported and ends the script
# with exit status 1, so `sch` is always bound for the code that follows.
try:
    sch = ControlHub(credential_id=cred_id, token=cred_token)
except Exception as e:
    print('Error connecting to Control Hub; check your CRED_ID and CRED_TOKEN environment variables')
    print(str(e))
    sys.exit(1)
# Retrieve the Data Collector engine to be used as the authoring engine
engine = sch.engines.get(engine_url=engine_url)

# Get a Connection Builder
connection_builder = sch.get_connection_builder()

# Build the Connection instance
connection = connection_builder.build(
    title=connection_name,
    connection_type=connection_type,
    authoring_data_collector=engine,
    tags=connection_tags,
)

# Custom security properties, expressed as a list of key/value pair dicts
custom_security_props = [
    {'key': 'security.protocol', 'value': security_protocol},
    {'key': 'sasl.mechanism', 'value': sasl_mechanism},
    {'key': 'sasl.jaas.config', 'value': sasl_jaas_config},
    {'key': 'client.id', 'value': client_id},
]

# Properties specific to this Connection type, including the custom
# security properties assembled above
props = {
    'securityConfig.securityOption': security_config_security_option,
    'metadataBrokerList': metadata_broker_list,
    'securityConfig.customSecurityProperties': custom_security_props,
}

# Merge the props into the Connection's configuration
connection.connection_definition.configuration.update(props)
# Add the Connection to SCH
sch.add_connection(connection)

# Verify the Connection and print the verification result
result = sch.verify_connection(connection)
print(result)
# Share the Connection
acl = connection.acl
actions = ['READ']  # Read permissions allow users to use the connection but not see its details

# Look up each group by display name and grant it the actions above
for group_name in group_name_list:
    group = sch.groups.get(display_name=group_name)
    group_permission = acl.permission_builder.build(
        subject_id=group.group_id,
        subject_type='GROUP',
        actions=actions,
    )
    acl.add_permission(group_permission)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment