Skip to content

Instantly share code, notes, and snippets.

@kmix
Last active May 1, 2017 23:46
Show Gist options
  • Save kmix/62ab30271ed3164bdd3f60c435157453 to your computer and use it in GitHub Desktop.
Save kmix/62ab30271ed3164bdd3f60c435157453 to your computer and use it in GitHub Desktop.
Simple Network Slackbot (Deployed via AWS Lambda)
import boto3
import json
import logging
import os
import socket
import urllib2
from base64 import b64decode
from urlparse import parse_qs
ENCRYPTED_EXPECTED_TOKEN = os.environ['kmsEncryptedToken']
kms = boto3.client('kms')
expected_token = kms.decrypt(CiphertextBlob=b64decode(ENCRYPTED_EXPECTED_TOKEN))['Plaintext']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def respond(err, res=None):
return {
'statusCode': '400' if err else '200',
'body': err.message if err else json.dumps(res),
'headers': {
'Content-Type': 'application/json',
},
}
def slack_response(title, attachment_body, ephemeral):
if ephemeral:
response_type = "ephemeral"
else:
response_type = "in_channel"
return {'text': title, 'response_type': response_type, 'attachments':[{'text': attachment_body}]}
def equals_ignore_case(left, right):
try:
return left.lower() == right.lower()
except AttributeError:
return left == right
def is_ip_address(value):
try:
socket.inet_aton(value)
return True
except:
return False
def is_asn(value):
try:
value = int(value)
if 0 < value and value <= 2**32:
return True
else:
return False
except:
return False
def dns_lookup(value, ephemeral):
title = "DNS Lookup - %s" % value
try:
if is_ip_address(value):
return slack_response(title, socket.gethostbyaddr(value)[0], ephemeral)
else:
return slack_response(title, socket.gethostbyname(value), ephemeral)
except:
error_msg = "Error resolving <%s>" % value
return slack_response(title, error_msg, True)
def get_arin_ip_info(value, ephemeral):
title = "ARIN IP Lookup - %s" % value
if is_ip_address(value):
try:
url = "http://whois.arin.net/rest/ip/%s" % value
req = urllib2.Request(url)
req.add_header('Accept', 'application/json')
response = urllib2.urlopen(req)
data = json.loads(response.read())["net"]
try: org_name = "IP Owner: " + data["orgRef"]["@name"]
except Exception: org_name = "IP Owner: " + data["customerRef"]["@name"]
rest_url = "ARIN Record: " + data["ref"]["$"]
try:
cidr_ip = data["startAddress"]["$"]
cidr_length = data["netBlocks"]["netBlock"]["cidrLength"]["$"]
cidr = "Netblock: %s/%s" % (cidr_ip, cidr_length)
except Exception:
# Fix Later. Some records (e.g. 13.104.0.0) list multiple blocks.
cidr = "N/A"
attachment_body = "%s\n%s\n%s" % (org_name, cidr, rest_url)
return slack_response(title, attachment_body, ephemeral)
except urllib2.URLError as url_exception:
if hasattr(url_exception, 'reason'):
error_msg = "Unable to connect to ARIN server: <%s>" % url_exception.reason
elif hasattr(url_exception, 'code'):
error_msg = "Error received from ARIN server: <%s>" % url_exception.code
return slack_response(title, error_msg, True)
except Exception as generic_exception:
error_msg = "Error retrieving data from ARIN: %s" % generic_exception.message
return slack_response(title, error_msg, True)
else:
error_msg = "<%s> is not a valid IP address" % value
return slack_response(title, error_msg, True)
def get_arin_asn_info(value, ephemeral):
# Strip Leading ASN if Provided
if value[0:2].lower() == "as":
value = value[2:]
title = "ARIN ASN Lookup - AS%s" % value
if is_asn(value):
try:
url = "http://whois.arin.net/rest/asn/%s" % value
req = urllib2.Request(url)
req.add_header('Accept', 'application/json')
response = urllib2.urlopen(req)
data = json.loads(response.read())["asn"]
try: org_name = "ASN Owner: " + data["orgRef"]["@name"]
except Exception: org_name = "ASN Owner: " + data["customerRef"]["@name"]
rest_url = "ARIN Record: " + data["ref"]["$"]
name = data["name"]["$"]
attachment_body = "%s\nASN Name: %s\n%s" % (org_name, name, rest_url)
return slack_response(title, attachment_body, ephemeral)
except urllib2.URLError as url_exception:
if hasattr(url_exception, 'reason'):
error_msg = "Unable to connect to ARIN server: <%s>" % url_exception.reason
elif hasattr(url_exception, 'code'):
error_msg = "Error received from ARIN server: <%s>" % url_exception.code
return slack_response(title, error_msg, True)
except Exception as generic_exception:
error_msg = "Error retrieving data from ARIN: %s" % generic_exception.message
return slack_response(title, error_msg, True)
else:
error_msg = "<%s> is not a valid ASN" % value
return slack_response(title, error_msg, True)
def lambda_handler(event, context):
params = parse_qs(event['body'])
token = params['token'][0]
if token != expected_token:
logger.error("Request token (%s) does not match expected", token)
return respond(Exception('Invalid request token'))
user = params['user_name'][0]
command = params['command'][0]
channel = params['channel_name'][0]
try:
command_text = params['text'][0]
except KeyError:
return respond(None, slack_response("Invalid Syntax", "Parameter required for the %s command!" % command, True))
ephemeral = True
# Determine if response is ephemeral and set boolean accordingly
if command_text[0:1] == "#":
ephemeral = False
command_text = command_text[1:]
if equals_ignore_case(command, '/dns'):
return respond(None, dns_lookup(command_text, ephemeral))
elif equals_ignore_case(command, '/isp'):
return respond(None, get_arin_ip_info(command_text, ephemeral))
elif equals_ignore_case(command, '/asn'):
return respond(None, get_arin_asn_info(command_text, ephemeral))
else:
return respond(None, "%s invoked unsupported command <%s> in <%s> with the following text: %s" % (user, command, channel, command_text))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment