Last active
May 1, 2017 23:46
-
-
Save kmix/62ab30271ed3164bdd3f60c435157453 to your computer and use it in GitHub Desktop.
Simple Network Slackbot (Deployed via AWS Lambda)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import logging | |
import os | |
import socket | |
import urllib2 | |
from base64 import b64decode | |
from urlparse import parse_qs | |
# Configure the root logger to emit INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# The expected request token is supplied base64-encoded and KMS-encrypted in
# the `kmsEncryptedToken` environment variable; it is decrypted once, at
# import time, and compared against each incoming request's token.
ENCRYPTED_EXPECTED_TOKEN = os.environ['kmsEncryptedToken']
kms = boto3.client('kms')
expected_token = kms.decrypt(
    CiphertextBlob=b64decode(ENCRYPTED_EXPECTED_TOKEN),
)['Plaintext']
def respond(err, res=None):
    """Build the response dict returned by the Lambda handler.

    Args:
        err: An exception describing the failure, or a falsy value on success.
        res: JSON-serializable payload; used as the body only when `err` is
            falsy.

    Returns:
        A dict with 'statusCode' ('400' when `err` is truthy, '200'
        otherwise), 'body' (the error text or the JSON-encoded `res`) and a
        'headers' dict declaring a JSON content type.
    """
    return {
        'statusCode': '400' if err else '200',
        # str(err) rather than err.message: the `message` attribute has been
        # deprecated since Python 2.6, is empty for multi-argument exceptions
        # and does not exist at all on Python 3.
        'body': str(err) if err else json.dumps(res),
        'headers': {
            'Content-Type': 'application/json',
        },
    }
def slack_response(title, attachment_body, ephemeral):
    """Assemble a Slack message payload with a single text attachment.

    Args:
        title: Text placed in the message's top-level 'text' field.
        attachment_body: Text of the one attachment.
        ephemeral: Truthy to mark the message "ephemeral", falsy for
            "in_channel".

    Returns:
        A dict with 'text', 'response_type' and 'attachments' keys.
    """
    visibility = "ephemeral" if ephemeral else "in_channel"
    attachment = {'text': attachment_body}
    return {
        'text': title,
        'response_type': visibility,
        'attachments': [attachment],
    }
def equals_ignore_case(left, right):
    """Compare two values, ignoring letter case when both support `.lower()`.

    If either operand lacks a `lower` method the values are compared with a
    plain `==` instead.
    """
    try:
        folded_left, folded_right = left.lower(), right.lower()
    except AttributeError:
        # At least one side is not string-like; compare the raw values.
        return left == right
    return folded_left == folded_right
def is_ip_address(value):
    """Return True if `value` is a full dotted-quad IPv4 address string.

    `socket.inet_pton` is used instead of `socket.inet_aton` because
    `inet_aton` also accepts abbreviated forms with fewer than three dots
    (e.g. "127.1") and, on some C libraries, trailing text after whitespace.
    The validated value is later interpolated into lookup URLs, so only the
    strict form should pass.

    Args:
        value: Candidate address; anything that is not a valid IPv4 string
            (including non-string objects) yields False.

    Returns:
        True for a valid IPv4 address, False otherwise.
    """
    try:
        socket.inet_pton(socket.AF_INET, value)
    except (socket.error, TypeError, ValueError):
        # socket.error: malformed address; TypeError: not a string;
        # ValueError: string the C layer cannot accept (e.g. embedded NUL).
        return False
    return True
def is_asn(value):
    """Return True if `value` converts to an integer in the range 1..2**32 - 1.

    Args:
        value: Candidate AS number, as a string or number.

    Returns:
        True when `int(value)` succeeds and lies strictly between 0 and
        2**32; False otherwise (including when the conversion fails).
    """
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        return False
    # Strict upper bound: the largest 32-bit AS number is 2**32 - 1, so 2**32
    # itself must be rejected.
    return 0 < number < 2**32
def dns_lookup(value, ephemeral):
    """Resolve `value` and return the result as a Slack message payload.

    An IP address (per `is_ip_address`) is reverse-resolved to its primary
    host name with `socket.gethostbyaddr`; anything else is forward-resolved
    with `socket.gethostbyname`.

    Args:
        value: Host name or IP address to look up.
        ephemeral: Passed through to `slack_response` for successful lookups.

    Returns:
        The `slack_response` payload. Resolution failures produce an error
        message that is always ephemeral, regardless of `ephemeral`.
    """
    title = "DNS Lookup - %s" % value
    try:
        if is_ip_address(value):
            answer = socket.gethostbyaddr(value)[0]
        else:
            answer = socket.gethostbyname(value)
    except (socket.error, UnicodeError, TypeError):
        # Only resolver/argument failures are reported to the user; a bare
        # `except:` here would also swallow SystemExit and KeyboardInterrupt.
        error_msg = "Error resolving <%s>" % value
        return slack_response(title, error_msg, True)
    return slack_response(title, answer, ephemeral)
def get_arin_ip_info(value, ephemeral):
    """Look up an IP address in ARIN's REST whois service.

    Args:
        value: IP address to query; validated with `is_ip_address`.
        ephemeral: Passed through to `slack_response` for successful lookups.

    Returns:
        A `slack_response` payload. On success the attachment lists the
        owner (from `orgRef`, falling back to `customerRef`), the netblock
        in CIDR form (or "N/A" when it cannot be read as a single block) and
        the ARIN record URL. Validation and lookup failures produce an
        error message that is always ephemeral.
    """
    title = "ARIN IP Lookup - %s" % value
    if not is_ip_address(value):
        error_msg = "<%s> is not a valid IP address" % value
        return slack_response(title, error_msg, True)

    try:
        url = "http://whois.arin.net/rest/ip/%s" % value
        req = urllib2.Request(url)
        req.add_header('Accept', 'application/json')
        response = urllib2.urlopen(req)
        try:
            data = json.loads(response.read())["net"]
        finally:
            # Release the connection even if reading or parsing fails.
            response.close()

        try:
            org_name = "IP Owner: " + data["orgRef"]["@name"]
        except (LookupError, TypeError):
            org_name = "IP Owner: " + data["customerRef"]["@name"]
        rest_url = "ARIN Record: " + data["ref"]["$"]
        try:
            cidr_ip = data["startAddress"]["$"]
            cidr_length = data["netBlocks"]["netBlock"]["cidrLength"]["$"]
            cidr = "Netblock: %s/%s" % (cidr_ip, cidr_length)
        except (LookupError, TypeError):
            # Fix Later. Some records (e.g. 13.104.0.0) list multiple blocks.
            cidr = "N/A"
        attachment_body = "%s\n%s\n%s" % (org_name, cidr, rest_url)
        return slack_response(title, attachment_body, ephemeral)
    except urllib2.HTTPError as http_error:
        # HTTPError subclasses URLError, so it must be handled first or the
        # HTTP status code would never be reported.
        error_msg = "Error received from ARIN server: <%s>" % http_error.code
        return slack_response(title, error_msg, True)
    except urllib2.URLError as url_error:
        error_msg = "Unable to connect to ARIN server: <%s>" % url_error.reason
        return slack_response(title, error_msg, True)
    except Exception as generic_exception:
        # Boundary catch-all (bad JSON, unexpected record shape, ...) so the
        # user always gets a reply. str() replaces the deprecated `.message`.
        error_msg = "Error retrieving data from ARIN: %s" % str(generic_exception)
        return slack_response(title, error_msg, True)
def get_arin_asn_info(value, ephemeral):
    """Look up an autonomous system number in ARIN's REST whois service.

    Args:
        value: AS number as text, with or without a leading "AS"/"as"
            prefix; the remainder is validated with `is_asn`.
        ephemeral: Passed through to `slack_response` for successful lookups.

    Returns:
        A `slack_response` payload. On success the attachment lists the
        owner (from `orgRef`, falling back to `customerRef`), the ASN name
        and the ARIN record URL. Validation and lookup failures produce an
        error message that is always ephemeral.
    """
    # Strip Leading ASN if Provided
    if value[0:2].lower() == "as":
        value = value[2:]
    title = "ARIN ASN Lookup - AS%s" % value
    if not is_asn(value):
        error_msg = "<%s> is not a valid ASN" % value
        return slack_response(title, error_msg, True)

    try:
        url = "http://whois.arin.net/rest/asn/%s" % value
        req = urllib2.Request(url)
        req.add_header('Accept', 'application/json')
        response = urllib2.urlopen(req)
        try:
            data = json.loads(response.read())["asn"]
        finally:
            # Release the connection even if reading or parsing fails.
            response.close()

        try:
            org_name = "ASN Owner: " + data["orgRef"]["@name"]
        except (LookupError, TypeError):
            org_name = "ASN Owner: " + data["customerRef"]["@name"]
        rest_url = "ARIN Record: " + data["ref"]["$"]
        name = data["name"]["$"]
        attachment_body = "%s\nASN Name: %s\n%s" % (org_name, name, rest_url)
        return slack_response(title, attachment_body, ephemeral)
    except urllib2.HTTPError as http_error:
        # HTTPError subclasses URLError, so it must be handled first or the
        # HTTP status code would never be reported.
        error_msg = "Error received from ARIN server: <%s>" % http_error.code
        return slack_response(title, error_msg, True)
    except urllib2.URLError as url_error:
        error_msg = "Unable to connect to ARIN server: <%s>" % url_error.reason
        return slack_response(title, error_msg, True)
    except Exception as generic_exception:
        # Boundary catch-all (bad JSON, unexpected record shape, ...) so the
        # user always gets a reply. str() replaces the deprecated `.message`.
        error_msg = "Error retrieving data from ARIN: %s" % str(generic_exception)
        return slack_response(title, error_msg, True)
def lambda_handler(event, context):
    """Lambda entry point: authenticate a form-encoded request and dispatch it.

    The request body is parsed as a URL-encoded form. Its `token` field must
    match the module-level `expected_token`; the `command` field selects the
    lookup (`/dns`, `/isp` or `/asn`, case-insensitive) and `text` carries
    its argument. A leading "#" on the text makes the reply visible in the
    channel instead of ephemeral.

    Args:
        event: Invocation event; its 'body' entry holds the form data.
        context: Lambda context object (unused).

    Returns:
        The dict produced by `respond`: status '400' for a missing or wrong
        token, otherwise status '200' with the JSON-encoded reply.
    """
    # A missing or empty body parses to no fields and is then rejected as an
    # invalid token rather than crashing with KeyError/TypeError.
    params = parse_qs(event.get('body') or '')
    token = params.get('token', [None])[0]
    if token is None or token != expected_token:
        # The supplied token is deliberately not logged: a mistyped or
        # near-valid secret should not end up in the log stream.
        logger.error("Request token does not match expected")
        return respond(Exception('Invalid request token'))

    user = params['user_name'][0]
    command = params['command'][0]
    channel = params['channel_name'][0]
    command_text = params.get('text', [''])[0]

    # Determine if response is ephemeral and set boolean accordingly
    ephemeral = True
    if command_text[0:1] == "#":
        ephemeral = False
        command_text = command_text[1:]
    command_text = command_text.strip()

    # Covers both an absent `text` field and text that was only "#" or
    # whitespace, which would otherwise reach the lookups as an empty string.
    if not command_text:
        return respond(None, slack_response("Invalid Syntax", "Parameter required for the %s command!" % command, True))

    handlers = (
        ('/dns', dns_lookup),
        ('/isp', get_arin_ip_info),
        ('/asn', get_arin_asn_info),
    )
    for name, handler in handlers:
        if equals_ignore_case(command, name):
            return respond(None, handler(command_text, ephemeral))

    return respond(None, "%s invoked unsupported command <%s> in <%s> with the following text: %s" % (user, command, channel, command_text))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment