Created
February 25, 2023 05:30
-
-
Save cloudlakecho/c15ba5a614bcfedced0b0214f3b74438 to your computer and use it in GitHub Desktop.
Consumer using Kafka and ZooKeeper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# test_kafka_consumer.py - Receiving data and save in database | |
# Steven Van Dorpe | |
# | |
# How to run this code: | |
# (1) Run Zookeeper | |
# $ cd /usr/local/bin/kafka_2.13-3.4.0/ | |
# $ bin/zookeeper-server-start.sh config/zookeeper.properties | |
# (2) Run Kafka server | |
# $ bin/kafka-server-start.sh config/server.properties | |
# (3) Run MongoDB | |
# $ sudo systemctl start mongod (not mongodb) | |
# (4) Start Topic | |
# Please, check the topic is mateched in producer and consumer | |
# $ bin/kafka-topics.sh --create --topic numtest --botstrap-server localhost:9092 | |
# (5) Run producer | |
# $ python test_kafka_producer.py | |
# Ref: https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1 | |
from kafka import KafkaConsumer | |
from pymongo import MongoClient | |
from json import loads | |
consumer = KafkaConsumer( | |
'numtest', | |
bootstrap_servers=['localhost:9092'], | |
auto_offset_reset='earliest', | |
enable_auto_commit=True, | |
group_id='my-group', | |
value_deserializer=lambda x: loads(x.decode('utf-8'))) | |
client = MongoClient('localhost:27017') | |
collection = client.numtest.numtest | |
for message in consumer: | |
message = message.value | |
collection.insert_one(message) | |
print('{} added to {}'.format(message, collection)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment