Skip to content

Instantly share code, notes, and snippets.

@heaven00
Created May 25, 2016 11:01
Show Gist options
  • Save heaven00/18768e4ffbab85c612d0991a3e3f711e to your computer and use it in GitHub Desktop.
Save heaven00/18768e4ffbab85c612d0991a3e3f711e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import threading, logging, time
from kafka import KafkaConsumer, KafkaProducer
class Producer(threading.Thread):
    """Background thread that publishes two test messages per second.

    Each loop iteration sends ``b"test"`` and ``b"\\xc2Hola, mundo!"`` to the
    ``my-topic`` topic on the broker at ``localhost:9092``, then waits one
    second.  The loop runs until :meth:`stop` is called.
    """

    # Class-level flag so instances are treated as daemon threads and do not
    # keep the interpreter alive once the main thread returns.
    daemon = True

    def __init__(self):
        threading.Thread.__init__(self)
        # Set by stop(); polled by run() to leave the send loop.
        self.stop_event = threading.Event()

    def stop(self):
        """Ask the thread to finish its current iteration and exit."""
        self.stop_event.set()

    def run(self):
        """Send the test messages once per second until stopped."""
        producer = KafkaProducer(bootstrap_servers='localhost:9092')
        try:
            while not self.stop_event.is_set():
                producer.send('my-topic', b"test")
                producer.send('my-topic', b"\xc2Hola, mundo!")
                # Event.wait doubles as the one-second pause but returns
                # early when stop() is called, so shutdown is prompt.
                self.stop_event.wait(1)
        finally:
            # Close the producer whether the loop ended through stop() or
            # through an exception raised while sending.
            producer.close()
class Consumer(threading.Thread):
    """Background thread that prints every message received on ``my-topic``.

    Connects to the broker at ``localhost:9092`` with
    ``auto_offset_reset='earliest'`` and prints each message object it
    receives.  The loop runs until :meth:`stop` is called.
    """

    # Class-level flag so instances are treated as daemon threads and do not
    # keep the interpreter alive once the main thread returns.
    daemon = True

    def __init__(self):
        threading.Thread.__init__(self)
        # Set by stop(); polled by run() to leave the receive loop.
        self.stop_event = threading.Event()

    def stop(self):
        """Ask the thread to stop consuming and exit."""
        self.stop_event.set()

    def run(self):
        """Print incoming messages until stopped."""
        # consumer_timeout_ms makes message iteration end after one second
        # without traffic, so the stop flag is re-checked even on an idle
        # topic instead of blocking forever.
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        try:
            consumer.subscribe(['my-topic'])
            while not self.stop_event.is_set():
                for message in consumer:
                    print(message)
                    if self.stop_event.is_set():
                        break
        finally:
            # Close the consumer whether the loop ended through stop() or
            # through an exception raised while consuming.
            consumer.close()
def main(run_seconds=10):
    """Start one Producer and one Consumer thread and let them run.

    Args:
        run_seconds: How long the calling thread sleeps after starting the
            workers before returning.  Defaults to 10.

    Both worker classes set ``daemon = True``, so they do not keep the
    process alive after this function returns and the main thread ends.
    """
    threads = [
        Producer(),
        Consumer(),
    ]
    for t in threads:
        t.start()

    time.sleep(run_seconds)


# Run the demo only when executed as a script; importing the module has no
# side effects.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment