Created
November 26, 2014 17:37
-
-
Save elubow/08c5bf790b16a12a44e6 to your computer and use it in GitHub Desktop.
Check Aggregate NSQ Depths in Nagios
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
import sys | |
import logging | |
import functools | |
import time | |
import tornado.options | |
import tornado.ioloop | |
import nsq_data | |
# Nagios plugin exit codes (per the Nagios plugin development guidelines).
OK = 0        # service checked and functioning properly
WARNING = 1   # service responding, but above the "warning" threshold
CRITICAL = 2  # service not running, or above the "critical" threshold
UNKNOWN = 3   # bad command-line arguments or internal plugin failure

# Module-level state shared by the async callback chain below.
state = UNKNOWN               # final Nagios exit code (UNKNOWN until finish() runs)
message = "failed"            # final status line (this default shows on deadline timeout)
cluster_data = {}             # {topic: {channel: {'depth': int or None, 'producers': set}}}
num_outstanding_requests = 0  # in-flight HTTP requests; reaching 0 advances the pipeline
def get_topics(endpoints):
    """Kick off the pipeline: ask every lookupd instance for its topic list."""
    global num_outstanding_requests
    num_outstanding_requests = len(endpoints)
    # The same completion callback serves every request; it decrements the
    # outstanding counter and moves on once all lookupds have answered.
    done = functools.partial(_finish_get_topics, endpoints=endpoints)
    for lookupd in endpoints:
        nsq_data.get_topics_in_lookupd(lookupd, done)
def _finish_get_topics(lookup_data, endpoints):
    """Record topics from one lookupd reply; start producer lookups when all replies are in.

    lookup_data is the parsed 'data' payload from /topics, or None on failure.
    """
    global cluster_data
    global num_outstanding_requests
    num_outstanding_requests -= 1
    topics = (lookup_data or {}).get('topics', [])
    for name in topics:
        cluster_data[name] = {}
    if not num_outstanding_requests:
        get_producers(endpoints)
def get_producers(endpoints):
    """For every discovered topic, ask each lookupd which nsqd producers serve it.

    Bug fix: if no topics were discovered, the original issued zero requests
    and nothing ever advanced the pipeline -- the ioloop idled until the 9s
    deadline and the plugin reported UNKNOWN/"failed".  Short-circuit to
    finish() so the check reports promptly ("no producers found in lookupd").
    """
    global num_outstanding_requests
    if not cluster_data:
        finish()
        return
    for topic in cluster_data:
        for endpoint in endpoints:
            num_outstanding_requests += 1
            nsq_data.get_producers_for_topic(
                endpoint, topic,
                functools.partial(_finish_get_producers, topic=topic))
def _finish_get_producers(lookup_data, topic):
    """Record channels and producer endpoints for one topic from a lookupd reply.

    Newly seen non-ephemeral channels start with depth None, meaning "no
    producer counted yet".  Every producer in this reply is attached to every
    channel currently known for the topic.
    """
    global cluster_data
    global num_outstanding_requests
    num_outstanding_requests -= 1
    data = lookup_data or {}
    known = cluster_data[topic]
    for name in data.get('channels', []):
        if name.endswith('#ephemeral') or name in known:
            continue
        # None depth means no producers (distinguished from a real depth of 0).
        known[name] = {'depth': None, 'producers': set()}
    for prod in data.get('producers', []):
        addr = 'http://%s:%d' % (prod['broadcast_address'], prod['http_port'])
        for chan in known:
            known[chan]['producers'].add(addr)
    if num_outstanding_requests == 0:
        get_depths()
def get_depths():
    """Query /stats on every producer nsqd so channel depths can be summed.

    Channels with at least one producer get their depth reset from None to 0
    (so stats can accumulate into them); channels with no producers keep
    depth None, which finish() reports as CRITICAL.

    Bug fix: the original fell through after calling finish() when there were
    no producer endpoints; add an explicit return so execution never continues
    after the ioloop has been told to stop.
    """
    global num_outstanding_requests
    endpoints = set()
    for topic in cluster_data:
        for channel in cluster_data[topic]:
            producers = cluster_data[topic][channel]['producers']
            if producers:
                # We have producers: depth becomes countable.
                cluster_data[topic][channel]['depth'] = 0
            endpoints.update(producers)
    if not endpoints:
        finish()
        return
    for endpoint in endpoints:
        num_outstanding_requests += 1
        # topic/channel are vestigial: _finish_get_depths rebinds both from
        # the stats payload before any read, so their values are never used.
        # (The original passed stale loop variables here.)
        nsq_data.get_nsqd_stats(
            endpoint,
            functools.partial(_finish_get_depths, topic=None, channel=None))
def _finish_get_depths(nsqd_data, topic, channel):
    """Fold one nsqd's /stats payload into the cluster_data depth totals.

    The topic/channel parameters are ignored (kept only for call-site
    compatibility -- the original immediately shadowed them); the stats
    payload names its own topics and channels.

    Bug fix: guard against a depth of None.  An nsqd can report stats for a
    channel that lookupd showed no producers for; the original crashed with
    TypeError on None += int.  Such channels keep depth None and finish()
    reports them as CRITICAL.
    """
    global num_outstanding_requests
    global cluster_data
    num_outstanding_requests -= 1
    stats = nsqd_data or {}
    for topic_data in stats.get('topics', []):
        channels = cluster_data.get(topic_data['topic_name'], {})
        for channel_data in topic_data.get('channels', []):
            entry = channels.get(channel_data['channel_name'])
            if entry is not None and entry['depth'] is not None:
                entry['depth'] += channel_data.get('depth', 0) + channel_data.get('deferred_count', 0)
    if num_outstanding_requests == 0:
        finish()
def finish():
    """Evaluate thresholds for the requested topic/channel and stop the ioloop.

    Sets the module-level `state`/`message` that main() prints and returns.
    Consistency fix: use the named Nagios constants (OK/WARNING/CRITICAL)
    defined at the top of this file instead of the magic numbers 0/1/2.
    """
    global state
    global message
    topic = tornado.options.options.topic
    channel = tornado.options.options.channel
    warning_threshold = tornado.options.options.warning
    critical_threshold = tornado.options.options.critical
    channel_data = cluster_data.get(topic, {}).get(channel, {})
    depth = channel_data.get('depth')
    if not channel_data or depth is None:
        # Unknown topic/channel, or the channel never got a producer.
        state = CRITICAL
        message = 'no producers found in lookupd'
    else:
        state = OK
        message = 'depth of %d' % depth
        if depth > warning_threshold:
            state = WARNING
            message = 'depth of %d (warn at %d)' % (depth, warning_threshold)
        if depth > critical_threshold:
            state = CRITICAL
            message = 'depth of %d (crit at %d)' % (depth, critical_threshold)
    tornado.ioloop.IOLoop.instance().stop()
def main(): | |
tornado.options.define("lookupd_http_address", default="http://127.0.0.1:4161", multiple=True, | |
help="http address of the lookupd instances in this cluster (multiple)") | |
tornado.options.define("warning", default=50000, help="warning threshold") | |
tornado.options.define("critical", default=100000, help="critical threshold") | |
tornado.options.define("topic", help="NSQ topic") | |
tornado.options.define("channel", help="NSQ channel") | |
tornado.options.parse_command_line() | |
logging.getLogger().setLevel(logging.CRITICAL) | |
get_topics(tornado.options.options.lookupd_http_address) | |
io_loop = tornado.ioloop.IOLoop.instance() | |
# add a deadline to prevent it from living forever in exceptional states | |
io_loop.add_timeout(time.time() + 9, io_loop.stop) | |
io_loop.start() | |
print message | |
return state | |
# Nagios reads the process exit status: 0=OK, 1=WARNING, 2=CRITICAL, 3=UNKNOWN.
if __name__ == "__main__":
    sys.exit(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tornado.httpclient | |
import logging | |
import json | |
import urllib | |
import functools | |
# Lazily created, process-wide AsyncHTTPClient instance.
_http_client = None


def http_client():
    """Return the shared AsyncHTTPClient, creating it on first use."""
    global _http_client
    if _http_client is None:
        _http_client = tornado.httpclient.AsyncHTTPClient()
    return _http_client
def _process_response(response, endpoint, user_callback): | |
if response.error: | |
logging.warning("[%s] http error %s", endpoint, response.error) | |
return user_callback(None) | |
try: | |
data = json.loads(response.body) | |
except json.JSONDecodeError: | |
logging.warning("[%s] failed to parse JSON: %r", endpoint, response.body) | |
return user_callback(None) | |
if data.get('status_code') != 200: | |
logging.warning("[%s] endpoint responded with %d", endpoint, data.get('status_code')) | |
return user_callback(None) | |
user_callback(data.get('data', {})) | |
def get_topics_in_lookupd(endpoint, user_callback):
    """Asynchronously fetch the topic list from one lookupd instance."""
    request = tornado.httpclient.HTTPRequest(
        endpoint + "/topics", method="GET",
        connect_timeout=2, request_timeout=5)
    handler = functools.partial(_process_response, endpoint=endpoint,
                                user_callback=user_callback)
    http_client().fetch(request, callback=handler)
def get_producers_for_topic(endpoint, topic, user_callback):
    """Asynchronously look up the nsqd producers for a topic on one lookupd."""
    url = "%s/lookup?topic=%s" % (endpoint, urllib.quote(topic))
    request = tornado.httpclient.HTTPRequest(
        url, method="GET",
        connect_timeout=2, request_timeout=5)
    handler = functools.partial(_process_response, endpoint=endpoint,
                                user_callback=user_callback)
    http_client().fetch(request, callback=handler)
def get_nsqd_stats(endpoint, user_callback):
    """Asynchronously fetch JSON stats from one nsqd instance."""
    request = tornado.httpclient.HTTPRequest(
        endpoint + "/stats?format=json", method="GET",
        connect_timeout=2, request_timeout=5)
    handler = functools.partial(_process_response, endpoint=endpoint,
                                user_callback=user_callback)
    http_client().fetch(request, callback=handler)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment