Skip to content

Instantly share code, notes, and snippets.

@rhwood
Created August 4, 2021 19:42
Show Gist options
  • Save rhwood/f98c78770212bbb85731018a7e77daa8 to your computer and use it in GitHub Desktop.
Save rhwood/f98c78770212bbb85731018a7e77daa8 to your computer and use it in GitHub Desktop.
Simple python script to send a collectd notification from the command line.
#!/usr/bin/python
#-*- coding: ISO-8859-1 -*-
# collectd-notify: send notifications to collectd from the command line
#
# Requires collectd to be configured with the unixsock plugin, like so:
#
# LoadPlugin unixsock
# <Plugin unixsock>
# SocketFile "/var/run/collectd-unixsock"
# SocketPerms "0775"
# </Plugin>
#
# Based on collect.py: the python collectd-unixsock module by Clay Loveless.
#
# Copyright (C) 2010 Randall Wood <[email protected]>
# Copyright (C) 2008 Clay Loveless <[email protected]>
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the author be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import socket
import sys
import optparse
import time
class Collectd():
def __init__(self, path='/var/run/collectd-unixsock', noisy=False):
self.noisy = noisy
self.path = path
self._sock = self._connect()
def flush(self, timeout=None, plugins=[], identifiers=[]):
"""Send a FLUSH command.
Full documentation:
http://collectd.org/wiki/index.php/Plain_text_protocol#FLUSH
"""
# have to pass at least one plugin or identifier
if not plugins and not identifiers:
return None
args = []
if timeout:
args.append("timeout=%s" % timeout)
if plugins:
plugin_args = map(lambda x: "plugin=%s" % x, plugins)
args.extend(plugin_args)
if identifiers:
identifier_args = map(lambda x: "identifier=%s" % x, identifiers)
args.extend(identifier_args)
return self._cmd('FLUSH %s' % ' '.join(args))
def getthreshold(self, identifier):
"""Send a GETTHRESHOLD command.
Full documentation:
http://collectd.org/wiki/index.php/Plain_text_protocol#GETTHRESHOLD
"""
numvalues = self._cmd('GETTHRESHOLD "%s"' % identifier)
lines = []
if numvalues:
lines = self._readlines(numvalues)
return lines
def getval(self, identifier, flush_after=True):
"""Send a GETVAL command.
Also flushes the identifier if flush_after is True.
Full documentation:
http://collectd.org/wiki/index.php/Plain_text_protocol#GETVAL
"""
numvalues = self._cmd('GETVAL "%s"' % identifier)
lines = []
if numvalues:
lines = self._readlines(numvalues)
if flush_after:
self.flush(identifiers=[identifier])
return lines
def listval(self):
"""Send a LISTVAL command.
Full documentation:
http://collectd.org/wiki/index.php/Plain_text_protocol#LISTVAL
"""
numvalues = self._cmd('LISTVAL')
lines = []
if numvalues:
lines = self._readlines(numvalues)
return lines
def putnotif(self, message, options={}):
"""Send a PUTNOTIF command.
Options must be passed as a Python dictionary. Example:
options={'severity': 'failure', 'host': 'example.com'}
Full documentation:
http://collectd.org/wiki/index.php/Plain_text_protocol#PUTNOTIF
"""
args = []
if options:
options_args = map(lambda x: "%s=%s" % (x, options[x]), options)
args.extend(options_args)
args.append('message="%s"' % message)
return self._cmd('PUTNOTIF %s' % ' '.join(args))
def putval(self, identifier, values, options={}):
"""Send a PUTVAL command.
Options must be passed as a Python dictionary. Example:
options={'interval': 10}
Full documentation:
http://collectd.org/wiki/index.php/Plain_text_protocol#PUTVAL
"""
args = []
args.append('"%s"' % identifier)
if options:
options_args = map(lambda x: "%s=%s" % (x, options[x]), options)
args.extend(options_args)
values = map(str, values)
args.append(':'.join(values))
return self._cmd('PUTVAL %s' % ' '.join(args))
def _cmd(self, c):
try:
return self._cmdattempt(c)
except socket.error, (errno, errstr):
sys.stderr.write("[error] Sending to socket failed: [%d] %s\n"
% (errno, errstr))
self._sock = self._connect()
return self._cmdattempt(c)
def _cmdattempt(self, c):
if self.noisy:
print "[send] %s" % c
if not self._sock:
sys.stderr.write("[error] Socket unavailable. Can not send.")
return False
self._sock.send(c + "\n")
status_message = self._readline()
if self.noisy:
print "[recive] %s" % status_message
if not status_message:
return None
code, message = status_message.split(' ', 1)
if int(code):
return int(code)
return False
def _connect(self):
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self.path)
if self.noisy:
print "[socket] connected to %s" % self.path
return sock
except socket.error, (errno, errstr):
sys.stderr.write("[error] Connecting to socket failed: [%d] %s"
% (errno, errstr))
return None
def _readline(self):
"""Read single line from socket"""
if not self._sock:
sys.stderr.write("[error] Socket unavailable. Can not read.")
return None
try:
data = ''
buf = []
recv = self._sock.recv
while data != "\n":
data = recv(1)
if not data:
break
if data != "\n":
buf.append(data)
return ''.join(buf)
except socket.error, (errno, errstr):
sys.stderr.write("[error] Reading from socket failed: [%d] %s"
% (errno, errstr))
self._sock = self._connect()
return None
def _readlines(self, sizehint=0):
"""Read multiple lines from socket"""
total = 0
list = []
while True:
line = self._readline()
if not line:
break
list.append(line)
total = len(list)
if sizehint and total >= sizehint:
break
return list
def __del__(self):
if not self._sock:
return
try:
self._sock.close()
except socket.error, (errno, errstr):
sys.stderr.write("[error] Closing socket failed: [%d] %s"
% (errno, errstr))
if __name__ == '__main__':
p = optparse.OptionParser()
p.add_option('--message', '-m', help='The notification message. Required')
p.add_option('--severity', '-s', choices=('FAILURE', 'WARNING', 'OKAY'), help='Severity of notification. Required')
p.add_option('--host', '-H', default=socket.gethostname(), help='Hostname. Defaults to the local hostname')
p.add_option('--plugin', '-p')
p.add_option('--plugininstance', '-P')
p.add_option('--type', '-t')
p.add_option('--typeinstance', '-T')
p.add_option('--socket', '-S', default='/var/run/collectd-unixsock', help='Bind to specified socket. Defaults to /var/run/collectd-unixsock')
p.add_option('--debug', '-d', action='store_true', help='Print socket communications')
options, arguments = p.parse_args()
if not options.message:
p.error("option -m or option --message is mandatory")
if not options.severity:
p.error("option -s or option --severity is mandatory")
notif_options = {'severity': options.severity, 'time': time.time(), 'host': options.host}
if options.plugin:
notif_options['plugin'] = options.plugin
if options.plugininstance:
notif_options['plugin-instance'] = options.plugininstance
if options.type:
notif_options['type'] = options.type
if options.typeinstance:
notif_options['type-instance'] = options.typeinstance
c = Collectd(options.socket, options.debug)
exit(c.putnotif(options.message.strip(), notif_options))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment