DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::hs6bms HOSTPERFDATA::rta=13.191ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::risb116-2950 HOSTPERFDATA::rta=2.096ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::roof-2960g HOSTPERFDATA::rta=0.679ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::rose-2950-1 HOSTPERFDATA::rta=1.534ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::sambrookmodbus HOSTPERFDATA::rta=1.914ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
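A minimal sketch (illustrative only) of how metricinga.py below decomposes one of these lines. Fields are tab-separated in the actual spool files, and the resulting metric path assumes the script was started with --prefix nagios:

    # Illustrative only: split one perfdata line into KEY::VALUE fields.
    line = ("DATATYPE::HOSTPERFDATA\tTIMET::1365442741\tHOSTNAME::hs6bms\t"
            "HOSTPERFDATA::rta=13.191ms;300.000;500.000;0; pl=0%;20;60;;")
    fields = dict(t.split('::', 1) for t in line.split('\t') if '::' in t)
    assert fields['HOSTNAME'] == 'hs6bms'
    # With --prefix nagios, the 'rta' counter becomes the Carbon datapoint
    #   nagios.hs6bms.host.rta = 13.191 @ 1365442741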
#!/usr/bin/env python
#
# Metricinga - A gevent-based performance data forwarder for Nagios/Icinga
#
# Author: Jeff Goldschrafe <[email protected]>

import lockfile
import os
import cPickle as pickle
from pprint import pformat, pprint
import re
import signal
import socket
import struct
import sys
import time

from daemon import DaemonContext
import gevent
from gevent import Greenlet, Timeout
import gevent.monkey
from gevent.queue import PriorityQueue
import logging
import logging.handlers
from optparse import OptionGroup, OptionParser

try:
    import gevent_inotifyx as inotify
    use_inotify = True
except ImportError:
    use_inotify = False

gevent.monkey.patch_all()
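# patch_all() replaces blocking stdlib primitives (socket, time, select, ...)
# with gevent-cooperative versions; modules already imported above are
# patched in place, so this late call still takes effect for them.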
#
# Utility classes
#

class Metric(object):
    """Represents a single datapoint of a system metric.
    """

    def __init__(self, path=None, value=0, timestamp=0, source=None):
        # Default to None rather than a mutable [] default, which would be
        # shared across every Metric instance.
        self.path = path if path is not None else []
        self.value = value
        self.timestamp = timestamp
        self.source = source
class PurgedFileFactory(object):
    """Manage state of PurgedFileToken instances.

    Singleton-like factory to ensure file paths are not shared between
    PurgedFileToken instances.
    """

    instances = {}

    @staticmethod
    def create(path):
        if PurgedFileFactory.instances.get(path):
            return None
        else:
            PurgedFileFactory.instances[path] = True
            return PurgedFileToken(path)

    @staticmethod
    def destroy(path):
        if path in PurgedFileFactory.instances:
            del PurgedFileFactory.instances[path]


class PurgedFileToken(object):
    """Deletes a file when the last reference to the token leaves scope.
    """

    def __init__(self, path):
        self.path = path

    def __del__(self):
        log.debug("Unlinking file `{0}'".format(self.path))
        os.remove(self.path)
        PurgedFileFactory.destroy(self.path)


class SourcedString(object):
    """Pairs a string with the PurgedFileToken it originated from.

    Allows the original source to be purged when all references to its
    data have been removed from scope.
    """

    def __init__(self, string_, source):
        self.string_ = string_
        self.source = source
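# Illustrative lifecycle note: FileProcessor wraps every line it reads in a
# SourcedString that references the file's PurgedFileToken. Once the last
# line has been parsed and dropped, the token's refcount hits zero, __del__
# fires, and the spool file is unlinked (relies on CPython refcounting).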
#
# Message encapsulation classes
#

class ShutdownRequest(object):
    pass


class ParseFileRequest(object):
    def __init__(self, path):
        self.path = path


class ParseLineRequest(object):
    def __init__(self, line):
        self.line = line


class PublishMetricRequest(object):
    def __init__(self, metric):
        self.metric = metric
#
# Decorators
#

class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__

    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError:
            be = obj.__dict__[self._key] = boundevent()
            return be


class boundevent(object):
    def __init__(self):
        self._fns = []

    def __call__(self, *args, **kwargs):
        for f in self._fns:
            f(*args, **kwargs)

    def subscribe(self, fn):
        self._fns.append(fn)

    def unsubscribe(self, fn):
        self._fns.remove(fn)
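# Illustrative usage of @event (not part of the pipeline below):
#
#     class Button(object):
#         @event
#         def on_click(self):
#             """Fired when the button is clicked."""
#
#     b = Button()
#     b.on_click.subscribe(lambda: log.info("clicked"))
#     b.on_click()   # calls every subscribed handler in order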
#
# Greenlet classes
#

class Actor(Greenlet):
    """Simple implementation of the Actor pattern.
    """

    def __init__(self):
        self.inbox = PriorityQueue()
        self._handlers = {ShutdownRequest: self.receive_shutdown}
        Greenlet.__init__(self)

    def receive(self, msg):
        """Dispatch a received message to the appropriate type handler.
        """
        #log.debug("Received a message: " + repr(msg))
        cls = msg.__class__
        if cls in self._handlers:
            self._handlers[cls](msg)
        else:
            raise NotImplementedError()

    def receive_shutdown(self, msg):
        self.running = False

    def send(self, msg, priority=50):
        """Place a message into the actor's inbox.
        """
        self.inbox.put((priority, msg))

    def _run(self):
        self.running = True
        while self.running:
            prio, msg = self.inbox.get()
            self.receive(msg)
            del msg
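# Note on priorities: PriorityQueue orders entries by their first element,
# so a lower number wins. ShutdownRequest is sent at priority 0 (jumps the
# queue); re-queued unsent metrics use 49, just ahead of the default 50.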
class CarbonWriter(Actor):
    """Dispatch PublishMetricRequest messages to Carbon.
    """

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.backoff_secs = 0
        self.max_backoff_secs = 32
        self.sleep_secs = 0
        Actor.__init__(self)
        self._handlers[PublishMetricRequest] = self.receive_publish
        self._connect()

    def receive_publish(self, msg):
        metric = msg.metric
        (path, timestamp, value) = (metric.path, metric.timestamp,
                                    metric.value)
        name = '.'.join([self._sanitize_metric_name(x) for x in path])
        try:
            log.debug("Sending metric to Carbon: %s %s %s" %
                      (name, timestamp, value))
            pickle_list = [(name, (timestamp, value))]
            payload = pickle.dumps(pickle_list)
            header = struct.pack("!L", len(payload))
            message = header + payload
            self._sock.sendall(message)
            gevent.sleep(self.sleep_secs)
        except socket.error, ex:
            # Attempt to reconnect, then re-queue the unsent metric
            log.warn("Couldn't send to %s:%s: %s" %
                     (self.host, self.port, ex))
            self._connect()
            self.send(PublishMetricRequest(metric), priority=49)

    def _connect(self):
        """Connect to the Carbon server.

        Attempt to connect to the Carbon server. If the connection
        attempt fails, increase the backoff time and sleep the writer
        greenlet until the backoff time has elapsed.
        """
        gevent.sleep(self.backoff_secs)
        # A socket whose connect() has failed can't be reliably reused,
        # so create a fresh one for every attempt.
        self._sock = socket.socket()
        try:
            log.info("Connecting to Carbon instance at %s:%s" %
                     (self.host, self.port))
            self._sock.connect((self.host, self.port))
            log.info("Connected to Carbon successfully")
            self._reset_backoff()
        except socket.error:
            log.warn("Failed to connect to %s:%s" %
                     (self.host, self.port))
            self._increase_backoff()
            log.warn("Reconnecting in %s seconds" % self.backoff_secs)

    def _increase_backoff(self):
        if self.backoff_secs == 0:
            self.backoff_secs = 1
        elif self.backoff_secs < self.max_backoff_secs:
            self.backoff_secs *= 2

    def _reset_backoff(self):
        self.backoff_secs = 0

    def _sanitize_metric_name(self, s):
        return re.sub(r"[^\w-]", metric_replacement_char, s)
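# Carbon's pickle listener (port 2004 by default) expects frames of:
#     [4-byte big-endian length][pickled list of (path, (timestamp, value))]
# e.g. for one datapoint:
#     payload = pickle.dumps([('nagios.hs6bms.host.rta', (1365442741, 13.191))])
#     frame = struct.pack("!L", len(payload)) + payload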
class FileProcessor(Actor):
    """Parse files and dispatch events when lines are found.
    """

    def __init__(self):
        Actor.__init__(self)
        self._handlers[ParseFileRequest] = self.receive_parse

    @event
    def on_line_found(self, line):
        """Called when a line is parsed from the file.
        """

    def receive_parse(self, message):
        """Handle received ParseFileRequest messages.
        """
        path = message.path
        log.debug("Received file parse request: " + path)
        source = PurgedFileFactory.create(path)
        if source:
            log.debug("Accepted file parse request: " + path)
            with open(path, "r") as fp:
                for line in fp:
                    sstr = SourcedString(line.rstrip(os.linesep),
                                         source)
                    self.on_line_found(sstr)
                    gevent.sleep(0)
        else:
            log.debug("Received request to parse {0}, but file is "
                      "already known".format(path))
class LineProcessor(Actor):
    """Process lines of check results.
    """

    def __init__(self):
        Actor.__init__(self)
        self._handlers[ParseLineRequest] = self.receive_line
        self.tokenizer_re = \
            r"([^\s]+|'[^']+')=([-.\d]+)(c|s|us|ms|B|KB|MB|GB|TB|%)?(?:;([-.\d]+))?(?:;([-.\d]+))?(?:;([-.\d]+))?(?:;([-.\d]+))?"
    @event
    def on_metric_found(self, metric):
        """Called when a metric is extracted by the line processor.
        """

    @event
    def on_parse_failed(self, line):
        """Called when the line processor fails to parse a line.
        """

    def receive_line(self, message):
        line = message.line.string_
        source = message.line.source
        fields = self._extract_fields(line)
        if not self._fields_valid(fields):
            return self.on_parse_failed(line)
        for metric in self._make_metrics(fields, source):
            self.on_metric_found(metric)
            gevent.sleep(0)

    def _extract_fields(self, line):
        """Parse KEY::VALUE pairs from a line of performance data.
        """
        acc = {}
        field_tokens = line.split("\t")
        for field_token in field_tokens:
            kv_tokens = field_token.split('::')
            if len(kv_tokens) == 2:
                (key, value) = kv_tokens
                acc[key] = value
        return acc

    def _fields_valid(self, d):
        """Verify that all necessary fields are present.
        """
        generic_fields = ['DATATYPE', 'HOSTNAME', 'TIMET']
        host_fields = ['HOSTPERFDATA']
        service_fields = ['SERVICEDESC', 'SERVICEPERFDATA']

        if 'DATATYPE' not in d:
            return False

        datatype = d['DATATYPE']
        if datatype == 'HOSTPERFDATA':
            fields = generic_fields + host_fields
        elif datatype == 'SERVICEPERFDATA':
            fields = generic_fields + service_fields
        else:
            return False

        for field in fields:
            if field not in d:
                return False
        return True
    def _make_metrics(self, fields, source):
        # Note: GRAPHITEPREFIX also appears in the spool data, but this
        # version only consults the --prefix option for the leading path
        # segment, and GRAPHITEPOSTFIX only for service checks.
        metric_path_base = []
        if metric_prefix:
            metric_path_base.append(metric_prefix)

        hostname = fields['HOSTNAME'].lower()
        metric_path_base.append(hostname)

        datatype = fields['DATATYPE']
        if datatype == 'HOSTPERFDATA':
            metric_path_base.append('host')
        elif datatype == 'SERVICEPERFDATA':
            service_desc = fields.get('SERVICEDESC')
            graphite_postfix = fields.get('GRAPHITEPOSTFIX')
            if graphite_postfix is not None:
                metric_path_base.append(graphite_postfix)
            else:
                metric_path_base.append(service_desc)

        # Carbon expects a numeric timestamp, not the raw TIMET string.
        timestamp = int(fields['TIMET'])
        perfdata = fields[datatype]
        counters = self._parse_perfdata(perfdata)
        for (counter, value) in counters:
            metric_path = metric_path_base + [counter]
            yield Metric(path=metric_path, value=value,
                         timestamp=timestamp, source=source)
    def _parse_perfdata(self, s):
        """Parse performance data from a *PERFDATA string.
        """
        metrics = []
        counters = re.findall(self.tokenizer_re, s)
        if not counters:
            # findall() returns an empty list (never None) on no match.
            log.warning("Failed to parse performance data: %s" % (s,))
            return metrics

        for (key, value, uom, warn, crit, min_, max_) in counters:
            try:
                metrics.append((key, float(value)))
            except ValueError:
                log.warning("Couldn't convert value '%s' to float" % (value,))
        return metrics
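    # Illustrative: _parse_perfdata("rta=13.191ms;300.000;500.000;0; pl=0%;20;60;;")
    # returns [('rta', 13.191), ('pl', 0.0)]; units and thresholds are dropped.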
class SpoolRunner(Greenlet):
    def __init__(self, perfdata_dir, poll_interval=None):
        self.perfdata_dir = perfdata_dir
        self.poll_interval = poll_interval
        Greenlet.__init__(self)

    @event
    def on_find(self):
        """Called when a file is found by the spool runner.
        """

    def _run(self):
        while True:
            for filename in os.listdir(self.perfdata_dir):
                self.on_find(os.path.join(self.perfdata_dir, filename))
            if self.poll_interval is not None:
                gevent.sleep(self.poll_interval)
            else:
                break
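# Note: poll_interval=None means a single sweep and exit; run() below passes
# 60, so the spool directory is rescanned every minute. The gevent_inotifyx
# import sets use_inotify, but no inotify-based watcher exists in this
# version, so SpoolRunner always polls.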
def parse_options():
    parser = OptionParser()
    parser.add_option('-d', '--debug',
                      help='Do not daemonize; enable debug-level logging',
                      dest='debug', action='store_true')
    parser.add_option('-D', '--spool-dir',
                      help='Path to performance data spool dir',
                      dest='spool_dir')
    parser.add_option('-H', '--host',
                      help='Host to submit metrics to',
                      dest='host')
    parser.add_option('-p', '--port',
                      help='Port to submit metrics to',
                      dest='port')
    parser.add_option('-P', '--prefix',
                      help='Prefix to prepend to all metric names',
                      dest='prefix')
    parser.add_option('-r', '--replacement-char',
                      help='Replacement character for illegal metric characters (e.g. ".")',
                      dest='replacement_char')

    (opts, args) = parser.parse_args()
    return (opts, args)
#
# Parse options
#

host = None
port = 2004
metric_prefix = None
metric_replacement_char = '_'
perfdata_spool_dir = '/var/spool/metricinga'
pidfile = '/var/run/metricinga.pid'
daemonize = True
log_level = logging.INFO

(opts, args) = parse_options()
if opts.host is not None:
    host = opts.host
if opts.port is not None:
    port = opts.port
if opts.prefix is not None:
    metric_prefix = opts.prefix
if opts.replacement_char is not None:
    metric_replacement_char = opts.replacement_char
if opts.spool_dir is not None:
    perfdata_spool_dir = opts.spool_dir
if opts.debug:
    log_level = logging.DEBUG
    daemonize = False

if host is None:
    print("Fatal: No Graphite host specified!")
    sys.exit(1)

if daemonize:
    log_handler = logging.handlers.SysLogHandler('/dev/log')
    formatter = logging.Formatter(
        "%(filename)s: %(levelname)s %(message)s")
else:
    log_handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s %(filename)s: %(levelname)s %(message)s",
        "%Y/%m/%d %H:%M:%S")

log_handler.setFormatter(formatter)
log = logging.getLogger('log')
log.addHandler(log_handler)
log.setLevel(log_level)
log.info("Starting up...")
# Init workers and run
def run():
    # Use the resolved globals rather than opts directly: opts.port is None
    # unless -p was passed explicitly.
    # FIXME: Don't need int() once migrated from optparse to argparse
    cw = CarbonWriter(host, int(port))

    lp = LineProcessor()
    lp.on_metric_found.subscribe(
        lambda metric: cw.send(PublishMetricRequest(metric)))

    fp = FileProcessor()
    fp.on_line_found.subscribe(lambda line: lp.send(ParseLineRequest(line)))

    sp = SpoolRunner(perfdata_spool_dir, 60)
    sp.on_find.subscribe(lambda path: fp.send(ParseFileRequest(path)))

    actors = [cw, lp, fp]
    tasklets = [sp]
    workers = actors + tasklets

    def shutdown(actors, tasklets):
        log.info("Received shutdown signal")
        for actor in actors:
            actor.send(ShutdownRequest(), priority=0)
        for tasklet in tasklets:
            tasklet.kill()

    gevent.signal(signal.SIGINT, shutdown, actors, tasklets)
    gevent.signal(signal.SIGTERM, shutdown, actors, tasklets)

    for worker in workers:
        worker.start()
    gevent.joinall(workers)

if daemonize:
    context = DaemonContext()
    context.stderr = sys.stderr
    context.stdout = sys.stdout
    #context.signal_map = {signal.SIGTERM: shutdown, signal.SIGINT: shutdown}
    with context:
        run()
else:
    run()