Last active
January 18, 2023 12:45
-
-
Save rkpattnaik780/07487e9edd99917c0c3fd79b291f6946 to your computer and use it in GitHub Desktop.
Code snippets for confluent-kafka-python using SASL/OAUTHBEARER
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import requests | |
def _get_token(config):
    """Fetch an OAuth access token for use as the client's ``oauth_cb``.

    Performs a client-credentials grant against
    ``RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL``, authenticating with the
    service account's client id and secret via HTTP basic auth.

    Args:
        config: Value passed in by whoever invokes the callback; it is not
            used by this implementation.

    Returns:
        A ``(access_token, expiry)`` tuple, where ``expiry`` is an absolute
        Unix timestamp in seconds (request time plus ``expires_in``).

    Raises:
        requests.HTTPError: If the token endpoint replies with a 4xx/5xx
            status.
        requests.RequestException: On connection failures or timeouts.
    """
    payload = {"grant_type": "client_credentials", "scope": "api.iam.service_accounts"}
    # Take the timestamp before the round-trip so the computed expiry errs on
    # the early side instead of outliving the real token.
    requested_at = time.time()
    resp = requests.post(
        RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL,
        auth=(
            RHOAS_SERVICE_ACCOUNT_CLIENT_ID,
            RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET,
        ),
        data=payload,
        # Without a timeout a stalled endpoint would block the caller forever.
        timeout=10,
    )
    # Fail loudly on auth/server errors rather than with a KeyError below.
    resp.raise_for_status()
    token = resp.json()
    return token["access_token"], requested_at + float(token["expires_in"])
# Connection settings reused by both the consumer and the producer snippets.
common_config = {
    "bootstrap.servers": KAFKA_HOST,
    # Authenticate with SASL over a TLS-encrypted connection.
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "OAUTHBEARER",
    # Callable that supplies the (token, expiry) pair for OAUTHBEARER.
    "oauth_cb": _get_token,
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka import Consumer | |
# Consumer-only settings; merged with the shared connection settings below.
consumer_config = {
    "group.id": "test-group",
    "session.timeout.ms": 6000,
    "auto.offset.reset": "earliest",
}

consumer = Consumer({**consumer_config, **common_config})

try:
    consumer.subscribe([topic])
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the 1 second poll window.
            continue
        if msg.error():
            # Error events are delivered through poll() as messages; report
            # them instead of printing them as if they were payloads.
            print(f"Consumer error: {msg.error()}")
            continue
        print(msg.value())
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop this sample loop.
    pass
finally:
    # Always close the consumer, even when an unexpected exception escapes.
    consumer.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka import Producer | |
def _delivery_report(err, msg):
    """Report the final delivery outcome of a produced message.

    Args:
        err: Delivery error, or ``None`` when the message was delivered.
        msg: The message the report refers to.
    """
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


producer = Producer(common_config)
# produce() only enqueues the message; the callback is what surfaces a failed
# delivery, which would otherwise go unnoticed.
producer.produce(topic=topic, value=b"Sample Message", on_delivery=_delivery_report)
# Block until outstanding messages are delivered and their callbacks have run.
producer.flush()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment