pip install confluent-kafka
It's important to declare the
producer
and consumer
objects in global scope for better performance.
These objects maintain the connections to the Kafka brokers in background threads.
File: kafka_connectors.py
import socket
from confluent_kafka import Producer, Consumer
# Both clients talk to the same cluster; keep the broker list in one place.
_BOOTSTRAP_SERVERS = 'host1:9092,host2:9092'

# Per-client configuration, exposed as a single dict so callers can inspect it.
kafka_config = {
    'producer': {
        'bootstrap.servers': _BOOTSTRAP_SERVERS,
        'client.id': socket.gethostname(),
    },
    'consumer': {
        'bootstrap.servers': _BOOTSTRAP_SERVERS,
        'group.id': 'foo',
        'auto.offset.reset': 'smallest',
    },
}

# Module-level clients: created once at import time so every importer shares
# the same objects (and their background connection threads).
producer = Producer(kafka_config['producer'])
consumer = Consumer(kafka_config['consumer'])
from kafka_connectors import producer
def acked(err, msg):
    """Delivery callback: report whether a produced message reached the broker.

    Invoked by the producer's poll()/flush() with err=None on success,
    or a KafkaError describing the failure.
    """
    if err is None:
        print("Message produced: %s" % (str(msg)))
    else:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
# sending 100 events in async mode
# Fix: `topic` was never defined in this script (NameError at the first
# produce() call); name it explicitly, matching the sync example.
topic = "topic"
for i in range(100):
    producer.produce(topic, key="key", value=f"value-{i}", callback=acked)
    # Wait up to 1 second for events. Callbacks will be invoked during
    # this method call if the message is acknowledged.
    producer.poll(1)
from kafka_connectors import producer
# sending 100 events in sync mode
for event_number in range(100):
    producer.produce(topic="topic", key="key", value=f"value-{event_number}", callback=acked)
# Block until every queued message has been delivered (callbacks fire here).
producer.flush()
from confluent_kafka import KafkaException
from kafka_connectors import consumer
# Loop flag read by consume_loop_sync(); cleared by shutdown() to stop consuming.
running = True
# Upper bound on messages requested from the broker per fetch.
max_messages_by_poll = 5
def do_something(msg):
    """Handle one consumed Kafka message: raise on transport errors, else log it.

    Raises:
        KafkaException: if the message carries a broker/transport error.
    """
    # exit if error
    if msg.error():
        raise KafkaException(msg.error())
    # display received message
    topic = msg.topic()
    # Kafka keys and values are optional: key() is None for unkeyed messages,
    # value() is None for tombstone records — guard before decoding.
    msg_key = msg.key().decode('utf-8') if msg.key() is not None else None
    msg_value = msg.value().decode('utf-8') if msg.value() is not None else None
    print(f'Received message: {msg_key}: {msg_value} from topic: {topic}')
def consume_loop_sync(consumer, topics):
    """Consume `topics` in a loop, handling each message synchronously.

    Runs until the module-level `running` flag is cleared (see shutdown()).
    The consumer is always closed on exit so final offsets get committed.
    """
    try:
        consumer.subscribe(topics)
        global running
        while running:
            # Fix: Consumer.poll() accepts only a timeout and returns a single
            # Message (or None on timeout); the batch API is Consumer.consume(),
            # which returns a (possibly empty) list.
            received_messages = consumer.consume(num_messages=max_messages_by_poll, timeout=1.0)
            # for each message received, do something
            for msg in received_messages:
                do_something(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
def shutdown():
    """Ask the consume loop to exit after its current iteration."""
    global running
    running = False
    print('shuting down consumer...')
if __name__ == '__main__':
    try:
        # Blocks until `running` is cleared; `consumer` comes from kafka_connectors.
        consume_loop_sync(consumer=consumer, topics=["topic"])
    except KeyboardInterrupt:
        # NOTE(review): by the time Ctrl-C propagates here the loop has already
        # exited and closed the consumer; shutdown() just records the stop.
        shutdown()
from threading import Thread
from confluent_kafka import KafkaException
from kafka_connectors import consumer
# Loop flag read by consume_loop_async(); cleared by shutdown() to stop consuming.
running = True
# Batch size per fetch == number of worker threads started per batch.
max_threads_in_parallel = 5
def do_something(msg, batch_number):
    """Handle one consumed Kafka message within a batch: raise on transport
    errors, else log it tagged with its batch number.

    Raises:
        KafkaException: if the message carries a broker/transport error.
    """
    # exit if error
    if msg.error():
        raise KafkaException(msg.error())
    # display received message
    topic = msg.topic()
    # Kafka keys and values are optional: key() is None for unkeyed messages,
    # value() is None for tombstone records — guard before decoding.
    msg_key = msg.key().decode('utf-8') if msg.key() is not None else None
    msg_value = msg.value().decode('utf-8') if msg.value() is not None else None
    print(f'[Batch: {batch_number}] Received message: {msg_key}: {msg_value} from topic: {topic}')
def consume_loop_async(consumer, topics):
    """Consume `topics` in a loop, handling each batch of messages in
    parallel threads.

    One thread is started per message (at most max_threads_in_parallel
    messages per batch); the loop joins the whole batch before fetching
    the next one. Runs until the module-level `running` flag is cleared
    (see shutdown()). The consumer is always closed on exit so final
    offsets get committed.
    """
    try:
        consumer.subscribe(topics)
        global running
        batch_number = 1
        while running:
            # Fix: Consumer.poll() accepts only a timeout and returns a single
            # Message (or None on timeout); the batch API is Consumer.consume(),
            # which returns a (possibly empty) list.
            received_messages = consumer.consume(num_messages=max_threads_in_parallel, timeout=1.0)
            open_threads = []
            # for each message received
            for msg in received_messages:
                # create thread
                thread = Thread(target=do_something, args=(msg, batch_number,))
                # start thread in parallel
                thread.start()
                # append to open threads
                open_threads.append(thread)
            # wait for all threads in batch to finish
            for thread in open_threads:
                thread.join()
            # finish batch execution (typo fixed: "theads" -> "threads")
            print("All threads completed")
            batch_number += 1
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
def shutdown():
    """Clear the `running` flag so the consume loop stops after this batch."""
    global running
    running = False
    print('shuting down consumer...')
if __name__ == '__main__':
    try:
        # Blocks until `running` is cleared; `consumer` comes from kafka_connectors.
        consume_loop_async(consumer=consumer, topics=["topic"])
    except KeyboardInterrupt:
        # NOTE(review): by the time Ctrl-C propagates here the loop has already
        # exited and closed the consumer; shutdown() just records the stop.
        shutdown()