Skip to content

Instantly share code, notes, and snippets.

@rkpattnaik780
Last active January 18, 2023 12:45
Show Gist options
  • Save rkpattnaik780/07487e9edd99917c0c3fd79b291f6946 to your computer and use it in GitHub Desktop.
Save rkpattnaik780/07487e9edd99917c0c3fd79b291f6946 to your computer and use it in GitHub Desktop.
Code snippets for confluent-python using SASL/OAuthBearer
import time
import requests
def _get_token(config):
payload = {"grant_type": "client_credentials", "scope": "api.iam.service_accounts"}
resp = requests.post(
RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL,
auth=(
RHOAS_SERVICE_ACCOUNT_CLIENT_ID,
RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET,
),
data=payload,
)
token = resp.json()
return token["access_token"], time.time() + float(token["expires_in"])
common_config = {
'bootstrap.servers': KAFKA_HOST,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
'oauth_cb': _get_token,
}
from confluent_kafka import Consumer
consumer_config = {
"group.id": "test-group",
'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest',
}
consumer = Consumer({ **consumer_config, **common_config })
consumer.subscribe([topic])
while True:
try:
msg = consumer.poll(1.0)
if msg is None:
continue
print(msg.value())
except KeyboardInterrupt:
break
consumer.close()
from confluent_kafka import Producer
producer = Producer(common_config)
producer.produce(topic=topic, value=b"Sample Message")
producer.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment