Skip to content

Instantly share code, notes, and snippets.

@DonerKebab
Last active May 10, 2018 04:14
Show Gist options
  • Save DonerKebab/d4141fadd24a9f1973a7d9d1b67bd6d6 to your computer and use it in GitHub Desktop.
Save DonerKebab/d4141fadd24a9f1973a7d9d1b67bd6d6 to your computer and use it in GitHub Desktop.
python kafka example
from kafka import KafkaConsumer
import time
import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_event = multiprocessing.Event()
def stop(self):
self.stop_event.set()
def run(self):
#To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('marketdata',
group_id='mirrormaker202',
bootstrap_servers=['10.26.53.17:9092'],
consumer_timeout_ms=1000)
while not self.stop_event.is_set():
for message in consumer:
# handle new message here:
print(message)
if self.stop_event.is_set():
break
consumer.close()
consumer = Consumer()
# start
consumer.start()
#run 10 sec
time.sleep(10)
# then stop
consumer.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment