Skip to content

Instantly share code, notes, and snippets.

@mpontillo
Last active March 28, 2017 00:35
Show Gist options
  • Save mpontillo/19148705c73162e9aec6e59c7951ed8f to your computer and use it in GitHub Desktop.
Save mpontillo/19148705c73162e9aec6e59c7951ed8f to your computer and use it in GitHub Desktop.
PySNMP example which scans a CIDR and tries to read the system MIB for each host.
#!/usr/bin/env python3
"""
This file contains example code from the PySNMP project, which has been
heavily modified to only walk specified MIBs.
Examples in the comments include information on how to connect to a device
using SNMPv3, with or without privacy (and an authentication digest).
"""
import sys
from functools import partial
from collections import namedtuple
import asyncio
from netaddr import IPNetwork
from pysnmp.proto.errind import RequestTimedOut
from pysnmp.hlapi.asyncio import (
bulkCmd,
SnmpEngine,
UsmUserData,
CommunityData,
ObjectType,
ObjectIdentity,
Udp6TransportTarget,
UdpTransportTarget,
ContextData,
isEndOfMib,
)
from pysnmp.smi import view
from pprint import pformat, pprint
# Value passed as the last pre-bound positional argument to bulkCmd by the
# partial_*_get_bulk helpers (the "repeating OIDs" count of each request).
# NOTE: the name is misspelt ("REPITITIONS") but is kept as-is because other
# code in this file refers to it by this exact name.
MAX_REPITITIONS = 2
# Note: One SnmpEngine is required per thread. We only need one since we're
# using the asyncio event loop for all our requests.
snmp_engine = SnmpEngine()
# These objects will help resolve symbolic MIB names into OIDs.
# The view built here is what try_fetch_mib hands to resolveWithMib().
mib_builder = snmp_engine.getMibBuilder()
mib_view = view.MibViewController(mib_builder)
def _get_transport_target(host):
    """Return the transport target class to use for `host`.

    A host string containing a colon is treated as IPv6 and mapped to
    `Udp6TransportTarget`; every other host maps to `UdpTransportTarget`.
    The class itself is returned, not an instance.
    """
    if ":" in host:
        return Udp6TransportTarget
    return UdpTransportTarget
def _bulkCmd_shim(*args, **kwargs):
    """Debugging stand-in for `bulkCmd` that prints its arguments first.

    Prints a line of the form ``bulkCmd(<positional>, <key=value, ...>)`` and
    then forwards every argument unchanged to `bulkCmd`, returning whatever
    it returns.
    """
    positional = ", ".join(str(arg) for arg in args)
    # Iterate over items(): iterating the dict directly yields only the keys,
    # which cannot be unpacked into (key, value) pairs.
    keywords = ", ".join("%s=%s" % (k, v) for k, v in kwargs.items())
    print("bulkCmd(%s, %s)" % (positional, keywords))
    return bulkCmd(*args, **kwargs)
def partial_v2_get_bulk(host, community, port=161):
    """Build a `bulkCmd` partial preconfigured for one host and community.

    The returned `functools.partial` already carries the shared engine, the
    community credentials, a transport target for ``(host, port)`` and an
    empty context; callers only supply the object(s) to fetch.
    """
    target_class = _get_transport_target(host)
    auth_data = CommunityData(community)
    target = target_class((host, port))
    context = ContextData()
    # Non-repeating OIDs (use get-next for this number of entities).
    non_repeaters = 0
    # To print the bulkCmd(...) parameters while debugging, pass
    # _bulkCmd_shim instead of bulkCmd as the first argument below.
    # The final argument is the number of repeating OIDs (get as many of
    # these as possible in bulk).
    return partial(
        bulkCmd,
        snmp_engine,
        auth_data,
        target,
        context,
        non_repeaters,
        MAX_REPITITIONS,
    )
# Note: possible SNMPv3 authentication and privacy protocols are defined
# in the `pysnmp.hlapi.auth` module:
#
# __all__ = ['CommunityData', 'UsmUserData',
# 'usm3DESEDEPrivProtocol', 'usmAesCfb128Protocol',
# 'usmAesCfb192Protocol', 'usmAesCfb256Protocol',
# 'usmDESPrivProtocol', 'usmHMACMD5AuthProtocol',
# 'usmHMACSHAAuthProtocol', 'usmNoAuthProtocol',
# 'usmNoPrivProtocol']
def partial_v3_get_bulk(host, username, port=161):
    """Build a `bulkCmd` partial preconfigured for one host and USM user.

    The returned `functools.partial` already carries the shared engine, the
    user credentials, a transport target for ``(host, port)`` and an empty
    context; callers only supply the object(s) to fetch.
    """
    target_class = _get_transport_target(host)
    # For now, assumes a read-only username. SNMPv3 authentication can
    # take a wide variety of parameters, though.
    auth_data = UsmUserData(username)
    target = target_class((host, port))
    context = ContextData()
    # Non-repeating OIDs (use get-next for this number of entities).
    non_repeaters = 0
    # To print the bulkCmd(...) parameters while debugging, pass
    # _bulkCmd_shim instead of bulkCmd as the first argument below.
    # The final argument is the number of repeating OIDs (get as many of
    # these as possible in bulk).
    return partial(
        bulkCmd,
        snmp_engine,
        auth_data,
        target,
        context,
        non_repeaters,
        MAX_REPITITIONS,
    )
class SNMPException(Exception):
    """Common base class for the SNMP errors raised in this module."""
class SNMPIndicationException(SNMPException):
    """Error raised when an SNMP response carries an error indication.

    The indication object is kept on the `indication` attribute so callers
    can inspect its type; `message` and any extra arguments are forwarded
    to the base exception.
    """

    def __init__(self, message, indication, *args, **kwargs):
        super().__init__(message, *args, **kwargs)
        self.indication = indication
class SNMPStatusException(SNMPException):
    """Error raised when an SNMP response carries an error status.

    The status object is kept on the `status` attribute; `message` and any
    extra arguments are forwarded to the base exception.
    """

    def __init__(self, message, status, *args, **kwargs):
        super().__init__(message, *args, **kwargs)
        self.status = status
# Pairs a human-readable description of a connection method with the
# callable used to issue requests through that method.
SNMPConnectionMethod = namedtuple(
    'SNMPConnectionMethod', ('description', 'coroutine'))
class SNMPSession:
    """A target host together with the connection methods to try on it.

    `methods` is an ordered list of `SNMPConnectionMethod` entries; for each
    credential string an SNMPv3 entry is added first (when `v3` is True),
    followed by an SNMPv2c entry (when `v2` is True).
    """

    def __init__(self, host, credentials=None, v2=True, v3=True):
        """Record the host and flags, then register the given credentials.

        When no credentials are given, 'public' is used, which is usually
        the default read community name.
        """
        self.host = host
        self.v2 = v2
        self.v3 = v3
        self.methods = []
        self.add_credentials(
            "public" if credentials is None else credentials)

    def add_credentials(self, credentials):
        """Append connection methods for a credential string.

        The same string is used as the SNMPv3 username and as the SNMPv2c
        community. Anything that is not a `str` is ignored.
        """
        if not isinstance(credentials, str):
            return
        if self.v3 is True:
            label = "SNMPv3 (username: %s)" % credentials
            bulk = partial_v3_get_bulk(self.host, credentials)
            self.methods.append(SNMPConnectionMethod(label, bulk))
        if self.v2 is True:
            label = "SNMPv2c (community: %s)" % credentials
            bulk = partial_v2_get_bulk(self.host, credentials)
            self.methods.append(SNMPConnectionMethod(label, bulk))
class SNMPAttempt:
    """Fetches and prints MIB subtrees using the methods of one session."""

    def __init__(self, session: SNMPSession):
        self.session = session

    async def try_fetch_mib(self, method, mib: ObjectType):
        """Walk the subtree rooted at `mib` using one connection method.

        `method` is called (and awaited) with the object to continue from and
        must produce ``(error_indication, error_status, error_index, data)``,
        where `data` is a sequence of rows of bindings.

        Returns the list of bindings collected before the walk left the
        subtree, hit the end of the MIB, or received an empty table.

        Raises SNMPIndicationException if a response has an error indication,
        and SNMPStatusException if a response has an error status.
        """
        result = []
        mib.resolveWithMib(mib_view)
        root_oid = mib[0].asTuple()
        cursor = mib
        done = False
        while not done:
            response = await method(cursor)
            error_indication, error_status, _error_index, data = response
            if error_indication:
                raise SNMPIndicationException(
                    "SNMP agent returned error indication: %s" % (
                        error_indication),
                    error_indication)
            if error_status:
                raise SNMPStatusException(
                    "SNMP agent returned error status: %s (for %s)" % (
                        error_status.prettyPrint(), mib),
                    error_status)
            if not data:
                # An empty table leaves nothing to continue the walk from;
                # stop here rather than fail on data[-1] below.
                break
            last_row = data[-1]
            for row in data:
                for binding in row:
                    result_oid = binding[0].asTuple()
                    # Compare only the leading components, so everything
                    # under the requested root counts as part of the subtree.
                    if result_oid[:len(root_oid)] > root_oid:
                        done = True
                        break
                    result.append(binding)
                if done:
                    break
            if isEndOfMib(last_row):
                done = True
            # The next request continues from the first binding of the last
            # row received.
            cursor = last_row[0]
        return result

    async def fetch_mib(self, mib):
        """Fetch `mib`, trying the session's connection methods in order.

        `mib` may be an object accepted by try_fetch_mib or a string such as
        ``"SNMPv2-MIB::system"``, which is split on ``::`` and turned into an
        ObjectType.

        Returns ``(description, bindings)`` for the first method that
        succeeds. If every method fails without the final failure being
        re-raised, returns ``(description, None)``, where `description` names
        the last method that failed with a non-timeout error indication (or
        is None if the session has no methods at all).

        Raises the final method's exception when it is a status error, or
        when it is a timeout and no earlier method got a non-timeout error
        indication.
        """
        responding_description = None
        if isinstance(mib, str):
            # Interpret standard-formatted MIB strings.
            mib = ObjectType(ObjectIdentity(*mib.split('::')))
        # Work on a snapshot so changes to the session while awaiting do not
        # affect this attempt.
        methods = list(self.session.methods)
        last_index = len(methods) - 1
        for index, (description, method) in enumerate(methods):
            is_last = index == last_index
            try:
                result = await self.try_fetch_mib(method, mib)
            except SNMPIndicationException as exc:
                if not isinstance(exc.indication, RequestTimedOut):
                    # If we didn't get a timeout, that means we were able to
                    # contact the remote SNMP agent and gather data from it.
                    # This is an important data point for network discovery.
                    responding_description = description
                elif responding_description is None and is_last:
                    raise
            except SNMPException:
                if is_last:
                    # This was the last possible method, so re-raise.
                    raise
            else:
                # If we got a result, no need to try another method.
                return description, result
        return responding_description, None

    async def print_mibs(self, *mibs):
        """Fetch each of `mibs` in turn and print what the agent returned.

        Timeouts are ignored silently; other error indications and error
        statuses are printed. Any of these stops processing of the remaining
        MIBs.
        """
        try:
            for mib in mibs:
                description, result = await self.fetch_mib(mib)
                if result is None:
                    print(
                        "Discovered SNMP agent on: %s; %s, but it did not "
                        "reply. Check the credentials and try again." % (
                            self.session.host, description))
                else:
                    print("SNMP agent on %s replied via %s:" % (
                        self.session.host, description))
                    print("%s:" % mib)
                    for oid, value in result:
                        print("%s = %s" % (
                            oid.prettyPrint(), value.prettyPrint()))
                    print("")
        except SNMPIndicationException as exc:
            if not isinstance(exc.indication, RequestTimedOut):
                print(exc)
        except SNMPStatusException as exc:
            print(exc)
async def shutdown():
    """Close the transport dispatcher of the shared SNMP engine."""
    dispatcher = snmp_engine.transportDispatcher
    dispatcher.closeDispatcher()
def main():
    """Scan a CIDR and print the selected MIBs for every address in it.

    Command line: ``<cidr> [read_community]``. The community defaults to
    "public". Prints a usage message and exits with status 1 when the CIDR
    argument is missing or cannot be parsed.
    """
    mibs_to_print = [
        "SNMPv2-MIB::system",
        # Further tables that can be walked; uncomment to include them.
        # "IF-MIB::ifTable",
        # "IF-MIB::ifXTable",
        # "IF-MIB::ifStackTable",
        # "IP-MIB::ipNetToMediaTable",
        # "RFC1213-MIB::ipRouteTable",
        # "IP-FORWARD-MIB::inetCidrRouteTable",
        # "BRIDGE-MIB::dot1dBasePortTable",
        # "BRIDGE-MIB::dot1dStpPortTable",
        # XXX: Q-BRIDGE-MIB Seems to be not found.
        # "Q-BRIDGE-MIB::dot1qVlanCurrentTable",
        # "Q-BRIDGE-MIB::dot1qVlanStaticTable",
        # "Q-BRIDGE-MIB::dot1qPortVlanTable",
    ]
    try:
        network = IPNetwork(sys.argv[1])
    except Exception:
        # Covers both a missing argument and a value that cannot be parsed,
        # while letting KeyboardInterrupt/SystemExit propagate (a bare
        # `except:` would swallow those too).
        print("Required parameter: CIDR to scan.")
        raise SystemExit(1)
    try:
        read_community = sys.argv[2]
    except IndexError:
        # The community argument is optional.
        read_community = "public"
    # One coroutine per address; they all run concurrently on the event loop.
    attempts = [
        SNMPAttempt(
            SNMPSession(str(ip), read_community)).print_mibs(*mibs_to_print)
        for ip in network
    ]
    # IPv6 test (passes)
    # session = SNMPSession("2001:db8::1", "public")
    # attempts.append(SNMPAttempt(session).print_mibs(*mibs_to_print))
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.gather(*attempts))
        loop.run_until_complete(shutdown())
    finally:
        # Close the loop even if one of the attempts raised unexpectedly.
        loop.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment