Created
December 2, 2014 09:41
-
-
Save vincentbernat/244004c94e1932d86f14 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
SNMP helper for HAproxy implementing EXCELIANCE-MIB with | |
`pass_persist` protocol. Data are retrieved from multiple instances of | |
haproxy using HTTP. | |
""" | |
def toOid(oid): | |
"""Convert a string to tuple OID""" | |
oid = oid.split(".") | |
if not oid[0]: | |
oid = oid[1:] | |
return tuple([int(o) for o in oid]) | |
class OPTIONS: | |
# HAProxy instances we will connect to | |
INSTANCES = [ ('127.0.0.1', 8880 + i) for i in range(1,10) ] | |
# URI to query to get stats | |
URI = "/stats?haproxy;csv" | |
# Root OID | |
ROOT = toOid("1.3.6.1.4.1.23263.4.2.1.3") | |
# Refresh interval | |
INTERVAL = 5 | |
# MIB database | |
MIB = {} | |
MIBKEYS = [] | |
import socket | |
import sys | |
import threading | |
import time | |
import httplib | |
import urllib | |
import errno | |
# Setup syslog | |
import logging | |
from logging.handlers import SysLogHandler | |
logger = logging.getLogger() | |
logger.setLevel(logging.INFO) | |
syslog = SysLogHandler(address='/dev/log') | |
syslog.setFormatter(logging.Formatter('%(asctime)s haproxy-snmpd: %(funcName)s():%(lineno)d: %(message)s', | |
'%b %e %H:%M:%S')) | |
logger.addHandler(syslog) | |
MIBLock = threading.Lock() | |
def data(): | |
"""Background thread to gather data from HAProxy instances""" | |
while True: | |
time.sleep(OPTIONS.INTERVAL) | |
logger.debug("Refresh MIB") | |
new_mib = { } | |
for instance in OPTIONS.INSTANCES: | |
http = httplib.HTTPConnection(*instance, timeout=5) | |
try: | |
http.request("GET", OPTIONS.URI) | |
except socket.error as e: | |
if e.errno != errno.ECONNREFUSED: | |
logger.warning("Query %r and got exception: %r", instance, e) | |
continue | |
except Exception as e: | |
logger.warning("Query %r and got exception: %r", instance, e) | |
continue | |
res = http.getresponse() | |
if res.status != 200: | |
logger.warning("Query %r and got status %d", instance, res.status) | |
continue | |
# Read available data | |
data = res.read().split("\n") | |
fields = data[0][2:] # Strip "# " | |
fields = fields.split(",")[:-1] | |
for d in data[1:]: | |
d = d.split(",")[:-1] | |
d = dict(zip(fields, d)) | |
if not d: | |
continue | |
new_mib[(1,1,1,int(d["pid"]))] = ("integer", d["pid"]) # alProcessID | |
if d["svname"] == "FRONTEND": | |
# frontend | |
for iid, key, ctype in [(1, "pid", "integer"), # alFrontendProcessID | |
(2, "iid", "integer"), # alFrontendID | |
(3, "pxname", "string"), # alFrontendName | |
(4, "scur", "gauge64"), # alFrontendSessionCur | |
(5, "smax", "gauge64"), # alFrontendSessionMax | |
(6, "slim", "gauge64"), # alFrontendSessionLimit | |
(7, "stot", "counter64"), # alFrontendSessionTotal | |
(8, "bin", "counter64"), # alFrontendBytesIN | |
(9, "bout", "counter64"), # alFrontendBytesOUT | |
(10, "ereq", "counter64"), # alFrontendErrorRequest | |
(11, "dreq", "counter64"), # alFrontendDenyRequest | |
(12, "dresp", "counter64"), # alFrontendDenyResponse | |
(13, "status","string"), # alFrontendStatus | |
]: | |
if d[key]: new_mib[(2,1,iid,int(d["pid"]),int(d["iid"]))] = (ctype, d[key]) | |
elif d["svname"] == "BACKEND": | |
# backend | |
for iid, key, ctype in [(1, "pid", "integer"), # alBackendProcessID | |
(2, "iid", "integer"), # alBackendID | |
(3, "pxname", "string"), # alBackendName | |
(4, "qcur", "gauge64"), # alBackendQueueCur | |
(5, "qmax", "gauge64"), # alBackendQueueMax | |
(6, "qlimit", "gauge64"), # alBackendQueueLimit | |
(7, "scur", "gauge64"), # alBackendSessionCur | |
(8, "smax", "gauge64"), # alBackendSessionMax | |
(9, "slim", "gauge64"), # alBackendSessionLimit | |
(10, "stot", "counter64"), # alBackendSessionTotal | |
(11, "lbtot", "counter64"), # alBackendSessionLoadBalanced | |
(12, "bin", "counter64"), # alBackendBytesIN | |
(13, "bout", "counter64"), # alBackendBytesOUT | |
(14, "econ", "counter64"), # alBackendErrorConnection | |
(15, "eresp", "counter64"), # alBackendErrorResponse | |
(16, "dreq", "counter64"), # alBackendDenyRequest | |
(17, "dresp", "counter64"), # alBackendDenyResponse | |
(18, "wretr", "counter64"), # alBackendWarningRetry | |
(19, "wredis","counter64"), # alBackendWarningRedispatch | |
(20, "status","string"), # alBackendStatus | |
(21, "lastchg","timetick"),# alBackendLastChange | |
(22, "chkdown","counter32"), # alBackendCheckDown | |
(23, "downtime","timetick"),# alBackendDownTime | |
]: | |
if d[key]: new_mib[(3,1,iid,int(d["pid"]),int(d["iid"]))] = (ctype, d[key]) | |
else: | |
# server | |
for iid, key, ctype in [(1, "pid", "integer"), # alServerProcessID | |
(2, "iid", "integer"), # alServerBackendID | |
(3, "sid", "integer"), # alServerID | |
(4, "svname", "string"), # alServerName | |
(5, "qcur", "gauge64"), # alServerQueueCur | |
(6, "qmax", "gauge64"), # alServerQueueMax | |
(7, "qlimit", "gauge64"), # alServerQueueLimit | |
(8, "scur", "gauge64"), # alServerSessionCur | |
(9, "smax", "gauge64"), # alServerSessionMax | |
(10, "slim", "gauge64"), # alServerSessionLimit | |
(11, "stot", "counter64"), # alServerSessionTotal | |
(12, "lbtot", "counter64"), # alServerSessionLoadBalanced | |
(13, "bin", "counter64"), # alServerBytesIN | |
(14, "bout", "counter64"), # alServerBytesOUT | |
(15, "econ", "counter64"), # alServerErrorConnection | |
(16, "eresp", "counter64"), # alServerErrorResponse | |
(17, "dresp", "counter64"), # alServerDenyResponse | |
(18, "wretr", "counter64"), # alServerWarningRetry | |
(19, "status", "string"), # alServerStatus | |
(20, "lastchg","timetick"), # alServerLastChange | |
(21, "weight", "gauge"), # alServerWeight | |
(22, "act", "integer"), # alServerActive | |
(23, "bck", "integer"), # alServerBackup | |
(24, "chkfail","counter"), # alServerCheckFailure | |
(25, "chkdown","counter"), # alServerCheckDown | |
(26, "downtime","timetick"),# alServerDownTime | |
(27, "throttle","integer"), # alServerThrottle | |
]: | |
if d[key]: new_mib[(4,1,iid, | |
int(d["pid"]),int(d["iid"]),int(d["sid"]))] = (ctype, d[key]) | |
# Update MIB | |
with MIBLock: | |
global MIB | |
global MIBKEYS | |
MIB = new_mib | |
MIBKEYS = MIB.keys() | |
MIBKEYS.sort() | |
logger.debug("Refresh complete, got %d keys", len(MIBKEYS)) | |
def dataLoop(): | |
while True: | |
try: | |
data() | |
except socket.timeout: | |
pass | |
except Exception as e: | |
logger.exception(e) | |
def display(oid): | |
"""Display the given OID""" | |
ctype, value = MIB[oid] | |
# Wrap integer values to 32 bits | |
if ctype == "timetick": | |
value = (int(value) * 100) & 0xffffffff | |
elif ctype in ["integer", "counter", "gauge"]: | |
value = int(value) & 0xffffffff | |
elif ctype in ["integer64", "counter64", "gauge64"]: | |
value = int(value) & 0xffffffffffffffff | |
print ".%s.%s" % (".".join([str(x) for x in OPTIONS.ROOT]), | |
".".join([str(x) for x in oid])) | |
print ctype | |
print value | |
logger.debug("Return %r = %s: %s", oid, ctype, value) | |
def setc(oid, stype, svalue): | |
"""Set the given OID""" | |
oid = toOid(oid) | |
logger.info("SET %r", oid) | |
if oid[:len(OPTIONS.ROOT)] != OPTIONS.ROOT: | |
print "NONE" | |
logger.debug("OID %r not prefixed by root", oid) | |
return | |
oid = oid[len(OPTIONS.ROOT):] | |
with MIBLock: | |
if oid not in MIB: | |
print "not-writable" | |
logger.debug("OID %r not available", oid) | |
return | |
if oid[:3] != (4, 1, 19): # alServerStatus | |
print "not-writable" | |
logger.debug("OID %r not writable", oid) | |
return | |
if stype.upper() == "OCTET": | |
# Convert to STRING | |
stype = "STRING" | |
svalue = '"%s"' % "".join([chr(int(x, 16)) for x in svalue[1:-1].split(" ") if x != "00"]) | |
if stype.upper() == "STRING": | |
if svalue not in [ '"UP"', '"MAINT"' ]: | |
print "wrong-value" | |
logger.warning("only UP and MAINT are allowed, not %r", svalue) | |
return | |
else: | |
print "wrong-type" | |
logger.warning("only STRING is allowed, not %s", stype.upper()) | |
return | |
pid, bid, sid = oid[3:] | |
# Order is important | |
params = "&".join([urllib.urlencode(x) for x in | |
[{ 's': MIB[4, 1, 4, pid, bid, sid][1] }, # alServerName | |
{ 'b': MIB[3, 1, 3, pid, bid][1] }, # alBackendName | |
{ 'action': (svalue == '"UP"') and "enable" or "disable" }]]) | |
instance = OPTIONS.INSTANCES[pid - 1] | |
http = httplib.HTTPConnection(*instance, timeout=5) | |
try: | |
http.request("POST", OPTIONS.URI, params, { "Content-type": "application/x-www-form-urlencoded", | |
"Accept": "text/plain" }) | |
except socket.error as e: | |
logger.warning("Query %r and got exception: %r", instance, e) | |
print "not-writable" | |
return | |
except Exception as e: | |
logger.warning("Query %r and got exception: %r", instance, e) | |
print "not-writable" | |
return | |
res = http.getresponse() | |
if res.status != 303: | |
logger.warning("Query %r and got status %d", instance, res.status) | |
print "not-writable" | |
return | |
print "DONE" | |
def get(oid): | |
"""Get the given OID from the MIB""" | |
oid = toOid(oid) | |
logger.debug("GET %r", oid) | |
if oid[:len(OPTIONS.ROOT)] != OPTIONS.ROOT: | |
print "NONE" | |
logger.debug("OID %r not prefixed by root", oid) | |
return | |
oid = oid[len(OPTIONS.ROOT):] | |
with MIBLock: | |
if oid not in MIB: | |
print "NONE" | |
logger.debug("OID %r not available", oid) | |
return | |
display(oid) | |
def getnext(oid): | |
"""Return the next OID""" | |
oid = toOid(oid) | |
logger.debug("GETNEXT %r", oid) | |
with MIBLock: | |
if not MIBKEYS: | |
print "NONE" | |
logger.info("Empty MIB") | |
return | |
if oid < OPTIONS.ROOT: | |
with MIBLock: | |
display(MIBKEYS[0]) | |
return | |
if oid[:len(OPTIONS.ROOT)] != OPTIONS.ROOT: | |
print "NONE" | |
logger.debug("No more data available in MIB") | |
return | |
oid = oid[len(OPTIONS.ROOT):] | |
with MIBLock: | |
for target in MIBKEYS: | |
if target > oid: | |
display(target) | |
return | |
logger.debug("No more data available in MIB") | |
print "NONE" | |
# Simple FSM | |
class State: | |
INIT=1 # Initialization state (PING/PONG) | |
COMMAND=2 # Waiting for a command (get, getnext, set) | |
GET=3 # In a GET command (waiting for OID) | |
GETNEXT=4 # In a GETNEXT command (waiting for OID) | |
SET1=5 # In a SET command (waiting for OID) | |
SET2=6 # In a SET command (waiting for value) | |
def mainLoop(): | |
"""Main loop""" | |
setoid = None | |
logger.debug("Starting") | |
state = State.INIT | |
while True: | |
sys.stdout.flush() | |
input = sys.stdin.readline().strip() | |
if input == '': | |
# In any state, a blank line is the end of the program | |
logger.info("Terminating") | |
break | |
if state == State.INIT: | |
if input != 'PING': | |
logger.error("Waiting for PING, got %r instead", input) | |
break | |
print 'PONG' | |
logger.debug("Got PING command") | |
state = State.COMMAND | |
continue | |
if state == State.COMMAND: | |
# We are waiting for a command: get/getnext/set | |
if input == "PING": | |
print "PONG" | |
continue | |
if input == "get": | |
state = State.GET | |
continue | |
if input == "getnext": | |
state = State.GETNEXT | |
continue | |
if input == "set": | |
state = State.SET1 | |
continue | |
logger.error("Waiting for a command but got %r instead", input) | |
break | |
# Allow to enable/disable a server | |
if state == State.SET1: | |
setoid = input | |
state = State.SET2 | |
continue | |
if state == State.SET2: | |
vtype, value = input.split(" ", 1) | |
setc(setoid, vtype, value) | |
state = State.COMMAND | |
continue | |
# GET and GETNEXT | |
if state == State.GET: | |
get(input) | |
state = State.COMMAND | |
continue | |
if state == State.GETNEXT: | |
getnext(input) | |
state = State.COMMAND | |
continue | |
# We should not be there | |
logger.error("Unknown state: %d", state) | |
break | |
# Data loop | |
t = threading.Thread(target=dataLoop) | |
t.daemon = True | |
t.start() | |
# Main loop | |
try: | |
mainLoop() | |
except Exception as e: | |
logger.exception(e) | |
raise |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment