Skip to content

Instantly share code, notes, and snippets.

@cloudlakecho
Created February 25, 2023 05:30
Show Gist options
  • Save cloudlakecho/c15ba5a614bcfedced0b0214f3b74438 to your computer and use it in GitHub Desktop.
Save cloudlakecho/c15ba5a614bcfedced0b0214f3b74438 to your computer and use it in GitHub Desktop.
Consumer using Kafka and ZooKeeper
# test_kafka_consumer.py - Receiving data and save in database
# Steven Van Dorpe
#
# How to run this code:
# (1) Run Zookeeper
# $ cd /usr/local/bin/kafka_2.13-3.4.0/
# $ bin/zookeeper-server-start.sh config/zookeeper.properties
# (2) Run Kafka server
# $ bin/kafka-server-start.sh config/server.properties
# (3) Run MongoDB
# $ sudo systemctl start mongod (not mongodb)
# (4) Start Topic
# Please, check the topic is mateched in producer and consumer
# $ bin/kafka-topics.sh --create --topic numtest --botstrap-server localhost:9092
# (5) Run producer
# $ python test_kafka_producer.py
# Ref: https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1
from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads
consumer = KafkaConsumer(
'numtest',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
client = MongoClient('localhost:27017')
collection = client.numtest.numtest
for message in consumer:
message = message.value
collection.insert_one(message)
print('{} added to {}'.format(message, collection))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment