Skip to content

Instantly share code, notes, and snippets.

@xfguo
Created January 31, 2013 07:13
Show Gist options
  • Save xfguo/4680957 to your computer and use it in GitHub Desktop.
Save xfguo/4680957 to your computer and use it in GitHub Desktop.
from __future__ import absolute_import
import time
import bisect
import select
from threading import Thread
from pyasn1.codec.ber import encoder, decoder
from pysnmp.proto import api
from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
from pysnmp.carrier.asynsock.dgram import udp
class Instr(object):
"""Abstract MIB instruction."""
isColumn = False
@property
def name(self):
raise NotImplementedError()
def __cmp__(self, other):
if len(other) > len(self.name) and self.name == other[:len(self.name)]:
return 1
elif self.isColumn and other == self.name:
return 1
else:
return cmp(self.name, other)
def execute(self, module, *args, **kwargs):
raise NotImplementedError()
def __call__(self, protoVer, *args, **kwargs):
return self.execute(api.protoModules[protoVer], *args, **kwargs)
def get_next_oid(self, curr_oid):
return self.name
class SysDescr(Instr):
name = (1, 3, 6, 1, 2, 1, 1, 1, 0)
def execute(self, module, oid = None):
return module.OctetString(
'PySNMP example command responder at %s' % __file__
)
class Uptime(Instr):
name = (1, 3, 6, 1, 2, 1, 1, 3, 0)
birthday = time.time()
def execute(self, module, oid = None):
return module.TimeTicks(
(time.time() - self.birthday) * 100
)
class SysORIndex(Instr):
name = (1, 3, 6, 1, 2, 1, 1, 9, 1, 1)
isColumn = True
def execute(self, module, oid = None):
if oid and len(oid) == (len(self.name) + 1) and oid[-1] in range(1, 4):
return module.Integer(oid[-1])
else:
return None
def get_next_oid(self, curr_oid = None):
if curr_oid == self.name:
return self.name + (1,)
elif len(curr_oid) == len(self.name) + 1 and curr_oid[:len(self.name)] == self.name:
if curr_oid[-1] < 3:
return self.name + (curr_oid[-1] + 1,)
else:
return None
else:
return self.name + (1,)
class SysORDescr(Instr):
name = (1, 3, 6, 1, 2, 1, 1, 9, 1, 3)
isColumn = True
def execute(self, module, oid = None):
if oid and len(oid) == (len(self.name) + 1) and oid[-1] in range(1, 4):
return module.OctetString("OR-" + str(oid[-1]))
else:
return None
def get_next_oid(self, curr_oid = None):
if curr_oid == self.name:
return self.name + (1,)
elif len(curr_oid) == len(self.name) + 1 and curr_oid[:len(self.name)] == self.name:
if curr_oid[-1] < 3:
return self.name + (curr_oid[-1] + 1,)
else:
return None
else:
return self.name + (1,)
def test_instr_compare():
instr_list = [SysDescr(), Uptime(), SysORIndex(), SysORDescr()]
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 1)) is 0
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 1, 0)) is 1
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 3)) is 1
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 3, 0)) is 2
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 9, 1, 1)) is 2
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 9, 1, 1, 1)) is 2
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 9, 1, 3)) is 3
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 9, 1, 3, 1)) is 3
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 9, 1, 4)) is 4
assert bisect.bisect(instr_list, (1, 3, 6, 1, 2, 1, 1, 9, 1, 4, 1)) is 4
class Agent(object):
def __init__(self, host, port):
self._mibInstr = []
self._mibInstrIdx = {}
self._transportDispatcher = AsynsockDispatcher()
self.host = host
self.port = port
def prepare(self):
address = (self.host, self.port)
transportDispatcher = self._transportDispatcher
transportDispatcher.registerTransport(
udp.domainName, udp.UdpSocketTransport().openServerMode(address)
)
transportDispatcher.registerRecvCbFun(self.cbFun)
transportDispatcher.jobStarted(1)
def start(self):
try:
self._transportDispatcher.runDispatcher()
except select.error:
pass
def stop(self):
self._transportDispatcher.closeDispatcher()
def registerInstr(self, instr):
assert callable(instr)
mibInstr = self._mibInstr
mibInstr.insert(bisect.bisect(mibInstr, instr.name), instr)
self._mibInstrIdx[instr.name] = instr
def cbFun(self, transportDispatcher, transportDomain, transportAddress, wholeMsg):
mibInstr = self._mibInstr
mibInstrIdx = self._mibInstrIdx
while wholeMsg:
msgVer = api.decodeMessageVersion(wholeMsg)
if msgVer in api.protoModules:
pMod = api.protoModules[msgVer]
else:
return
reqMsg, wholeMsg = decoder.decode(
wholeMsg, asn1Spec=pMod.Message(),
)
rspMsg = pMod.apiMessage.getResponse(reqMsg)
rspPDU = pMod.apiMessage.getPDU(rspMsg)
reqPDU = pMod.apiMessage.getPDU(reqMsg)
varBinds = []
errorIndex = -1
# GETNEXT PDU
if reqPDU.isSameTypeWith(pMod.GetNextRequestPDU()):
# Produce response var-binds
errorIndex = -1
for oid, val in pMod.apiPDU.getVarBinds(reqPDU):
errorIndex += 1
# Search next OID to report
nextIdx = bisect.bisect(mibInstr, oid)
while True:
if nextIdx == len(mibInstr):
pMod.apiPDU.setEndOfMibError(rspPDU, errorIndex)
break
else:
next_oid = mibInstr[nextIdx].get_next_oid(oid)
if next_oid is None:
nextIdx += 1
continue
else:
varBinds.append(
(next_oid, mibInstr[nextIdx](msgVer, next_oid))
)
break
elif reqPDU.isSameTypeWith(pMod.GetRequestPDU()):
for oid, val in pMod.apiPDU.getVarBinds(reqPDU):
if oid in mibInstrIdx:
varBinds.append(
(oid, mibInstrIdx[oid](msgVer))
)
else:
nextIdx = bisect.bisect(mibInstr, oid)
if nextIdx < len(mibInstr):
var = mibInstr[nextIdx](msgVer, oid)
if var is None:
# No such index or unavailable
try:
pMod.apiPDU.setNoSuchInstanceError(rspPDU, errorIndex)
except IndexError:
pass
varBinds = pMod.apiPDU.getVarBinds(reqPDU)
break
else:
varBinds.append(
(oid, var)
)
else:
# No such instance
try:
pMod.apiPDU.setNoSuchInstanceError(rspPDU, errorIndex)
except IndexError:
pass
varBinds = pMod.apiPDU.getVarBinds(reqPDU)
break
else:
# Report unsupported request type
pMod.apiPDU.setErrorStatus(rspPDU, 'genErr')
pMod.apiPDU.setVarBinds(rspPDU, varBinds)
transportDispatcher.sendMessage(
encoder.encode(rspMsg), transportDomain, transportAddress
)
return wholeMsg
class BackgroundAgent(Agent):
Container = Thread
def __init__(self, host, port):
super(BackgroundAgent, self).__init__(host, port)
self.container = None
def start(self):
assert self.container is None
self.prepare()
container = self.container = self.Container(
target=super(BackgroundAgent, self).start
)
container.daemon = True
container.start()
def stop(self):
assert self.container is not None
super(BackgroundAgent, self).stop()
self.container.join()
self.container = None
if __name__ == "__main__":
import time
agent = BackgroundAgent("0.0.0.0", 161)
agent.registerInstr(SysDescr())
agent.registerInstr(Uptime())
agent.registerInstr(SysORIndex())
agent.registerInstr(SysORDescr())
agent.start()
while True:
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment