@mreiferson
Created September 12, 2014 17:59
nagios aggregate depth check
#!/usr/bin/env python2.7
import sys
import logging
import functools
import time

import tornado.options
import tornado.ioloop

import nsq_data

# Nagios plugin exit codes
OK = 0        # The plugin was able to check the service and it appeared to be functioning properly
WARNING = 1   # The plugin was able to check the service, but it appeared to be above some "warning" threshold or did not appear to be working properly
CRITICAL = 2  # The plugin detected that either the service was not running or it was above some "critical" threshold
UNKNOWN = 3   # Invalid command line arguments were supplied to the plugin or low-level failures internal to the plugin

state = UNKNOWN
message = "failed"
cluster_data = {}
num_outstanding_requests = 0


def get_topics(endpoints):
    # ask every lookupd for the set of topics in the cluster
    global num_outstanding_requests
    num_outstanding_requests = len(endpoints)
    for endpoint in endpoints:
        callback = functools.partial(_finish_get_topics, endpoints=endpoints)
        nsq_data.get_topics_in_lookupd(endpoint, callback)


def _finish_get_topics(lookup_data, endpoints):
    global cluster_data
    global num_outstanding_requests
    num_outstanding_requests -= 1
    lookup_data = lookup_data or {}
    for topic in lookup_data.get('topics', []):
        cluster_data[topic] = {}
    if num_outstanding_requests == 0:
        get_producers(endpoints)


def get_producers(endpoints):
    # ask every lookupd for the channels and producers of each topic
    global num_outstanding_requests
    for topic in cluster_data:
        for endpoint in endpoints:
            num_outstanding_requests += 1
            nsq_data.get_producers_for_topic(endpoint, topic, functools.partial(_finish_get_producers, topic=topic))


def _finish_get_producers(lookup_data, topic):
    global cluster_data
    global num_outstanding_requests
    num_outstanding_requests -= 1
    lookup_data = lookup_data or {}
    for channel in lookup_data.get('channels', []):
        if channel not in cluster_data[topic] and not channel.endswith('#ephemeral'):
            cluster_data[topic][channel] = {}
            # None means no producers
            cluster_data[topic][channel]['depth'] = None
            cluster_data[topic][channel]['producers'] = set()
    for producer in lookup_data.get('producers', []):
        producer_endpoint = 'http://%s:%d' % (producer['broadcast_address'], producer['http_port'])
        for channel in cluster_data[topic]:
            cluster_data[topic][channel]['producers'].add(producer_endpoint)
    if num_outstanding_requests == 0:
        get_depths()


def get_depths():
    # ask every producing nsqd for its stats so per-channel depth can be aggregated
    global num_outstanding_requests
    endpoints = set()
    for topic in cluster_data:
        for channel in cluster_data[topic]:
            # we have producers, set depth to 0
            if cluster_data[topic][channel]['producers']:
                cluster_data[topic][channel]['depth'] = 0
            for endpoint in cluster_data[topic][channel]['producers']:
                endpoints.add(endpoint)
    if not endpoints:
        finish()
        return
    for endpoint in endpoints:
        num_outstanding_requests += 1
        # topic/channel kwargs are re-derived from the stats payload in the callback
        nsq_data.get_nsqd_stats(endpoint, functools.partial(_finish_get_depths, topic=topic, channel=channel))


def _finish_get_depths(nsqd_data, topic, channel):
    global num_outstanding_requests
    global cluster_data
    num_outstanding_requests -= 1
    nsqd_data = nsqd_data or {}
    for topic_data in nsqd_data.get('topics', []):
        topic = topic_data['topic_name']
        for channel_data in topic_data.get('channels', []):
            channel = channel_data['channel_name']
            if channel in cluster_data.get(topic, {}):
                cluster_data[topic][channel]['depth'] += channel_data.get('depth', 0) + channel_data.get('deferred_count', 0)
    if num_outstanding_requests == 0:
        finish()


def finish():
    # compare the aggregate depth for the requested topic/channel against the thresholds
    global state
    global message
    topic = tornado.options.options.topic
    channel = tornado.options.options.channel
    warning_threshold = tornado.options.options.warning
    critical_threshold = tornado.options.options.critical
    channel_data = cluster_data.get(topic, {}).get(channel, {})
    depth = channel_data.get('depth')
    if not channel_data or depth is None:
        state = CRITICAL
        message = 'no producers found in lookupd'
    else:
        state = OK
        message = 'depth of %d' % depth
        if depth > warning_threshold:
            state = WARNING
            message = 'depth of %d (warn at %d)' % (depth, warning_threshold)
        if depth > critical_threshold:
            state = CRITICAL
            message = 'depth of %d (crit at %d)' % (depth, critical_threshold)
    tornado.ioloop.IOLoop.instance().stop()


def main():
    tornado.options.define("lookupd_http_address", default="http://127.0.0.1:4161", multiple=True,
                           help="http address of the lookupd instances in this cluster (multiple)")
    tornado.options.define("warning", default=50000, help="warning threshold")
    tornado.options.define("critical", default=100000, help="critical threshold")
    tornado.options.define("topic", help="NSQ topic")
    tornado.options.define("channel", help="NSQ channel")
    tornado.options.parse_command_line()
    logging.getLogger().setLevel(logging.CRITICAL)

    get_topics(tornado.options.options.lookupd_http_address)

    io_loop = tornado.ioloop.IOLoop.instance()
    # add a deadline to prevent it from living forever in exceptional states
    io_loop.add_timeout(time.time() + 9, io_loop.stop)
    io_loop.start()

    print message
    return state


if __name__ == "__main__":
    sys.exit(main())
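
The check imports nsq_data, a small helper module that is not included in this gist. Below is a minimal sketch of the interface the script appears to assume: the module name and the three function names come from the calls above, while the HTTP paths, the "data" envelope handling, and the convention of passing None to the callback on error are assumptions based on the nsqlookupd/nsqd HTTP APIs and on the script's lookup_data = lookup_data or {} guards. It is an illustration, not the original helper.

# nsq_data.py -- hypothetical sketch of the helper module the check imports
import functools
import json

import tornado.httpclient


def _http_get(url, callback):
    # fire an async GET and hand the parsed JSON payload to `callback`
    http_client = tornado.httpclient.AsyncHTTPClient()
    http_client.fetch(url, functools.partial(_finish_http_get, callback=callback))


def _finish_http_get(response, callback):
    # on any HTTP or parse error, invoke the callback with None
    # (the check treats None as an empty response)
    if response.error:
        callback(None)
        return
    try:
        data = json.loads(response.body)
    except ValueError:
        callback(None)
        return
    # older nsqlookupd/nsqd versions wrap the payload in a "data" envelope
    callback(data.get('data', data))


def get_topics_in_lookupd(endpoint, callback):
    # GET /topics on an nsqlookupd -> {"topics": [...]}
    _http_get(endpoint + '/topics', callback)


def get_producers_for_topic(endpoint, topic, callback):
    # GET /lookup?topic=... on an nsqlookupd -> {"channels": [...], "producers": [...]}
    _http_get(endpoint + '/lookup?topic=' + topic, callback)


def get_nsqd_stats(endpoint, callback):
    # GET /stats?format=json on an nsqd -> {"topics": [...]}
    _http_get(endpoint + '/stats?format=json', callback)

All three helpers are fire-and-forget: they schedule a fetch on the running IOLoop and return immediately, which is why the check above tracks num_outstanding_requests itself and only advances to the next stage once every callback has come back.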