Last active
March 28, 2017 00:35
-
-
Save mpontillo/19148705c73162e9aec6e59c7951ed8f to your computer and use it in GitHub Desktop.
PySNMP example which scans a CIDR and tries to read the system MIB for each host.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
This file contains example code from the PySNMP project, which has been | |
heavily modified to only walk specified MIBs. | |
Examples in the comments include information on how to connect to a device | |
using SNMPv3, with or without privacy (and an authentication digest). | |
""" | |
import sys | |
from functools import partial | |
from collections import namedtuple | |
import asyncio | |
from netaddr import IPNetwork | |
from pysnmp.proto.errind import RequestTimedOut | |
from pysnmp.hlapi.asyncio import ( | |
bulkCmd, | |
SnmpEngine, | |
UsmUserData, | |
CommunityData, | |
ObjectType, | |
ObjectIdentity, | |
Udp6TransportTarget, | |
UdpTransportTarget, | |
ContextData, | |
isEndOfMib, | |
) | |
from pysnmp.smi import view | |
from pprint import pformat, pprint | |
# Max-repetitions value handed to every bulk request built in this file.
MAX_REPITITIONS = 2

# A single engine is shared by all requests: one SnmpEngine is required per
# thread, and everything here runs on the one asyncio event loop.
snmp_engine = SnmpEngine()

# Used to resolve symbolic MIB names into OIDs.
mib_builder = snmp_engine.getMibBuilder()
mib_view = view.MibViewController(mib_builder)
def _get_transport_target(host):
    """Return the UDP transport class to use for `host`.

    A host string containing a colon is treated as IPv6 and gets
    Udp6TransportTarget; anything else gets UdpTransportTarget. The class
    itself is returned, not an instance.
    """
    if ":" in host:
        return Udp6TransportTarget
    return UdpTransportTarget
def _bulkCmd_shim(*args, **kwargs):
    """Debugging stand-in for bulkCmd: print the call, then forward it.

    Accepts exactly what bulkCmd accepts and returns whatever bulkCmd
    returns, so it can be swapped in as the first argument to `partial`.
    """
    positional = ", ".join(str(arg) for arg in args)
    # Iterate over items(): iterating the dict itself yields only the keys,
    # which cannot be unpacked into (name, value) pairs.
    keywords = ", ".join(
        "%s=%s" % (name, value) for name, value in kwargs.items())
    print("bulkCmd(%s, %s)" % (positional, keywords))
    return bulkCmd(*args, **kwargs)
def partial_v2_get_bulk(host, community, port=161):
    """Build a bulkCmd callable pre-bound for community-based access.

    The returned partial already carries the shared engine, the community
    credentials, the transport for (host, port), an empty context and the
    bulk parameters; the caller only supplies the var-binds to request.
    """
    target_class = _get_transport_target(host)
    credentials = CommunityData(community)
    target = target_class((host, port))
    context = ContextData()
    # Non-repeating OIDs (use get-next for this number of entities).
    non_repeaters = 0
    # To print the bulkCmd(...) parameters of every request, pass
    # _bulkCmd_shim as the first argument to partial() instead of bulkCmd.
    return partial(
        bulkCmd,
        snmp_engine,
        credentials,
        target,
        context,
        non_repeaters,
        # Repeating OIDs (get as many of these as possible in bulk).
        MAX_REPITITIONS,
    )
# Note: possible SNMPv3 authentication and privacy protocols are defined
# in the `pysnmp.hlapi.auth` module:
#
# __all__ = ['CommunityData', 'UsmUserData',
#            'usm3DESEDEPrivProtocol', 'usmAesCfb128Protocol',
#            'usmAesCfb192Protocol', 'usmAesCfb256Protocol',
#            'usmDESPrivProtocol', 'usmHMACMD5AuthProtocol',
#            'usmHMACSHAAuthProtocol', 'usmNoAuthProtocol',
#            'usmNoPrivProtocol']
def partial_v3_get_bulk(host, username, port=161):
    """Build a bulkCmd callable pre-bound for user-based (SNMPv3) access.

    The returned partial already carries the shared engine, the user
    credentials, the transport for (host, port), an empty context and the
    bulk parameters; the caller only supplies the var-binds to request.
    """
    target_class = _get_transport_target(host)
    # For now, assumes a read-only username. SNMPv3 authentication can
    # take a wide variety of parameters, though.
    credentials = UsmUserData(username)
    target = target_class((host, port))
    context = ContextData()
    # Non-repeating OIDs (use get-next for this number of entities).
    non_repeaters = 0
    # To print the bulkCmd(...) parameters of every request, pass
    # _bulkCmd_shim as the first argument to partial() instead of bulkCmd.
    return partial(
        bulkCmd,
        snmp_engine,
        credentials,
        target,
        context,
        non_repeaters,
        # Repeating OIDs (get as many of these as possible in bulk).
        MAX_REPITITIONS,
    )
class SNMPException(Exception):
    """Common base class for the SNMP errors raised in this file."""
class SNMPIndicationException(SNMPException):
    """Raised when a request comes back with an error indication.

    The indication object is kept on `self.indication` so callers can
    inspect its type.
    """

    def __init__(self, message, indication, *args, **kwargs):
        super().__init__(message, *args, **kwargs)
        self.indication = indication
class SNMPStatusException(SNMPException):
    """Raised when a request comes back with an error status.

    The status object is kept on `self.status`.
    """

    def __init__(self, message, status, *args, **kwargs):
        super().__init__(message, *args, **kwargs)
        self.status = status
# One way of talking to an agent: a human-readable description plus the
# pre-bound request callable to await.
SNMPConnectionMethod = namedtuple(
    'SNMPConnectionMethod', 'description coroutine')
class SNMPSession:
    """A host plus the ordered list of connection methods to try against it.

    `methods` holds SNMPConnectionMethod entries. For each credential
    string the SNMPv3 method is queued before the SNMPv2c one. A version
    is only used when its flag (`v3` / `v2`) is exactly True.
    """

    def __init__(self, host, credentials=None, v2=True, v3=True):
        self.host = host
        self.v2 = v2
        self.v3 = v3
        self.methods = []
        # Without explicit credentials, fall back to 'public', which is
        # usually the default read community name.
        self.add_credentials("public" if credentials is None else credentials)

    def add_credentials(self, credentials):
        """Queue connection methods that use `credentials`.

        Only string credentials are handled; any other type is ignored.
        The same string serves as the SNMPv3 username and as the SNMPv2c
        community name.
        """
        if not isinstance(credentials, str):
            return
        if self.v3 is True:
            v3_method = SNMPConnectionMethod(
                "SNMPv3 (username: %s)" % credentials,
                partial_v3_get_bulk(self.host, credentials))
            self.methods.append(v3_method)
        if self.v2 is True:
            v2_method = SNMPConnectionMethod(
                "SNMPv2c (community: %s)" % credentials,
                partial_v2_get_bulk(self.host, credentials))
            self.methods.append(v2_method)
class SNMPAttempt:
    """Fetches and prints MIB subtrees for the host of one SNMPSession."""

    def __init__(self, session: SNMPSession):
        self.session = session

    async def try_fetch_mib(self, method, mib: ObjectType):
        """Walk the subtree under `mib` using a single connection method.

        `method` is awaited repeatedly, first with `mib` and then with the
        first binding of the last row of the previous response. Bindings
        are collected until one has an OID prefix that sorts after the
        root OID, until isEndOfMib() is true for the last row, or until a
        response carries no rows at all.

        Returns the list of collected bindings.

        Raises:
            SNMPIndicationException: the response had an error indication.
            SNMPStatusException: the response had an error status.
        """
        result = []
        mib.resolveWithMib(mib_view)
        root_oid = mib[0].asTuple()
        request = mib
        while True:
            response = await method(request)
            error_indication, error_status, _error_index, table = response
            if error_indication:
                raise SNMPIndicationException(
                    "SNMP agent returned error indication: %s" % (
                        error_indication),
                    error_indication)
            if error_status:
                raise SNMPStatusException(
                    "SNMP agent returned error status: %s (for %s)" % (
                        error_status.prettyPrint(), mib),
                    error_status)
            if not table:
                # An empty response leaves no OID to continue from; stop
                # here instead of failing on table[-1].
                break
            left_subtree = False
            for row in table:
                for binding in row:
                    # Compare only as many sub-identifiers as the root
                    # has, so everything below the root compares equal.
                    prefix = binding[0].asTuple()[:len(root_oid)]
                    if prefix > root_oid:
                        left_subtree = True
                        break
                    result.append(binding)
                if left_subtree:
                    break
            last_row = table[-1]
            if left_subtree or isEndOfMib(last_row):
                break
            request = last_row[0]
        return result

    async def fetch_mib(self, mib):
        """Fetch `mib`, trying each of the session's methods in order.

        `mib` may be an ObjectType or a string such as "SNMPv2-MIB::system",
        which is split on '::' and wrapped in an ObjectType.

        Returns (description, result) for the first method that succeeds.
        If every method fails without the failure being re-raised, returns
        (description, None), where description belongs to the last method
        that got a non-timeout error indication (or is None).

        Raises:
            SNMPIndicationException: the last method timed out and no
                earlier method got a non-timeout indication.
            SNMPException: the last method failed with any other SNMP
                error (for example an error status).
        """
        if isinstance(mib, str):
            # Interpret standard-formatted MIB strings.
            mib = ObjectType(ObjectIdentity(*mib.split('::')))
        error_session_description = None
        # Snapshot the list so changes to the session made while awaiting
        # do not affect this fetch.
        methods = list(self.session.methods)
        for position, (description, method) in enumerate(methods, start=1):
            is_last = position == len(methods)
            try:
                result = await self.try_fetch_mib(method, mib)
            except SNMPIndicationException as e:
                if not isinstance(e.indication, RequestTimedOut):
                    # If we didn't get a timeout, that means we were able
                    # to contact the remote SNMP agent. This is an
                    # important data point for network discovery.
                    error_session_description = description
                elif error_session_description is None and is_last:
                    raise
            except SNMPException:
                if is_last:
                    # This was the last possible method, so re-raise.
                    raise
            else:
                # If we got a result, no need to try another method.
                return description, result
        return error_session_description, None

    async def print_mibs(self, *mibs):
        """Fetch each of `mibs` in turn and print the outcome.

        Timeouts are swallowed silently; other error indications and error
        statuses are printed. Any of these ends processing of the
        remaining MIBs, since the try block wraps the whole loop.
        """
        try:
            for mib in mibs:
                description, result = await self.fetch_mib(mib)
                if result is None:
                    print(
                        "Discovered SNMP agent on: %s; %s, but it did not "
                        "reply. Check the credentials and try again." % (
                            self.session.host, description))
                else:
                    print("SNMP agent on %s replied via %s:" % (
                        self.session.host, description))
                    print("%s:" % mib)
                    for oid, value in result:
                        print("%s = %s" % (
                            oid.prettyPrint(), value.prettyPrint()))
                    print("")
        except SNMPIndicationException as e:
            if not isinstance(e.indication, RequestTimedOut):
                print(e)
        except SNMPStatusException as e:
            print(e)
async def shutdown():
    """Close the transport dispatcher of the shared SNMP engine."""
    dispatcher = snmp_engine.transportDispatcher
    dispatcher.closeDispatcher()
def main():
    """Scan every address in a CIDR and print the selected MIBs per host.

    Command line:
        argv[1]  CIDR to scan (required).
        argv[2]  credential string handed to SNMPSession (optional,
                 defaults to "public").

    Exits with status 1 if the CIDR is missing or cannot be parsed.
    """
    mibs = [
        "SNMPv2-MIB::system",
        # "IF-MIB::ifTable",
        # "IF-MIB::ifXTable",
        # "IF-MIB::ifStackTable",
        # "IP-MIB::ipNetToMediaTable",
        # "RFC1213-MIB::ipRouteTable",
        # "IP-FORWARD-MIB::inetCidrRouteTable",
        # "BRIDGE-MIB::dot1dBasePortTable",
        # "BRIDGE-MIB::dot1dStpPortTable",
        # XXX: Q-BRIDGE-MIB Seems to be not found.
        # "Q-BRIDGE-MIB::dot1qVlanCurrentTable",
        # "Q-BRIDGE-MIB::dot1qVlanStaticTable",
        # "Q-BRIDGE-MIB::dot1qPortVlanTable",
    ]
    try:
        network = IPNetwork(sys.argv[1])
    except Exception:
        # Covers both a missing argument and an unparseable CIDR, without
        # also swallowing KeyboardInterrupt/SystemExit as a bare except
        # would.
        print("Required parameter: CIDR to scan.")
        raise SystemExit(1)
    read_community = sys.argv[2] if len(sys.argv) > 2 else "public"
    attempts = [
        SNMPAttempt(SNMPSession(str(ip), read_community)).print_mibs(*mibs)
        for ip in network
    ]
    # IPv6 test (passes)
    # session = SNMPSession("2001:db8::1", "public")
    # attempts.append(SNMPAttempt(session).print_mibs(*mibs))
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.gather(*attempts))
    finally:
        # Always release the dispatcher and the loop, even if the scan
        # was interrupted or an attempt raised.
        try:
            loop.run_until_complete(shutdown())
        finally:
            loop.close()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment