-
-
Save xshapira/21d57ac3082e9ec68c6e9f428d3feceb to your computer and use it in GitHub Desktop.
Kafka consumer/producer class in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka import Consumer, Producer, KafkaError, KafkaException | |
class Kafka:
    """Thin wrapper around confluent_kafka for a single topic.

    Provides a generator-based consumer (``consume``) with manual async
    commits and a batch producer (``produce``) with sensible reliability
    defaults (idempotence, retries, acks=all).
    """

    def __init__(self, bootstrap_server, topic, timeout=1.0):
        """
        Args:
            bootstrap_server: Broker address(es), e.g. ``'localhost:9092'``.
            topic: Topic name used for both consuming and producing.
            timeout: Poll timeout in seconds for consumer and producer polls.
        """
        self.__bootstrap_server = bootstrap_server
        self.__topic = topic
        self.__timeout = timeout

    @staticmethod
    def __on_commit(err, partitions):
        """Default commit callback: print commit success or failure."""
        if err:
            print(str(err))
        else:
            # Fixed typo: "Messaged committed" -> "Message committed".
            print(f"Message committed to Topic = {partitions[0].topic}, "
                  f"Partition = {partitions[0].partition}, Offset = {partitions[0].offset}")

    @staticmethod
    def __callback(err, msg):
        """Default delivery-report callback: print delivery result."""
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to Topic = {msg.topic()}, Key = {msg.key()}, '
                  f'Partition = {msg.partition()}, Offset = {msg.offset()}')

    def __generate_consumer_properties(self, group_id, conf, key_deserializer,
                                       value_deserializer, on_commit):
        """Build the consumer config, filling in defaults for anything unset.

        Returns:
            Tuple of (conf dict, key_deserializer, value_deserializer).
        """
        default_conf = {
            'bootstrap.servers': self.__bootstrap_server,
            'group.id': group_id,
        }
        if not conf:
            # No user config: use defaults with manual commits from the start.
            conf = default_conf
            conf['enable.auto.commit'] = False
            conf['auto.offset.reset'] = 'earliest'
        else:
            # User-supplied keys override the defaults.
            conf = {**default_conf, **conf}
        if value_deserializer is None:
            value_deserializer = lambda x: x  # identity: pass raw bytes through
        if key_deserializer is None:
            key_deserializer = lambda x: x  # identity: pass raw bytes through
        if on_commit is None:
            on_commit = self.__on_commit
        conf['on_commit'] = on_commit
        return conf, key_deserializer, value_deserializer

    def __generate_producer_properties(self, conf, key_serializer,
                                       value_serializer, callback):
        """Build the producer config, filling in reliability defaults.

        Returns:
            Tuple of (conf dict, key_serializer, value_serializer, callback).
        """
        default_conf = {'bootstrap.servers': self.__bootstrap_server}
        if not conf:
            conf = default_conf
            conf['acks'] = 'all'
            conf['retries'] = 5
            conf['linger.ms'] = 20
            conf['batch.num.messages'] = 1000
            conf['batch.size'] = 1024 * 64  # 64KB
            conf['compression.codec'] = 'snappy'
            conf['enable.idempotence'] = True
            conf['delivery.timeout.ms'] = 120000  # 2 minutes
            conf['max.in.flight.requests.per.connection'] = 5
        else:
            # User-supplied keys override the defaults.
            conf = {**default_conf, **conf}
        if value_serializer is None:
            value_serializer = lambda x: x  # identity: pass values through unchanged
        if key_serializer is None:
            key_serializer = lambda x: x  # identity: pass keys through unchanged
        if callback is None:
            callback = self.__callback
        return conf, key_serializer, value_serializer, callback

    def consume(self, group_id, conf=None, key_deserializer=None,
                value_deserializer=None, on_commit=None):
        """Yield messages from the topic forever, committing asynchronously.

        Args:
            group_id: Consumer group id.
            conf: Optional extra/override config dict (was a mutable ``{}``
                default; ``None`` is backward compatible since empty dicts
                are falsy too).
            key_deserializer / value_deserializer: Callables applied to the
                raw key/value bytes; identity by default.
            on_commit: Commit callback; a printing default is used if None.

        Yields:
            Dict with 'Topic', 'Value', 'Key', 'Offset', 'Partition'.

        Raises:
            KafkaException: On any message error other than partition EOF.
        """
        conf, key_deserializer, value_deserializer = self.__generate_consumer_properties(
            group_id, conf, key_deserializer, value_deserializer, on_commit)
        consumer = Consumer(**conf)
        try:
            consumer.subscribe([self.__topic])
            while True:
                msg = consumer.poll(timeout=self.__timeout)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event -- informational only.
                        print(f'{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n')
                        # BUG FIX: skip the EOF marker; previously it fell
                        # through and was yielded (value is None) and committed.
                        continue
                    raise KafkaException(msg.error())
                yield {'Topic': msg.topic(),
                       'Value': value_deserializer(msg.value()),
                       'Key': key_deserializer(msg.key()),
                       'Offset': msg.offset(),
                       'Partition': msg.partition()}
                consumer.commit(msg, asynchronous=True)
        finally:
            consumer.close()

    def produce(self, data, conf=None, key_serializer=None,
                value_serializer=None, callback=None):
        """Send (key, value) pairs to the topic, flushing before return.

        Args:
            data: Iterable of (key, value) pairs.
            conf: Optional extra/override config dict (``None`` replaces the
                mutable ``{}`` default; behavior is unchanged).
            key_serializer / value_serializer: Callables applied to each
                key/value before sending; identity by default.
            callback: Per-message delivery callback; printing default if None.
        """
        conf, key_serializer, value_serializer, callback = self.__generate_producer_properties(
            conf, key_serializer, value_serializer, callback)
        producer = Producer(**conf)
        try:
            for key, value in data:
                print(f"Sending message to Topic = {self.__topic}, Key = {key}")
                producer.produce(topic=self.__topic,
                                 key=key_serializer(key),
                                 value=value_serializer(value),
                                 callback=callback)
                # Serve delivery callbacks as we go.
                producer.poll(timeout=self.__timeout)
        finally:
            # Block until all outstanding messages are delivered or failed.
            producer.flush()
############# producer example ############# | |
# from kafka import Kafka | |
# import json | |
# | |
# bootstrap_servers = 'localhost:9092' | |
# topic = 'first_topic' | |
# key_serializer = lambda x: x.encode('utf-8') | |
# value_serializer = lambda x: json.dumps(x).encode('utf-8') | |
# data = (('key1', {'Value': 'value1'}), ('key2', {'Value': 'value2'}), ('key3', {'Value': 'value3'})) | |
# | |
# k = Kafka(bootstrap_servers, topic) | |
# k.produce(data, key_serializer=key_serializer, value_serializer=value_serializer) | |
############# consumer example ############# | |
# from kafka import Kafka | |
# import json | |
# | |
# | |
# bootstrap_servers = 'localhost:9092' | |
# topic = 'first_topic' | |
# group_id = 'my-group' | |
# value_deserializer = lambda x: json.loads(x.decode('utf-8')) | |
# key_deserializer = lambda x: x.decode('utf-8') | |
# | |
# k = Kafka(bootstrap_servers, topic) | |
# cur = k.consume(group_id, key_deserializer=key_deserializer, value_deserializer=value_deserializer) | |
# for msg in cur: | |
# print(msg) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment