# Source: GitHub Gist by @emmettbutler (emmettbutler/ad8c8a1ab66325887f5b)
# Created September 4, 2015 17:39
import logging as log
import time
import threading
from pykafka import KafkaClient
from pykafka.common import OffsetType
# Configure the logging module (imported as ``log``) to report ERROR level and above.
log.basicConfig(level=log.ERROR)
# Module-level Kafka client for 127.0.0.1:9092; run_consumer() looks up its
# topic through this object, so every consumer thread uses the same client.
client = KafkaClient(hosts="127.0.0.1:9092")
def run_consumer():
    """Open a simple consumer, report the live thread count, then stop it.

    Looks up ``testtopic_replicated2`` on the module-level ``client``, calls
    ``get_simple_consumer("test")`` on it, prints ``threading.active_count()``,
    stops the consumer and finally sleeps for two seconds before returning.
    """
    topic = client.topics['testtopic_replicated2']
    consumer = topic.get_simple_consumer("test")
    try:
        # A parenthesised single-argument print produces the same output on
        # Python 2 and Python 3, unlike the print statement.
        print("threads in start: {}".format(threading.active_count()))
    finally:
        # Stop the consumer even if reporting fails, so it is never left
        # running by this function.
        consumer.stop()
    time.sleep(2)
    # Disabled consume loop from the original script, kept for reference only:
    #
    #     while True:
    #         try:
    #             message = consumer.consume()
    #             if message:
    #                 print message.value
    #         except:
    #             break
def do_nothing():
    """Block the calling thread for two seconds and return nothing."""
    pause_seconds = 2
    time.sleep(pause_seconds)
def run_consumers(count):
    """Start ``count`` daemon threads running ``run_consumer`` and join them all.

    Args:
        count: Number of threads to start.
    """
    workers = []
    # range() exists on both Python 2 and 3 (xrange is Python 2 only); the
    # loop index itself is not needed.
    for _ in range(count):
        worker = threading.Thread(target=run_consumer)
        worker.daemon = True
        worker.start()
        workers.append(worker)
    # NOTE(review): the source's indentation was lost; this report is assumed
    # to run once, after every thread has been started -- confirm.
    print("threads: {}".format(threading.active_count()))
    # Wait for every started thread to finish before returning.
    for worker in workers:
        worker.join()
if __name__ == "__main__":
    # Print the live thread count before and after running 20 consumer
    # threads so the two numbers can be compared. The parenthesised print
    # form works on both Python 2 and Python 3.
    print("threads in main: {}".format(threading.active_count()))
    run_consumers(20)
    print("threads in main: {}".format(threading.active_count()))
# (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)