Kafka consumer/producer class in Python
from confluent_kafka import Consumer, Producer, KafkaError, KafkaException
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient


class Kafka:
    def __init__(self, bootstrap_server, topic, timeout=60.0):
        self.__bootstrap_server = bootstrap_server
        self.__topic = topic
        self.__timeout = timeout
        self.__auth_conf = {}

    def set_auth(self, auth_conf):
        self.__auth_conf = auth_conf

    @staticmethod
    def __on_commit(err, partitions):
        if err:
            print(str(err))
        else:
            print(f"Message committed to Topic = {partitions[0].topic}, Partition = {partitions[0].partition}, Offset = {partitions[0].offset}")

    @staticmethod
    def __callback(err, msg):
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to Topic = {msg.topic()}, Key = {msg.key()}, Partition = {msg.partition()}, Offset = {msg.offset()}')

    def __generate_consumer_properties(self, group_id, conf, key_deserializer, value_deserializer, on_commit):
        default_conf = {'bootstrap.servers': self.__bootstrap_server,
                        'group.id': group_id}
        default_conf.update(self.__auth_conf)
        if not conf:
            # No user config: start from defaults with manual commits
            conf = default_conf
            conf['enable.auto.commit'] = False
            conf['auto.offset.reset'] = 'earliest'
        else:
            # User config is merged over bootstrap/auth and wins on conflicts
            conf = {**default_conf, **conf}
        if value_deserializer is None:
            value_deserializer = lambda x: x
        if key_deserializer is None:
            key_deserializer = lambda x: x
        if on_commit is None:
            on_commit = self.__on_commit
        conf['on_commit'] = on_commit
        return conf, key_deserializer, value_deserializer

    def __generate_producer_properties(self, conf, key_serializer, value_serializer, callback):
        default_conf = {'bootstrap.servers': self.__bootstrap_server}
        default_conf.update(self.__auth_conf)
        if not conf:
            # No user config: idempotent defaults tuned for batching
            conf = default_conf
            conf['acks'] = 'all'
            conf['retries'] = 5
            conf['linger.ms'] = 100
            conf['batch.num.messages'] = 1_000
            conf['batch.size'] = 1024 * 64  # 64 KB
            conf['compression.codec'] = 'snappy'
            conf['enable.idempotence'] = True
            conf['delivery.timeout.ms'] = 1000 * 120  # 2 minutes
            conf['max.in.flight.requests.per.connection'] = 5
        else:
            # User config is merged over bootstrap/auth and wins on conflicts
            conf = {**default_conf, **conf}
        if value_serializer is None:
            value_serializer = lambda x: x
        if key_serializer is None:
            key_serializer = lambda x: x
        if callback is None:
            callback = self.__callback
        return conf, key_serializer, value_serializer, callback

    def get_schema_registry_client(self, conf):
        return SchemaRegistryClient(conf)

    def get_schema(self, schema_registry_conf, subject):
        client = self.get_schema_registry_client(schema_registry_conf)
        return client.get_latest_version(subject).schema.schema_str

    def get_avro_serializer(self, schema_registry_conf, subject, field):
        client = self.get_schema_registry_client(schema_registry_conf)
        schema = self.get_schema(schema_registry_conf, subject)
        avro_conf = {'auto.register.schemas': False,
                     'use.latest.version': True}
        avro_ser = AvroSerializer(schema_registry_client=client,
                                  schema_str=schema,
                                  conf=avro_conf)
        if field == 'KEY':
            return lambda x: avro_ser(x, SerializationContext(self.__topic, MessageField.KEY))
        elif field == 'VALUE':
            return lambda x: avro_ser(x, SerializationContext(self.__topic, MessageField.VALUE))
        else:
            raise ValueError(f'Invalid field {field}')

    def get_avro_deserializer(self, schema_registry_conf, subject, field):
        client = self.get_schema_registry_client(schema_registry_conf)
        schema = self.get_schema(schema_registry_conf, subject)
        avro_deser = AvroDeserializer(schema_registry_client=client,
                                      schema_str=schema)
        if field == 'KEY':
            return lambda x: avro_deser(x, SerializationContext(self.__topic, MessageField.KEY))
        elif field == 'VALUE':
            return lambda x: avro_deser(x, SerializationContext(self.__topic, MessageField.VALUE))
        else:
            raise ValueError(f'Invalid field {field}')

    def consume(self, group_id, conf=None, key_deserializer=None, value_deserializer=None, on_commit=None):
        conf, key_deserializer, value_deserializer = self.__generate_consumer_properties(group_id,
                                                                                         conf,
                                                                                         key_deserializer,
                                                                                         value_deserializer,
                                                                                         on_commit)
        consumer = Consumer(conf)
        try:
            consumer.subscribe([self.__topic])
            while True:
                msg = consumer.poll(timeout=self.__timeout)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event: keep polling instead of yielding the error message
                        print(f'{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n')
                        continue
                    raise KafkaException(msg.error())
                yield {'Topic': msg.topic(),
                       'Value': value_deserializer(msg.value()),
                       'Key': key_deserializer(msg.key()),
                       'Offset': msg.offset(),
                       'Partition': msg.partition()}
                consumer.commit(msg, asynchronous=True)
        finally:
            consumer.close()

    def produce(self, data, conf=None, key_serializer=None, value_serializer=None, callback=None):
        conf, key_serializer, value_serializer, callback = self.__generate_producer_properties(conf,
                                                                                               key_serializer,
                                                                                               value_serializer,
                                                                                               callback)
        producer = Producer(conf)
        try:
            for key, value in data:
                print(f"Sending message to Topic = {self.__topic}, Key = {key}")
                producer.produce(topic=self.__topic,
                                 key=key_serializer(key),
                                 value=value_serializer(value),
                                 callback=callback)
                producer.poll(0)  # serve delivery callbacks without blocking
        finally:
            producer.flush(timeout=self.__timeout)

# ############ producer example #############
# from kafka import Kafka
# import json
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# key_serializer = lambda x: x.encode('utf-8')
# value_serializer = lambda x: json.dumps(x).encode('utf-8')
# data = (('key1', {'Value': 'value1'}), ('key2', {'Value': 'value2'}), ('key3', {'Value': 'value3'}))
#
# k = Kafka(bootstrap_servers, topic)
# k.produce(data, key_serializer=key_serializer, value_serializer=value_serializer)
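# ############ producer config override example #############
# A minimal sketch (illustrative values only) of passing a custom `conf`
# dict to produce(). Note that a non-empty conf skips the class's tuned
# defaults: only bootstrap.servers and any auth settings are merged in,
# and your keys win on conflict.
#
# from kafka import Kafka
# import json
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# value_serializer = lambda x: json.dumps(x).encode('utf-8')
# data = (('key1', {'Value': 'value1'}),)
#
# conf = {'linger.ms': 0,             # send immediately instead of batching
#         'compression.codec': 'lz4',
#         'acks': 'all'}
#
# k = Kafka(bootstrap_servers, topic)
# k.produce(data, conf=conf, value_serializer=value_serializer)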
# ############ consumer example #############
# from kafka import Kafka
# import json
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# group_id = 'my-group'
# value_deserializer = lambda x: json.loads(x.decode('utf-8'))
# key_deserializer = lambda x: x.decode('utf-8')
#
# k = Kafka(bootstrap_servers, topic)
# cur = k.consume(group_id, key_deserializer=key_deserializer, value_deserializer=value_deserializer)
# for msg in cur:
#     print(msg)
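# ############ custom commit callback example #############
# A minimal sketch of supplying your own on_commit hook (log_commit is a
# hypothetical name); consume() wires it into the consumer's 'on_commit'
# config property, so it fires when each asynchronous commit completes.
#
# from kafka import Kafka
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# group_id = 'my-group'
#
# def log_commit(err, partitions):
#     if err:
#         print(f'Commit failed: {err}')
#     else:
#         print(f'Committed offsets: {[p.offset for p in partitions]}')
#
# k = Kafka(bootstrap_servers, topic)
# for msg in k.consume(group_id, on_commit=log_commit):
#     print(msg)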
# ############ avro producer example #############
# from kafka import Kafka
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
#
# k = Kafka(bootstrap_servers, topic)
#
# schema_registry_conf = {'url': 'http://localhost:8081', 'basic.auth.credentials.source': 'USER_INFO', 'basic.auth.user.info': 'user:password'}
# key_serializer = k.get_avro_serializer(schema_registry_conf, 'first_topic-key', 'KEY')
# value_serializer = k.get_avro_serializer(schema_registry_conf, 'first_topic-value', 'VALUE')
# data = (('key1', {'Value': 'value1'}), ('key2', {'Value': 'value2'}), ('key3', {'Value': 'value3'}))
#
# k.produce(data, key_serializer=key_serializer, value_serializer=value_serializer)
# ############ avro consumer example #############
# from kafka import Kafka
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# group_id = 'my-group'
#
# k = Kafka(bootstrap_servers, topic)
#
# schema_registry_conf = {'url': 'http://localhost:8081', 'basic.auth.credentials.source': 'USER_INFO', 'basic.auth.user.info': 'user:password'}
# key_deserializer = k.get_avro_deserializer(schema_registry_conf, 'first_topic-key', 'KEY')
# value_deserializer = k.get_avro_deserializer(schema_registry_conf, 'first_topic-value', 'VALUE')
#
# cur = k.consume(group_id, key_deserializer=key_deserializer, value_deserializer=value_deserializer)
# for msg in cur:
#     print(msg)
# ############ SASL SSL auth example #############
# from kafka import Kafka
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# k = Kafka(bootstrap_servers, topic)
#
# data = (('key1', {'Value': 'value1'}), ('key2', {'Value': 'value2'}), ('key3', {'Value': 'value3'}))
#
# auth_config = {'sasl.mechanism': 'PLAIN',
#                'security.protocol': 'SASL_SSL',
#                'sasl.username': 'user',
#                'sasl.password': 'password'}
# k.set_auth(auth_config)
#
# k.produce(data)
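# ############ schema registry lookup example #############
# A minimal sketch of fetching the latest registered schema string for a
# subject with get_schema() (the registry URL is a placeholder).
#
# from kafka import Kafka
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# k = Kafka(bootstrap_servers, topic)
#
# schema_registry_conf = {'url': 'http://localhost:8081'}
# print(k.get_schema(schema_registry_conf, 'first_topic-value'))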