Last active August 29, 2015 14:02
Example of how to process Kafka messages that are in Avro format
"""
Example of how to consume messages from Kafka, through Zookeeper.

Compatible with:
- Python 2.7.x
- Kafka 0.7.1
- Zookeeper 3.4.5

Requires the following Python packages, available at pypi.python.org:
    fastavro==0.7.8
    kazoo==1.3.1
    samsa==0.3.11
"""
from cStringIO import StringIO

import fastavro as avro
from kazoo.client import KazooClient
from samsa.cluster import Cluster

DOMAINS = [
    "zookeeper.kafka.server1.com",
    "zookeeper.kafka.server2.com"
]
PORT = 2181
TOPIC = 'some-topic'  # pre-defined
GROUP = 'my-app'  # custom according to consumer

# Build the comma-separated "host:port" list that KazooClient expects.
zookeeper_hosts = ",".join(["{0}:{1}".format(domain, PORT) for domain in DOMAINS])
zookeeper = KazooClient(hosts=zookeeper_hosts)
zookeeper.start()

# Look up the topic through Zookeeper and subscribe as a consumer group.
cluster = Cluster(zookeeper)
topic = cluster.topics.get(TOPIC)
consumer = topic.subscribe(GROUP)

for message in consumer:
    # fastavro's reader expects a file-like object, so wrap the raw payload.
    fp = StringIO(message)
    data = avro.reader(fp)
    for item in data:
        print(item)
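The decoding loop at the end of the script can also be pulled into a small generator, which makes it possible to test that step without a running Kafka or Zookeeper. The following is only a sketch under assumptions: `iter_records` and `line_reader` are hypothetical names that do not appear in the gist, and the reader is passed in as a parameter so a stand-in can replace `fastavro.reader` in tests. It uses `io.BytesIO` rather than `cStringIO` so the same code also runs on Python 3.

```python
from io import BytesIO


def iter_records(messages, reader):
    """Yield every record found in an iterable of raw message payloads.

    `reader` is any callable that takes a binary file-like object and
    returns an iterable of records. In the script above that role is
    played by `fastavro.reader`.
    """
    for payload in messages:
        # Each payload gets its own in-memory file object, mirroring
        # the StringIO(message) step in the original loop.
        for record in reader(BytesIO(payload)):
            yield record


def line_reader(fp):
    """Hypothetical stand-in reader for demonstration: one record per line."""
    return [line.rstrip(b"\n") for line in fp]


if __name__ == "__main__":
    fake_messages = [b"a\nb\n", b"c\n"]
    for record in iter_records(fake_messages, line_reader):
        print(record)
```

With the script's own objects, the call would presumably look like `iter_records(consumer, avro.reader)`, assuming each message from the consumer is a raw byte string as the original loop implies.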