Skip to content

Instantly share code, notes, and snippets.

@emmettbutler
Created December 18, 2015 20:42
Show Gist options
  • Save emmettbutler/2668d2c4a919ed89c8d2 to your computer and use it in GitHub Desktop.
Save emmettbutler/2668d2c4a919ed89c8d2 to your computer and use it in GitHub Desktop.
Test script for pykafka issue #389
import logging as log
import sys
import time
from pykafka import KafkaClient
# Configure the root logger at DEBUG level so every log record produced while
# the script runs (including the info/warning/error calls below) is emitted.
log.basicConfig(level=log.DEBUG)
class ConsumerRunner(object):
    """Consume a topic forever while periodically recreating the consumer.

    Instantiating the class connects to Kafka and then enters the endless
    consume loop in ``run``; the constructor therefore never returns normally.
    Every consumed message value is remembered, and an ``AssertionError`` is
    raised as soon as the same value is delivered twice.
    """

    # Minimum number of seconds between two commit/stop/recreate cycles.
    RECONNECT_INTERVAL_SECONDS = 10

    def __init__(self):
        """Set the connection parameters, connect, and start consuming."""
        self.consumer_group = "testgroup"
        self.kafka_client = None
        self.consumer = None
        self.topic = None
        self.kafka_hosts = "localhost:9092"
        self.zoo_hosts = "localhost:2181"
        self.topic_name = "testtopic_replicated"
        # Values of all messages seen so far, used to detect re-delivery.
        self.received_messages = set()
        self.connect(5)
        self.run()

    def run(self):
        """Consume messages forever, asserting that no value is seen twice.

        After each message, if at least ``RECONNECT_INTERVAL_SECONDS`` have
        elapsed since the previous cycle, offsets are committed, the consumer
        is stopped and a new one is created via ``connect``.

        Raises:
            AssertionError: if a message value has already been received.
        """
        # Starts at 0, so the very first consumed message already triggers a
        # commit/stop/recreate cycle.
        last_ts = 0
        reconnected = 0
        while True:
            msg = self.consumer.consume()
            # Deliberate assert: this is a test script whose purpose is to
            # fail loudly on a duplicate delivery.
            assert msg.value not in self.received_messages
            self.received_messages.add(msg.value)
            if (time.time() - last_ts) >= self.RECONNECT_INTERVAL_SECONDS:
                self.consumer.commit_offsets()
                self.consumer.stop()  # ?
                reconnected += 1
                log.warning("Recreating the consumer for the %d st/nd/th time",
                            reconnected)
                self.connect(self.connect_retries)
                last_ts = time.time()

    def connect(self, connect_retries):
        """Create the Kafka client, wait for the topic, and build a consumer.

        Args:
            connect_retries: number of failed client-creation attempts that
                are tolerated (each followed by a one-second sleep) before the
                error is re-raised; ``-1`` means retry indefinitely.

        Raises:
            Exception: whatever ``KafkaClient`` raised, once the number of
                failures exceeds ``connect_retries``.
        """
        self.connect_retries = connect_retries
        ntries = 0
        self.kafka_client = None
        log.info("Connecting to kafka...")
        while self.kafka_client is None:
            try:
                self.kafka_client = KafkaClient(hosts=self.kafka_hosts)
            except Exception as e:
                ntries += 1
                if (ntries > self.connect_retries) and (self.connect_retries != -1):
                    # Bare raise re-raises the active exception with its
                    # original traceback on both Python 2 and Python 3.
                    raise
                log.exception("Could not connect to kafka: %s. Retrying in 1 second. Try %d/%d.", str(e), ntries, self.connect_retries)
                time.sleep(1)

        log.info("Looking for a topic %s...", self.topic_name)
        # Poll until the topic shows up, refreshing the client's view of the
        # cluster between checks.
        while self.topic_name not in self.kafka_client.topics:
            log.error("Could not find topic at the kafka brokers. Updating info and retrying in 1 second")
            time.sleep(1)
            self.kafka_client.update_cluster()
        self.topic = self.kafka_client.topics[self.topic_name]

        log.info("Creating a balanced consumer...")
        self.consumer = self.topic.get_balanced_consumer(
            consumer_group=self.consumer_group,
            zookeeper_connect=self.zoo_hosts,
            # consumer_timeout_ms=100,
            auto_commit_enable=True)
if __name__ == "__main__":
    # Constructing the runner connects to Kafka and then consumes forever,
    # because __init__ itself calls connect() and run().
    runner = ConsumerRunner()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment