Skip to content

Instantly share code, notes, and snippets.

@phobos182
Created October 18, 2012 00:04
Show Gist options
  • Select an option

  • Save phobos182/3909108 to your computer and use it in GitHub Desktop.

Select an option

Save phobos182/3909108 to your computer and use it in GitHub Desktop.
Kafka Pickle Consumer
#!/usr/bin/env python
from optparse import OptionParser
from brod.zk import ZKConsumer
import time
import socket
import struct
import sys
import pickle
def _send(socket, data, retry=3):
"""Send data to graphite."""
# Attempt to send any data in the queue
while retry > 0:
# Check socket
if not socket:
# Attempt to restablish connection
retry -= 1
# Try again
continue
try:
# Send data to socket
socket.sendall(data)
break
except socket.error, e:
# Decrement retry
retry -= 1
continue
def _pickle(batch):
"""Pickle metrics into graphite format."""
payload = pickle.dumps(batch)
header = struct.pack("!L", len(payload))
message = header + payload
return message
def _convert(msg):
"""Convert a graphite key value string to pickle."""
path, timestamp, value = msg.split(' ')
m = (path, (timestamp, value))
return m
def graphite():
    """Consume metrics from a Kafka topic and relay them to carbon.

    Parses command-line options, opens a TCP connection to the carbon
    relay, then polls the topic through ``ZKConsumer``. Each message is
    converted with ``_convert`` and buffered; whenever the buffer reaches
    the configured batch size it is framed with ``_pickle`` and written
    with ``_send``.

    A partial batch still buffered when the poll loop ends is not sent.
    The carbon connection is closed when the function exits for any reason.
    """
    parser = OptionParser()
    parser.add_option("-z", "--zk", dest="zookeeper", default="localhost:2181", help="Kafka ZooKeeper quorum")
    parser.add_option("-t", "--topic", dest="topic", help="Kafka topic")
    parser.add_option("-g", "--group", dest="group", default="graphite", help="Kafka consumer group")
    parser.add_option("-b", "--batch", dest="batch_size", type="int", default=10, help="Graphite pickle batch size")
    # The carbon endpoint used to be hard-coded; the defaults keep the
    # previous destination so existing invocations behave the same.
    parser.add_option("-c", "--carbon-host", dest="carbon_host", default="carbon-relay1.klout", help="Carbon relay host")
    parser.add_option("-p", "--carbon-port", dest="carbon_port", type="int", default=2014, help="Carbon relay pickle port")
    (options, args) = parser.parse_args()

    # --topic has no default, so fail with a usage message rather than
    # handing None to the consumer.
    if not options.topic:
        parser.error("a Kafka topic is required (-t/--topic)")

    batch_size = options.batch_size

    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        conn.settimeout(5)
        conn.connect((options.carbon_host, options.carbon_port))

        consumer = ZKConsumer(options.zookeeper, options.group, options.topic, autocommit=True)

        batch = []
        for msg_set in consumer.poll(poll_interval=15):
            for offset, msg in msg_set:
                # Single-argument print(...) behaves the same as a
                # statement (Python 2) and as a function (Python 3).
                print("Recv: %s" % (msg,))
                batch.append(_convert(msg))
                if len(batch) >= batch_size:
                    # Named "message" so the pickle module is not shadowed.
                    message = _pickle(batch)
                    print(batch)
                    _send(conn, message)
                    batch = []
    finally:
        conn.close()
# Start the consumer only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    graphite()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment