Skip to content

Instantly share code, notes, and snippets.

@olivx
Last active January 28, 2020 18:16
Show Gist options
  • Save olivx/1ca9376d41d0657bddb6e7f9d2adee5b to your computer and use it in GitHub Desktop.
Save olivx/1ca9376d41d0657bddb6e7f9d2adee5b to your computer and use it in GitHub Desktop.
# Address and port the UDP listener binds to (read as
# configuration['DATA_LISTENER_ENDPOINT'] / ['ESP_DATA_LISTENER_PORT']).
DATA_LISTENER_ENDPOINT: "127.0.0.1"
ESP_DATA_LISTENER_PORT: 1167
# NOTE(review): not referenced by the listener code shown alongside this
# config; presumably the destinations alerts are forwarded to -- confirm.
endpoints:
- localhost:8080/alert
- localhost:8081/alert
# NOTE(review): not referenced by the listener code shown alongside this
# config; looks like a numeric-severity -> status-name lookup -- confirm.
# Keys are quoted, so they load as strings rather than integers.
severity_levels:
- { "5": "critical" }
- { "4": "warning" }
- { "3": "warning" }
- { "2": "warning" }
- { "1": "ok" }
- { "0": "ok" }
# SNMP OID -> alert field name. Each list item is a single-entry mapping;
# the trap producer flattens the list into one dict and copies the value of
# every OID found in a received trap into the alert under the mapped name.
oid_table:
- { "1.3.6.1.4.1.4055.1.1.1.0" : "status"}
- { "1.3.6.1.4.1.4055.1.1.2.0" : "subsystem"}
- { "1.3.6.1.4.1.4055.1.1.3.0" : "summary"}
- { "1.3.6.1.4.1.4055.1.1.4.0" : "item"}
- { "1.3.6.1.4.1.4055.1.1.5.0" : "probe"}
- { "1.3.6.1.4.1.4055.1.1.6.0" : "environment"}
- { "1.3.6.1.4.1.4055.1.1.7.0" : "usertag2"}
- { "1.3.6.1.4.1.4055.1.1.8.0" : "origin"}
- { "1.3.6.1.4.1.4055.1.1.9.0" : "ip"}
- { "1.3.6.1.4.1.4055.1.1.10.0" : "timestamp"}
- { "1.3.6.1.4.1.4055.1.1.13.0" : "host"}
- { "1.3.6.1.4.1.4055.1.1.11.0" : "timezone_offset"}
- { "1.3.6.1.4.1.4055.1.1.12.0" : "nimsoft_id"}
import asyncio
import logging
import signal
from pyasn1.codec.ber import decoder
from pysnmp.proto import api
from raven import Client
from .utils import esp_load_conf
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Raven (Sentry) client shared by the producers below to report exceptions.
# NOTE(review): the DSN is hard-coded and appears to embed a key/secret pair;
# consider loading it from configuration or the environment so credentials do
# not live in source control.
sentry_client = Client(
'http://64b9f36e16174b20a382db4c1a31f57f:0c0f36b82bb842eb9ccd77dad7a52106@sentry.preprod.esp.aiops.tivit.corp/6'
)
class UDPProducer(asyncio.DatagramProtocol):
    """Datagram protocol that receives UDP alerts and acknowledges them.

    The local address and port come from the configuration returned by
    ``esp_load_conf()`` (keys ``DATA_LISTENER_ENDPOINT`` and
    ``ESP_DATA_LISTENER_PORT``). Subclasses customise payload parsing by
    overriding :meth:`decode_data`.
    """

    def __init__(self, *args, **kwargs):
        """Capture the event loop and load the listener settings.

        Positional and keyword arguments are accepted but ignored.
        """
        self.loop = asyncio.get_event_loop()
        self.configuration = esp_load_conf()
        self.listener_endpoint = self.configuration['DATA_LISTENER_ENDPOINT']
        self.listener_port = self.configuration['ESP_DATA_LISTENER_PORT']

    def name(self):
        """Return the producer name used to tag error reports.

        Defaults to the concrete class name; subclasses may override it.
        """
        return type(self).__name__

    def connection_made(self, transport):
        """Keep the transport so replies can be sent from ``datagram_received``."""
        logger.info("Async socket ready. Starting Datagram listener.")
        self.transport = transport

    def decode_data(self, data):
        """Return the textual form of the raw payload (``str(data)``)."""
        return str(data)

    def datagram_received(self, data, addr):
        """Acknowledge a datagram with ``b"OK"`` and decode its payload.

        Any exception raised while replying or decoding is reported to Sentry
        and logged; it is never propagated to the event loop.
        """
        try:
            logger.info("Received %r from %s", data, addr)
            self.transport.sendto(b"OK", addr)
            # The decoded payload is not forwarded anywhere here; decoding
            # still runs so that parsing problems are reported below.
            self.decode_data(data)
        except Exception:
            # ``data`` is still the raw payload at this point.
            sentry_client.captureException(
                tags={"name": self.name(), "transport": "UDP", "data": data or None}
            )
            logger.error("Error in UDP transport ", exc_info=True)

    def start(self):
        """Bind the UDP endpoint and run the event loop forever.

        The endpoint is created synchronously first so that a bind failure
        (bad address, port already in use, ...) is raised to the caller
        instead of being lost in an un-awaited task while the loop keeps
        running without a socket.

        Raises:
            OSError: if the datagram endpoint cannot be created.
        """
        self.received = False
        self.loop.run_until_complete(
            self.loop.create_datagram_endpoint(
                lambda: self,
                local_addr=(self.listener_endpoint, self.listener_port),
            )
        )
        logger.info("Listening for an alert")
        self.loop.run_forever()
class TrapProducer(UDPProducer):
    """UDP producer that decodes SNMP trap datagrams into alert dicts."""

    def oid_table(self):
        """Return the configured OID -> alert-field mapping as one flat dict.

        ``configuration['oid_table']`` is a list of mappings; they are merged
        in order, so a later entry wins if an OID is repeated.
        """
        table = {}
        for entry in self.configuration['oid_table']:
            table.update(entry)
        return table

    def decode_data(self, data):
        """Decode an SNMP message and map its known OIDs to alert fields.

        Args:
            data: raw bytes of the received SNMP message.

        Returns:
            A dict of ``{field_name: value}`` for every configured OID that
            is present in the trap with a non-empty value, or ``None`` when
            the SNMP version is not supported. A message whose PDU is not a
            trap yields an empty dict.
        """
        msg_ver = int(api.decodeMessageVersion(data))
        if msg_ver not in api.protoModules:
            # Use the module-level logger; instances have no ``logger`` attribute.
            logger.info('Unsupported SNMP version %s', msg_ver)
            return None
        p_mod = api.protoModules[msg_ver]

        req_msg, _remainder = decoder.decode(data, asn1Spec=p_mod.Message())
        req_pdu = p_mod.apiMessage.getPDU(req_msg)

        # Collect the trap's variable bindings as {oid string: value string}.
        var_binded = {}
        if req_pdu.isSameTypeWith(p_mod.TrapPDU()):
            if msg_ver == api.protoVersion1:
                var_binds = p_mod.apiTrapPDU.getVarBinds(req_pdu)
            else:
                var_binds = p_mod.apiPDU.getVarBinds(req_pdu)
            for oid, val in var_binds:
                var_binded[str(oid)] = str(val)

        alert = {}
        for oid, field in self.oid_table().items():
            value = var_binded.get(oid)
            # Truthiness check: an OID bound to an empty value is skipped too.
            if value:
                alert[field] = value
                logger.info("Found OID %s with value %s . Mapping...", oid, value)
            else:
                logger.info("Did not find OID %s. Skipping...", oid)
        return alert
import logging
import yaml
from .settings import ESP_PATH_LOAD_CONFIGURATION
logger = logging.getLogger(__name__)


def esp_load_conf():
    """Load and return the ESP configuration parsed from YAML.

    Reads the file at ``ESP_PATH_LOAD_CONFIGURATION`` with ``yaml.safe_load``
    and attaches the loaded values to an info log record as ``extra`` fields.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file (``None`` for an
        empty document).

    Raises:
        OSError: if the configuration file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    with open(ESP_PATH_LOAD_CONFIGURATION, 'r') as esp_config:
        data = yaml.safe_load(esp_config)

    # ``logging`` raises KeyError when an ``extra`` key clashes with a
    # LogRecord attribute (e.g. "name", "message") and cannot take a
    # non-mapping at all, so only pass the safe subset of a dict config.
    # Logging must never be the reason configuration loading fails.
    extra = None
    if isinstance(data, dict):
        reserved = set(vars(logging.makeLogRecord({}))) | {'message', 'asctime'}
        extra = {
            key: value for key, value in data.items() if key not in reserved
        }
    logger.info('load cliet esp configurations ', extra=extra)
    return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment