@elubow
Created November 26, 2014 17:37
Check Aggregate NSQ Depths in Nagios
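This gist is two files: the Nagios check script itself (immediately below) and the nsq_data helper module it imports (which follows the script). The check fans asynchronous HTTP requests out to every nsqlookupd instance, discovers which nsqd nodes produce the requested topic, sums depth plus deferred_count for the requested channel across all of those producers, and exits with a standard Nagios status code. A hypothetical invocation (the flag names come from the tornado.options.define calls in main(); the script filename, topic, channel, and lookupd hosts are placeholders, not given in the gist):

    ./check_nsq_depth.py --topic=events --channel=archiver \
        --lookupd_http_address=http://lookupd1:4161,http://lookupd2:4161 \
        --warning=50000 --critical=100000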
#!/usr/bin/env python2.7
import sys
import logging
import functools
import time

import tornado.options
import tornado.ioloop

import nsq_data

OK = 0        # The plugin was able to check the service and it appeared to be functioning properly
WARNING = 1   # The plugin was able to check the service, but it appeared to be above some "warning" threshold or did not appear to be working properly
CRITICAL = 2  # The plugin detected that either the service was not running or it was above some "critical" threshold
UNKNOWN = 3   # Invalid command line arguments were supplied to the plugin or low-level failures internal to the plugin

state = UNKNOWN
message = "failed"
cluster_data = {}
num_outstanding_requests = 0

def get_topics(endpoints):
    """Fan out a /topics request to every lookupd endpoint."""
    global num_outstanding_requests
    num_outstanding_requests = len(endpoints)
    for endpoint in endpoints:
        callback = functools.partial(_finish_get_topics, endpoints=endpoints)
        nsq_data.get_topics_in_lookupd(endpoint, callback)


def _finish_get_topics(lookup_data, endpoints):
    global cluster_data
    global num_outstanding_requests
    num_outstanding_requests -= 1
    lookup_data = lookup_data or {}
    for topic in lookup_data.get('topics', []):
        cluster_data[topic] = {}
    # once every lookupd has responded, move on to producer discovery
    if num_outstanding_requests == 0:
        get_producers(endpoints)

def get_producers(endpoints):
    """For every discovered topic, ask each lookupd which nsqd nodes produce it."""
    global num_outstanding_requests
    for topic in cluster_data:
        for endpoint in endpoints:
            num_outstanding_requests += 1
            nsq_data.get_producers_for_topic(endpoint, topic,
                                             functools.partial(_finish_get_producers, topic=topic))


def _finish_get_producers(lookup_data, topic):
    global cluster_data
    global num_outstanding_requests
    num_outstanding_requests -= 1
    lookup_data = lookup_data or {}
    for channel in lookup_data.get('channels', []):
        if channel not in cluster_data[topic] and not channel.endswith('#ephemeral'):
            cluster_data[topic][channel] = {}
            # None means no producers
            cluster_data[topic][channel]['depth'] = None
            cluster_data[topic][channel]['producers'] = set()
    for producer in lookup_data.get('producers', []):
        producer_endpoint = 'http://%s:%d' % (producer['broadcast_address'], producer['http_port'])
        for channel in cluster_data[topic]:
            cluster_data[topic][channel]['producers'].add(producer_endpoint)
    if num_outstanding_requests == 0:
        get_depths()

def get_depths():
    """Query the /stats endpoint of every nsqd that produces any discovered topic."""
    global num_outstanding_requests
    endpoints = set()
    for topic in cluster_data:
        for channel in cluster_data[topic]:
            # we have producers, set depth to 0
            if cluster_data[topic][channel]['producers']:
                cluster_data[topic][channel]['depth'] = 0
                for endpoint in cluster_data[topic][channel]['producers']:
                    endpoints.add(endpoint)
    if not endpoints:
        finish()
    for endpoint in endpoints:
        num_outstanding_requests += 1
        # topic/channel here are just leftovers from the loop above; the
        # callback re-derives both from each nsqd's stats payload
        nsq_data.get_nsqd_stats(endpoint, functools.partial(_finish_get_depths, topic=topic, channel=channel))

def _finish_get_depths(nsqd_data, topic, channel):
    global num_outstanding_requests
    global cluster_data
    num_outstanding_requests -= 1
    nsqd_data = nsqd_data or {}
    for topic_data in nsqd_data.get('topics', []):
        topic = topic_data['topic_name']
        for channel_data in topic_data.get('channels', []):
            channel = channel_data['channel_name']
            if channel in cluster_data.get(topic, {}):
                cluster_data[topic][channel]['depth'] += channel_data.get('depth', 0) + channel_data.get('deferred_count', 0)
    if num_outstanding_requests == 0:
        finish()

def finish():
    """Compare the aggregated depth against the thresholds and stop the IOLoop."""
    global state
    global message
    topic = tornado.options.options.topic
    channel = tornado.options.options.channel
    warning_threshold = tornado.options.options.warning
    critical_threshold = tornado.options.options.critical

    channel_data = cluster_data.get(topic, {}).get(channel, {})
    depth = channel_data.get('depth')
    if not channel_data or depth is None:
        state = CRITICAL
        message = 'no producers found in lookupd'
    else:
        state = OK
        message = 'depth of %d' % depth
        if depth > warning_threshold:
            state = WARNING
            message = 'depth of %d (warn at %d)' % (depth, warning_threshold)
        if depth > critical_threshold:
            state = CRITICAL
            message = 'depth of %d (crit at %d)' % (depth, critical_threshold)
    tornado.ioloop.IOLoop.instance().stop()

def main():
    # NOTE: with multiple=True the default is given as a list so that iterating
    # over it yields endpoints rather than the characters of a single string
    tornado.options.define("lookupd_http_address", default=["http://127.0.0.1:4161"], multiple=True,
                           help="http address of the lookupd instances in this cluster (multiple)")
    tornado.options.define("warning", default=50000, help="warning threshold")
    tornado.options.define("critical", default=100000, help="critical threshold")
    tornado.options.define("topic", help="NSQ topic")
    tornado.options.define("channel", help="NSQ channel")
    tornado.options.parse_command_line()
    logging.getLogger().setLevel(logging.CRITICAL)

    get_topics(tornado.options.options.lookupd_http_address)

    io_loop = tornado.ioloop.IOLoop.instance()
    # add a deadline to prevent it from living forever in exceptional states
    io_loop.add_timeout(time.time() + 9, io_loop.stop)
    io_loop.start()

    print message
    return state


if __name__ == "__main__":
    sys.exit(main())
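
# nsq_data.py -- the helper module imported above: thin asynchronous wrappers
# around nsqlookupd's /topics and /lookup endpoints and nsqd's /stats endpoint.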
import tornado.httpclient

import logging
import json
import urllib
import functools

_http_client = None

def http_client():
    global _http_client
    if not _http_client:
        _http_client = tornado.httpclient.AsyncHTTPClient()
    return _http_client

def _process_response(response, endpoint, user_callback):
    """Validate an nsqlookupd/nsqd HTTP response and pass its 'data' payload
    (or None on any failure) to user_callback."""
    if response.error:
        logging.warning("[%s] http error %s", endpoint, response.error)
        return user_callback(None)
    try:
        data = json.loads(response.body)
    except ValueError:
        # the stdlib json module in Python 2.7 raises ValueError on bad input
        logging.warning("[%s] failed to parse JSON: %r", endpoint, response.body)
        return user_callback(None)
    if data.get('status_code') != 200:
        logging.warning("[%s] endpoint responded with %s", endpoint, data.get('status_code'))
        return user_callback(None)
    user_callback(data.get('data', {}))

def get_topics_in_lookupd(endpoint, user_callback):
    lookupd_url = endpoint + "/topics"
    req = tornado.httpclient.HTTPRequest(lookupd_url, method="GET",
                                         connect_timeout=2, request_timeout=5)
    callback = functools.partial(_process_response, endpoint=endpoint, user_callback=user_callback)
    http_client().fetch(req, callback=callback)


def get_producers_for_topic(endpoint, topic, user_callback):
    lookupd_url = endpoint + "/lookup?topic=" + urllib.quote(topic)
    req = tornado.httpclient.HTTPRequest(lookupd_url, method="GET",
                                         connect_timeout=2, request_timeout=5)
    callback = functools.partial(_process_response, endpoint=endpoint, user_callback=user_callback)
    http_client().fetch(req, callback=callback)


def get_nsqd_stats(endpoint, user_callback):
    nsqd_url = endpoint + "/stats?format=json"
    req = tornado.httpclient.HTTPRequest(nsqd_url, method="GET",
                                         connect_timeout=2, request_timeout=5)
    callback = functools.partial(_process_response, endpoint=endpoint, user_callback=user_callback)
    http_client().fetch(req, callback=callback)
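
For reference, a minimal sketch of driving one of these helpers on its own (hypothetical, not part of the gist; it assumes a local nsqlookupd listening on the default HTTP port 4161 and uses the same callback style the check script builds with functools.partial):

    import tornado.ioloop
    import nsq_data

    def print_topics(data):
        # data is None whenever _process_response hit an HTTP or JSON error
        print (data or {}).get('topics', [])
        tornado.ioloop.IOLoop.instance().stop()

    nsq_data.get_topics_in_lookupd("http://127.0.0.1:4161", print_topics)
    tornado.ioloop.IOLoop.instance().start()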