Skip to content

Instantly share code, notes, and snippets.

@macobo
Last active September 9, 2015 14:54
Show Gist options
  • Select an option

  • Save macobo/103c3af0e18526ffb54a to your computer and use it in GitHub Desktop.

Select an option

Save macobo/103c3af0e18526ffb54a to your computer and use it in GitHub Desktop.
from kafka import KafkaClient
from kazoo.client import KazooClient
from kafka.common import OffsetRequest
from checks import AgentCheck
import json
def chunk(lists, n):
    """Yield consecutive slices of ``lists`` as tuples of at most ``n`` items.

    The final tuple is shorter when ``len(lists)`` is not a multiple of ``n``.
    ``lists`` must support ``len()`` and slicing.
    """
    total = len(lists)
    for start in range(0, total, n):
        window = lists[start:start + n]
        yield tuple(window)
class KafkaOffsetCheck(AgentCheck):
    """Agent check that reports Kafka partition offsets as gauges.

    For every topic listed in the instance, the earliest and latest offset
    of each partition is reported.  For the topic named by the instance's
    ``ingress_topic`` key, the ``lowerBound``/``upperBound`` values stored in
    ZooKeeper are reported too, along with the gap between the latest Kafka
    offset and the lower bound.
    """

    # Special timestamps understood by the offset request: -2 asks for the
    # earliest available offset, -1 for the latest one.
    EARLIEST_OFFSET_TIME = -2
    LATEST_OFFSET_TIME = -1

    def _fetch_offsets(self, client, topic, partitions, time):
        """Return ``{partition: offset}`` for ``topic`` at the given special ``time``."""
        requests = [OffsetRequest(topic, partition, time, 1) for partition in partitions]
        responses = client.send_offset_request(requests)
        # Key by the partition reported in each response so the pairing in
        # get_kafka_bounds does not depend on response ordering.
        return {response.partition: response.offsets[0] for response in responses}

    def get_kafka_bounds(self, client, topic, partitions):
        """Return ``(earliest, latest)`` offset pairs, one per partition.

        Args:
            client: connected Kafka client.
            topic: topic name.
            partitions: partition ids to query.

        Returns:
            A list of ``(earliest_offset, latest_offset)`` tuples in the same
            order as ``partitions``.

        Raises:
            KeyError: if the broker response lacks one of the partitions.
        """
        if not partitions:
            return []
        # Earliest and latest offsets are fetched in two separate batches.
        # Mixing both timestamps in one batch and pairing consecutive results
        # assigned offsets to the wrong partitions when a topic had more than
        # one partition.
        earliest = self._fetch_offsets(client, topic, partitions, self.EARLIEST_OFFSET_TIME)
        latest = self._fetch_offsets(client, topic, partitions, self.LATEST_OFFSET_TIME)
        return [(earliest[partition], latest[partition]) for partition in partitions]

    def get_zookeeper_bounds(self, zk, partition, key_template="/queue_worker/offsets/{partition}"):
        """Read the queue worker's offset bounds for ``partition`` from ZooKeeper.

        Args:
            zk: started ZooKeeper client.
            partition: partition id substituted into ``key_template``.
            key_template: znode path template containing ``{partition}``.

        Returns:
            ``(lowerBound, upperBound)`` as integers, parsed from the JSON
            document stored in the znode.
        """
        key = key_template.format(partition=partition)
        # zk.get() returns a tuple whose first element is the node payload.
        payload = zk.get(key)[0]
        data = json.loads(payload)
        return (int(data["lowerBound"]), int(data["upperBound"]))

    def check_topic(self, client, zk, topic, instance):
        """Emit offset gauges for every partition of ``topic``.

        Args:
            client: connected Kafka client.
            zk: started ZooKeeper client.
            topic: topic name to inspect.
            instance: instance configuration dict; its optional
                ``ingress_topic`` key selects the topic for which the
                queue-worker metrics are emitted as well.
        """
        # Note (from the original author): dd-agent has v0.9.0 of the kafka
        # library; "this option is not documented anymore".
        # NOTE(review): presumably refers to the offset/partition API used here -- confirm.
        partitions = client.get_partition_ids_for_topic(topic)
        offsets = self.get_kafka_bounds(client, topic, partitions)

        min_metric = "kafka.offsets.{topic}.min".format(topic=topic)
        max_metric = "kafka.offsets.{topic}.max".format(topic=topic)
        is_ingress = topic == instance.get("ingress_topic")

        for partition, (kafka_earliest, kafka_latest) in zip(partitions, offsets):
            tags = ['partition:' + str(partition)]
            self.gauge(min_metric, kafka_earliest, tags=tags)
            self.gauge(max_metric, kafka_latest, tags=tags)
            if is_ingress:
                (lowerBound, upperBound) = self.get_zookeeper_bounds(zk, partition)
                self.gauge("queue_worker.offsets.lowerBound", lowerBound, tags=tags)
                self.gauge("queue_worker.offsets.upperBound", upperBound, tags=tags)
                # Distance between the newest Kafka offset and the worker's lower bound.
                self.gauge("queue_worker.progress.todo", (kafka_latest - lowerBound), tags=tags)

    def check(self, instance):
        """Run the check for one configured instance.

        Args:
            instance: dict with the keys ``kafka_urls`` (list of broker
                addresses), ``zookeeper_connection_str``, ``topics`` (list of
                topic names) and, optionally, ``ingress_topic``.
        """
        # NOTE(review): only the first broker address is used; the remaining
        # kafka_urls entries are ignored -- confirm whether that is intended.
        kafkaUrl = instance['kafka_urls'][0]
        zk_url = instance["zookeeper_connection_str"]
        client = KafkaClient(kafkaUrl)
        try:
            zkClient = KazooClient(zk_url)
            try:
                zkClient.start()
                # Logged at debug level instead of printed so that regular
                # agent runs do not write to stdout.
                self.log.debug('topics %s', instance['topics'])
                for topic in instance['topics']:
                    self.check_topic(client, zkClient, topic, instance)
            finally:
                # Nested so that a failure while stopping still closes the handle.
                try:
                    zkClient.stop()
                finally:
                    zkClient.close()
        finally:
            # Always release the Kafka connection, even if ZooKeeper setup or
            # teardown raised.
            client.close()
if __name__ == '__main__':
    # Manual run: build the check and its instance list from the agent's
    # YAML configuration, run the check for each instance and print the
    # metrics it collected.
    check, instances = KafkaOffsetCheck.from_yaml('/etc/dd-agent/conf.d/kafka_offsets.yaml')
    for instance in instances:
        check.check(instance)
        # NOTE(review): assumed to run once per instance (inside the loop) -- confirm.
        print 'Metrics: %s' % (check.get_metrics())
# Configuration for the Kafka offset check; the check script loads it from
# /etc/dd-agent/conf.d/kafka_offsets.yaml.
init_config:
instances:
  # Each list item is one instance dict handed to KafkaOffsetCheck.check().
  - kafka_urls:
      # Broker addresses; the check connects using only the first entry.
      - kafka-01:9092
      - kafka-02:9092
      - kafka-03:9092
    # Topics whose per-partition min/max offsets are reported.
    topics:
      - ingress
      - ingress_malformed
    # Connection string passed to the ZooKeeper client.
    zookeeper_connection_str: zookeeper-01:2181,zookeeper-02:2181,zookeeper-03:2181
    # Topic for which the queue_worker bounds stored in ZooKeeper are also reported.
    ingress_topic: ingress
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment