Last active
February 10, 2023 15:25
-
-
Save leoGalani/322ef44ad062d5df1b8350e420263503 to your computer and use it in GitHub Desktop.
kafka-confluent-python implementation example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import certifi | |
from dynaconf import settings | |
from confluent_kafka.avro import AvroProducer | |
from confluent_kafka import Consumer | |
# Shared librdkafka configuration used by both the consumer and the Avro
# producer. Broker address and SASL credentials come from dynaconf settings;
# the CA bundle comes from certifi so TLS verification does not depend on
# system certificates being present.
BASE_CONFIG = {
    'bootstrap.servers': settings.KAFKA_PRIMARY_BOOTSTRAP_SERVERS,
    'group.id': 'integrated-tests',
    'client.id': 'integrated-tests',
    'security.protocol': settings.KAFKA_PRIMARY_SECURITY_PROTOCOL,
    # Falls back to 'PLAIN' when the setting is absent.
    'sasl.mechanisms': settings.get('KAFKA_PRIMARY_SASL_MECHANISM', 'PLAIN'),
    'sasl.username': settings.KAFKA_PRIMARY_KEY,
    'sasl.password': settings.KAFKA_PRIMARY_SECRET,
    'ssl.endpoint.identification.algorithm': 'https',
    'ssl.ca.location': certifi.where(),
    # Read from the beginning of the topic when no committed offset exists.
    'auto.offset.reset': 'earliest',
}

# Extra configuration needed to reach the Confluent schema registry
# (only required by the Avro producer, not the plain consumer).
AVRO_SCHEMA_CONFIG = {
    'schema.registry.url': settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_ENDPOINT,
    'schema.registry.basic.auth.credentials.source': 'USER_INFO',
    'schema.registry.basic.auth.user.info': '{}:{}'.format(
        settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_KEY,
        settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_SECRET
    )
}

# Producer configuration = base broker config + schema registry access.
PRODUCER_SCHEMA_CONFIG = {**BASE_CONFIG, **AVRO_SCHEMA_CONFIG}
def setup_consumer():
    """Build a Kafka ``Consumer`` from the shared base configuration."""
    consumer = Consumer(BASE_CONFIG)
    return consumer
def setup_producer(schema_value):
    """
    Build an ``AvroProducer`` bound to the given value schema.

    Works on a copy of the shared producer config so the delivery
    callback added here never leaks into the module-level dict.
    """
    config = dict(PRODUCER_SCHEMA_CONFIG)
    config['on_delivery'] = delivery_check
    return AvroProducer(config, default_value_schema=schema_value)
def delivery_check(err, msg):
    """
    Delivery callback: report failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.

    Raises:
        ValueError: When the delivery failed (``err`` is not None).
    """
    if err is None:
        return
    raise ValueError("Failed to execute command: {}".format(err))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Module for consuming messages on a kafka cluster + kafka schema register. | |
Since the AvroConsumer from the confluent lib does not work out o the box, | |
its required to do the decoding manually. | |
Luckly someone already pass throught this pain and did a blog post about this. | |
https://mlnotetaking.com/post/fixing-kafka-string-key-and-avro-value-python | |
""" | |
import time | |
import logging | |
import avro.schema | |
from config import setup_consumer | |
from utils import unpack, fetch_schema, msg_sanity_check | |
LOGGER = logging.getLogger() | |
LOGGER.setLevel(logging.DEBUG) | |
def event_polling(topic, key_value, msg_value, qty_msgs):
    """
    Poll the kafka stream for up to 20 seconds and collect decoded
    messages whose ``key_value`` field equals ``msg_value``.

    Args:
        topic (str): Kafka topic to subscribe to.
        key_value (str): Key inside the decoded message to match on.
        msg_value: Expected value for ``key_value``.
        qty_msgs (int): Number of matching messages to wait for.

    Returns:
        list: The ``qty_msgs`` matching decoded messages.

    Raises:
        Exception: If fewer than ``qty_msgs`` matching messages arrive
            before the 20-second deadline expires.
    """
    consumer = setup_consumer()
    schema_str_value = fetch_schema(topic)
    value_schema = avro.schema.Parse(schema_str_value)
    consumer.subscribe([topic])
    msgs = []
    deadline = time.time() + 20
    try:
        while time.time() <= deadline:
            raw_msg = consumer.poll(1)
            if not msg_sanity_check(raw_msg):
                continue
            msg = unpack(raw_msg.value(), value_schema)
            if msg[key_value] == msg_value:
                msgs.append(msg)
            if len(msgs) == qty_msgs:
                break
        if len(msgs) < qty_msgs:
            raise Exception('Less events than expected')
    finally:
        # Always release the consumer -- the original leaked it on the
        # failure path because the raise happened before close().
        consumer.close()
    return msgs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import avro.schema | |
from utils import fetch_schema | |
from config import setup_producer | |
def command_producer(topic, msg):
    """
    Send ``msg`` to ``topic`` using an ``AvroProducer``.

    Fetches the topic's value schema from the registry, builds a producer
    configured with that schema, produces the message and blocks until
    delivery completes.
    """
    raw_schema = fetch_schema(topic)
    parsed_schema = avro.schema.Parse(raw_schema)
    avro_producer = setup_producer(parsed_schema)
    avro_producer.produce(topic=topic, value=msg)
    avro_producer.flush()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
import logging

import requests
from avro.io import BinaryDecoder, DatumReader
LOGGER = logging.getLogger() | |
LOGGER.setLevel(logging.DEBUG) | |
def fetch_schema(subject):
    """
    Fetch the latest Avro value schema for ``subject`` from the Confluent
    schema registry.

    Confluent registers a topic's value schema under ``<topic>-value``,
    hence the suffix appended below.

    NOTE(review): the endpoint and credentials here are literal
    placeholder strings ("KAFKA_PRIMARY_...") that look like redacted
    config values -- confirm they should be read from settings like the
    rest of the project.

    Args:
        subject (str): Topic name (without the '-value' suffix).

    Returns:
        str: The schema definition returned by the registry.

    Raises:
        requests.HTTPError: If the registry responds with an error status.
        requests.Timeout: If the registry does not answer within 10s.
    """
    subject = subject + '-value'
    url = "{0}/subjects/{1}/versions/latest/schema".format(
        "KAFKA_PRIMARY_SCHEMA_REGISTRY_ENDPOINT",
        subject
    )
    response = requests.get(
        url=url,
        auth=(
            "KAFKA_PRIMARY_SCHEMA_REGISTRY_KEY",
            "KAFKA_PRIMARY_SCHEMA_REGISTRY_SECRET"
        ),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        # requests has no default timeout; without one a registry outage
        # hangs the test run forever.
        timeout=10,
    )
    # Fail loudly here instead of returning an error payload that would
    # only blow up later inside avro schema parsing.
    response.raise_for_status()
    return response.text
def msg_sanity_check(msg):
    """
    Return True when ``msg`` is a usable Kafka message.

    The original wrapped these checks in ``try/finally`` with
    ``return True`` in the ``finally`` block; a return in ``finally``
    overrides any return from the ``try`` body (and swallows exceptions),
    so the function unconditionally returned True. The checks below
    actually take effect.

    Args:
        msg (Message or None): Value returned by ``Consumer.poll``.

    Returns:
        bool: False for ``None`` or error-carrying messages, True otherwise.
    """
    if msg is None:
        return False
    if msg.error() is not None:
        LOGGER.warning("Msg error: %s", msg.error())
        return False
    return True
def unpack(string, schema, max_header=11):
    """
    Decode a Confluent-framed Avro message value.

    Messages serialized through the Confluent stack carry a short binary
    header before the Avro payload; its exact length varies, so decoding
    is attempted from successive start offsets until the reader accepts
    one (workaround described at
    https://mlnotetaking.com/post/fixing-kafka-string-key-and-avro-value-python).

    Args:
        string (bytes): Raw message value from the consumer.
        schema (avro.schema.Schema): Parsed value schema.
        max_header (int): Number of leading byte offsets to try; defaults
            to 11, matching the previously hard-coded bound.

    Returns:
        The decoded message (typically a dict).

    Raises:
        Exception: If no offset yields a decodable payload.
    """
    reader = DatumReader(schema)
    for position in range(max_header):
        try:
            decoder = BinaryDecoder(io.BytesIO(string[position:]))
            decoded_msg = reader.read(decoder)
            return decoded_msg
        except AssertionError:
            # Wrong offset -- the reader rejected the payload; try the next.
            continue
    raise Exception('Msg cannot be decoded. MSG: {}.'.format(string))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment