Skip to content

Instantly share code, notes, and snippets.

@rsampaio
Created March 20, 2013 15:53
Show Gist options
  • Save rsampaio/5205824 to your computer and use it in GitHub Desktop.
Save rsampaio/5205824 to your computer and use it in GitHub Desktop.
collectd protocol python script
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim: fileencoding=utf-8
#
# Copyright © 2009 Adrian Perez <[email protected]>
#
# Distributed under terms of the GPLv2 license.
"""
Collectd network protocol implementation.
"""
import socket
import struct
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from datetime import datetime
from copy import deepcopy
DEFAULT_PORT = 25826
"""Default port"""
DEFAULT_IPv4_GROUP = "239.192.74.66"
"""Default IPv4 multicast group"""
DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
"""Default IPv6 multicast group"""
# Message kinds
TYPE_HOST = 0x0000
TYPE_TIME = 0x0001
TYPE_PLUGIN = 0x0002
TYPE_PLUGIN_INSTANCE = 0x0003
TYPE_TYPE = 0x0004
TYPE_TYPE_INSTANCE = 0x0005
TYPE_VALUES = 0x0006
TYPE_INTERVAL = 0x0007
TYPE_TIME_HI = 0x0008
TYPE_INTERVAL_HI = 0x0009
# For notifications
TYPE_MESSAGE = 0x0100
TYPE_SEVERITY = 0x0101
# DS kinds
DS_TYPE_COUNTER = 0
DS_TYPE_GAUGE = 1
DS_TYPE_DERIVE = 2
DS_TYPE_ABSOLUTE = 3
header = struct.Struct("!2H")
number = struct.Struct("!Q")
short = struct.Struct("!H")
double = struct.Struct("<d")
def decode_network_values(ptype, plen, buf):
"""Decodes a list of DS values in collectd network format
"""
nvalues = short.unpack_from(buf, header.size)[0]
off = header.size + short.size + nvalues
valskip = double.size
# Check whether our expected packet size is the reported one
assert ((valskip + 1) * nvalues + short.size + header.size) == plen
assert double.size == number.size
result = []
for dstype in map(ord, buf[header.size+short.size:off]):
if dstype == DS_TYPE_COUNTER:
result.append((dstype, number.unpack_from(buf, off)[0]))
off += valskip
elif dstype == DS_TYPE_GAUGE:
result.append((dstype, double.unpack_from(buf, off)[0]))
off += valskip
elif dstype == DS_TYPE_DERIVE:
result.append((dstype, double.unpack_from(buf, off)[0]))
off += valskip
elif dstype == DS_TYPE_ABSOLUTE:
result.append((dstype, double.unpack_from(buf, off)[0]))
off += valskip
else:
raise ValueError("DS type %i unsupported" % dstype)
return result
def decode_network_number(ptype, plen, buf):
"""Decodes a number (64-bit unsigned) in collectd network format.
"""
return number.unpack_from(buf, header.size)[0]
def decode_network_string(msgtype, plen, buf):
"""Decodes a floating point number (64-bit) in collectd network format.
"""
return buf[header.size:plen-1]
# Mapping of message types to decoding functions.
_decoders = {
TYPE_VALUES : decode_network_values,
TYPE_TIME : decode_network_number,
TYPE_TIME_HI : decode_network_number,
TYPE_INTERVAL : decode_network_number,
TYPE_INTERVAL_HI : decode_network_number,
TYPE_HOST : decode_network_string,
TYPE_PLUGIN : decode_network_string,
TYPE_PLUGIN_INSTANCE: decode_network_string,
TYPE_TYPE : decode_network_string,
TYPE_TYPE_INSTANCE : decode_network_string,
TYPE_MESSAGE : decode_network_string,
TYPE_SEVERITY : decode_network_number,
}
def decode_network_packet(buf):
"""Decodes a network packet in collectd format.
"""
off = 0
blen = len(buf)
while off < blen:
ptype, plen = header.unpack_from(buf, off)
print ptype, plen, blen - off
if plen > blen - off:
print("Packet longer than amount of data in buffer", buf)
if ptype not in _decoders:
raise ValueError("Message type %i not recognized" % ptype)
yield ptype, _decoders[ptype](ptype, plen, buf[off:])
off += plen
class Data(object):
time = 0
host = None
plugin = None
plugininstance = None
type = None
typeinstance = None
def __init__(self, **kw):
[setattr(self, k, v) for k, v in kw.iteritems()]
@property
def datetime(self):
return datetime.fromtimestamp(self.time)
@property
def source(self):
buf = StringIO()
if self.host:
buf.write(self.host)
if self.plugin:
buf.write("/")
buf.write(self.plugin)
if self.plugininstance:
buf.write("/")
buf.write(self.plugininstance)
if self.type:
buf.write("/")
buf.write(self.type)
if self.typeinstance:
buf.write("/")
buf.write(self.typeinstance)
return buf.getvalue()
def __str__(self):
return "[%i] %s" % (self.time, self.source)
class Notification(Data):
FAILURE = 1
WARNING = 2
OKAY = 4
SEVERITY = {
FAILURE: "FAILURE",
WARNING: "WARNING",
OKAY : "OKAY",
}
__severity = 0
message = ""
def __set_severity(self, value):
if value in (self.FAILURE, self.WARNING, self.OKAY):
self.__severity = value
severity = property(lambda self: self.__severity, __set_severity)
@property
def severitystring(self):
return self.SEVERITY.get(self.severity, "UNKNOWN")
def __str__(self):
return "%s [%s] %s" % (
super(Notification, self).__str__(),
self.severitystring,
self.message)
class Values(Data, list):
def __str__(self):
return "%s %s" % (Data.__str__(self), list.__str__(self))
def interpret_opcodes(iterable):
vl = Values()
nt = Notification()
for kind, data in iterable:
if kind == TYPE_TIME:
vl.time = nt.time = data
if kind == TYPE_TIME_HI:
vl.time = nt.time = data
elif kind == TYPE_INTERVAL:
vl.interval = data
elif kind == TYPE_INTERVAL_HI:
vl.interval = data
elif kind == TYPE_HOST:
vl.host = nt.host = data
elif kind == TYPE_PLUGIN:
vl.plugin = nt.plugin = data
elif kind == TYPE_PLUGIN_INSTANCE:
vl.plugininstance = nt.plugininstance = data
elif kind == TYPE_TYPE:
vl.type = nt.type = data
elif kind == TYPE_TYPE_INSTANCE:
vl.typeinstance = nt.typeinstance = data
elif kind == TYPE_SEVERITY:
nt.severity = data
elif kind == TYPE_MESSAGE:
nt.message = data
yield deepcopy(nt)
elif kind == TYPE_VALUES:
vl[:] = data
yield deepcopy(vl)
class Reader(object):
"""Network reader for collectd data.
Listens on the network in a given address, which can be a multicast
group address, and handles reading data when it arrives.
"""
addr = None
host = None
port = DEFAULT_PORT
BUFFER_SIZE = 1024
def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
if host is None:
multicast = True
host = DEFAULT_IPv4_GROUP
self.host, self.port = host, port
self.ipv6 = ":" in self.host
family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
None if multicast else self.host, self.port,
socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
self._sock = socket.socket(family, socktype, proto)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind(sockaddr)
if multicast:
if hasattr(socket, "SO_REUSEPORT"):
self._sock.setsockopt(
socket.SOL_SOCKET,
socket.SO_REUSEPORT, 1)
val = None
if family == socket.AF_INET:
assert "." in self.host
val = struct.pack("4sl",
socket.inet_aton(self.host), socket.INADDR_ANY)
elif family == socket.AF_INET6:
raise NotImplementedError("IPv6 support not ready yet")
else:
raise ValueError("Unsupported network address family")
self._sock.setsockopt(
socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP, val)
self._sock.setsockopt(
socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
socket.IP_MULTICAST_LOOP, 0)
def receive(self):
"""Receives a single raw collect network packet.
"""
return self._sock.recv(self.BUFFER_SIZE)
def decode(self, buf=None):
"""Decodes a given buffer or the next received packet.
"""
if buf is None:
buf = self.receive()
return decode_network_packet(buf)
def interpret(self, iterable=None):
"""Interprets a sequence
"""
if iterable is None:
iterable = self.decode()
if isinstance(iterable, basestring):
iterable = self.decode(iterable)
return interpret_opcodes(iterable)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment