Created
April 30, 2020 17:20
-
-
Save rafasoares/c53ccc021f89dc8e0b33870e287c2e16 to your computer and use it in GitHub Desktop.
Lambda@Edge function to add IPs requesting known exploitable URLs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import logging | |
import os | |
from base64 import b64decode | |
LOCAL_IPS = ['::1', '127.0.0.1', '0.0.0.0'] | |
ENCRYPTED_ACL_ID = os.environ['encryptedAclId'] | |
KNOWN_BAD_PATHS = os.environ['knownBadPaths'].split(',') | |
ACL_ID = boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_ACL_ID))['Plaintext'].decode('utf-8') | |
logger = logging.getLogger() | |
logger.setLevel(logging.INFO) | |
ec2 = boto3.resource('ec2') | |
acl = ec2.NetworkAcl(ACL_ID) | |
all_entries = [entry for entry in acl.entries if entry['Egress'] == False and 1100 <= entry['RuleNumber'] <= 2000] | |
for entry in all_entries[:-5]: | |
acl.delete_entry(Egress=False, RuleNumber=entry['RuleNumber']) | |
entries = [entry for entry in acl.entries if entry['Egress'] == False and 1100 <= entry['RuleNumber'] <= 2000] | |
numbers = [entry['RuleNumber'] for entry in entries] | |
number = next(i for i, e in enumerate(numbers + [None], 1100) if i != e) | |
def lambda_handler(event, context): | |
logger.info("Event: " + str(event)) | |
message = json.loads(event['Records'][0]['Sns']['Message']) | |
logger.info("Message: " + str(message)) | |
path = message['Path'] | |
ip = message['IpAddress'] | |
if ip == '::1': | |
logger.info("Not going to block a local IP, aborting") | |
return | |
cidr = f"{ip}/32" | |
if any(entry['CidrBlock'] == cidr for entry in entries): | |
logger.info(f"IP {ip} already blocked, skipping") | |
return | |
logger.info(f"Checking request {path} for IP {ip}") | |
if path.lower().startswith(tuple(KNOWN_BAD_PATHS)): | |
logger.info(f"Request {path} is a known path for possible attackers, blocking") | |
acl.create_entry( | |
CidrBlock=cidr, | |
Egress=False, | |
Protocol="-1", | |
RuleAction='deny', | |
RuleNumber=number | |
) | |
else: | |
logger.info(f"Request {info} is not a known bad path, skipping") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment