Last active
September 30, 2020 23:58
-
-
Save unacceptable/452d184ccacc42398d5d3ff2328cf8a2 to your computer and use it in GitHub Desktop.
Replay S3 event triggers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Written by: Robert J. | |
# Email: [email protected] | |
import os | |
import sys | |
import logging | |
import json | |
import boto3 | |
from datetime import date, datetime | |
import re | |
#######################################
### Logging Settings ##################
#######################################
# The root logger (name=None) is used, so handlers attached later by
# configure_logging() -- or already present in the host environment --
# receive every record emitted by this script.
logger = logging.getLogger(name=None)
logger.setLevel(level=logging.INFO)
#######################################
### Global Variables ##################
#######################################
# Module-level boto3 clients, created once at import time and shared by
# get_keys() (S3 listing) and parse_mock_event() (Lambda invocation).
S3 = boto3.client(service_name='s3')
LAMBDA = boto3.client(service_name='lambda')
# Name of the Lambda function that the mock S3 events are sent to.
FUNCTION_NAME = os.environ.get('FUNCTION_NAME', 'my_function')
# Optional regular expression; when set, only object keys it matches
# (via re.search) are replayed.
KEY_REGEX = os.environ.get('KEY_REGEX', None)
# bool() of any non-empty string is True, so DRY_RUN=false / DRY_RUN=0 used
# to *enable* dry-run mode. Parse explicitly: unset, empty, or a recognised
# "off" word disables it. Any other value still enables it, so existing
# settings that relied on "non-empty means dry run" keep dry-running.
DRY_RUN = os.environ.get('DRY_RUN', '').strip().lower() not in (
    '', '0', 'false', 'no', 'n', 'off',
)
#######################################
### Main Function #####################
#######################################
def main(event=None, context=None):
    """Replay a mock S3 event into FUNCTION_NAME for every object in each bucket.

    For each record in ``event["Records"]`` the bucket name is read from
    ``record["s3"]["bucket"]["name"]``. Every key in that bucket (optionally
    filtered by KEY_REGEX) is listed, and one mock event per key is passed
    to the target function. The number of failed invocations is handed to
    error_logic(), which exits non-zero when it is above zero.

    Args:
        event: dict with a ``"Records"`` list; only each record's bucket name
            is read. ``None`` is treated as an event with no records instead
            of failing with a TypeError.
        context: handler context argument; unused.
    """
    logger.info('Starting Script')
    error_count = 0
    logger.info('DRY_RUN: {}'.format(DRY_RUN))
    # Lambda InvocationType passed to every invoke call; see the AWS Lambda
    # Invoke API for the semantics of 'DryRun' vs 'Event'.
    invocation_type = 'DryRun' if DRY_RUN else 'Event'
    if event is None:
        logger.warning('No event supplied; nothing to replay.')
        records = []
    else:
        records = event["Records"]
    for record in records:
        bucket = record["s3"]["bucket"]["name"]
        keys = get_keys(bucket)
        mock_events = create_mock_events(bucket, keys)
        for mock_event in mock_events:
            error_count += parse_mock_event(mock_event, invocation_type=invocation_type)
    error_logic(error_count)
#######################################
### Generic Functions #################
#######################################
def fatal(message, code=1):
    """Log ``message`` at CRITICAL level and terminate with exit status ``code``.

    Raises:
        SystemExit: always, carrying ``code`` (equivalent to sys.exit(code)).
    """
    logger.critical(message)
    logger.info('Exiting Application')
    raise SystemExit(code)
def configure_logging(stdout=False, filename=None, log_format=None):
    """Attach a stdout handler and/or a file handler to the module logger.

    Args:
        stdout: when true, add a StreamHandler that writes to sys.stdout.
        filename: when truthy, add a logging.FileHandler opened on this path.
        log_format: logging.Formatter format string. Falls back to
            "%(asctime)s - %(levelname)s - %(message)s" when falsy.

    If the log file cannot be opened, the error is reported through fatal(),
    which exits the process.
    """
    formatter = logging.Formatter(
        log_format or "%(asctime)s - %(levelname)s - %(message)s"
    )
    if stdout:
        s_handler = logging.StreamHandler(sys.stdout)
        s_handler.setFormatter(formatter)
        logger.addHandler(s_handler)
        logger.debug('Configured logging to stdout.')
    if filename:
        try:
            f_handler = logging.FileHandler(filename)
        except OSError as e:
            # OSError covers PermissionError (the only case handled before)
            # plus a missing parent directory, a directory path, etc. --
            # none of which is recoverable for a log target.
            fatal(e)
        f_handler.setFormatter(formatter)
        logger.addHandler(f_handler)
        logger.debug('Configured logging for {}'.format(filename))
def custom_json_parser(value):
    """``default=`` hook for json.dumps that renders dates as ISO 8601 strings.

    Args:
        value: the object json.dumps could not serialize on its own.

    Returns:
        ``value.isoformat()`` for date and datetime instances.

    Raises:
        TypeError: for any other type, as json.dumps expects from a default hook.
    """
    # datetime is a subclass of date, so a single isinstance check covers both.
    if not isinstance(value, date):
        raise TypeError('Type "{}" not serializable'.format(type(value)))
    return value.isoformat()
def error_logic(error_count):
    """Log the error total, then exit non-zero if any errors were counted.

    The exit status is capped at 255, the largest value an 8-bit process
    exit status can carry. When error_count is not positive the function
    simply returns, because (per the original note) Lambda doesn't like
    sys.exit(0).

    Raises:
        SystemExit: with min(error_count, 255) when error_count > 0.
    """
    logger.info('Total Errors: {}'.format(error_count))
    status = min(error_count, 255)
    logger.info('Exiting Script')
    if status > 0:
        sys.exit(status)
#######################################
### Program Specific Functions ########
#######################################
def get_keys(bucket):
    """Return every object key in ``bucket``, following S3 list pagination.

    Pages are fetched with S3.list_objects_v2. The next page is requested
    with the previous response's NextContinuationToken until no token is
    returned. When KEY_REGEX is set, only keys it matches (re.search) are kept.

    Args:
        bucket: name of the S3 bucket to list.

    Returns:
        list of key strings, in listing order (empty for an empty bucket).
    """
    # Compile once instead of re-parsing the pattern for every key on every page.
    pattern = re.compile(KEY_REGEX) if KEY_REGEX else None
    all_keys = []
    request = {'Bucket': bucket}
    logger.info('Retrieving list of objects from {}.'.format(bucket))
    while True:
        response = S3.list_objects_v2(**request)
        # 'Contents' is omitted from the response when a page has no objects
        # (e.g. an empty bucket); indexing it directly raised KeyError.
        keys = [obj['Key'] for obj in response.get('Contents', [])]
        if pattern is not None:
            keys = [key for key in keys if pattern.search(key)]
        all_keys.extend(keys)
        token = response.get('NextContinuationToken')
        if not token:
            break
        logger.info('Continuation token found.')
        request['ContinuationToken'] = token
    logger.debug('S3 Objects: %s', all_keys)
    return all_keys
def create_mock_events(bucket, keys):
    """Build one minimal S3-notification-shaped event per object key.

    Each event holds a single record that carries only ``s3.bucket.name``
    and ``s3.object.key``; no other notification fields are populated.

    Args:
        bucket: bucket name written into every record.
        keys: iterable of object keys; one event is produced per key, in order.

    Returns:
        list of event dicts.
    """
    mock_events = [
        {"Records": [{"s3": {"bucket": {"name": bucket}, "object": {"key": key}}}]}
        for key in keys
    ]
    # Pretty-printing every event is expensive for large buckets; only pay for
    # it when the DEBUG record would actually be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug('Mock events: {}'.format(json.dumps(mock_events, indent=5)))
    return mock_events
def parse_mock_event(mock_event, invocation_type='DryRun'):
    """Invoke FUNCTION_NAME with one mock S3 event as a UTF-8 JSON payload.

    Args:
        mock_event: event dict as produced by create_mock_events(); the key of
            its first record is used for logging.
        invocation_type: Lambda InvocationType value (main() passes 'DryRun'
            or 'Event').

    Returns:
        0 if LAMBDA.invoke returned without raising, 1 if it raised an error
        (which is logged with its traceback).

    Exits via fatal() on KeyboardInterrupt, or when the target function
    does not exist (ResourceNotFoundException).
    """
    key = mock_event['Records'][0]['s3']['object']['key']
    payload = json.dumps(mock_event).encode('utf-8')
    logger.info('Parsing {}'.format(key))
    logger.debug('Invoking {}'.format(FUNCTION_NAME))
    logger.debug('Payload {}'.format(payload))
    try:
        LAMBDA.invoke(
            FunctionName=FUNCTION_NAME,
            InvocationType=invocation_type,
            Payload=payload,
        )
    except KeyboardInterrupt as e:
        print('\n')
        fatal('Received Keyboard Interrupt {}'.format(e))
    except LAMBDA.exceptions.ResourceNotFoundException as e:
        # A missing target function would fail for every key; stop immediately.
        fatal(e)
    except Exception:
        # Any other failure is counted per key. logger.exception logs at ERROR
        # with the traceback the former bare ``except:`` silently discarded.
        logger.exception('Unable to invoke lambda for payload {}.'.format(key))
        return 1
    return 0
#######################################
### Execution #########################
#######################################
if __name__ == '__main__':
    # Local entry point: log to stdout and replay every object in the buckets
    # listed below, using the same event shape main() receives as a handler.
    configure_logging(stdout=True)
    buckets_to_process = [
        "first-bucket-to-process",
        "second-bucket-to-process",
    ]
    event = {
        "Records": [
            {"s3": {"bucket": {"name": name}, "object": {"key": ""}}}
            for name in buckets_to_process
        ]
    }
    try:
        main(event=event)
    except KeyboardInterrupt as err:
        fatal('Evaluating Keyboard interrupt: {}'.format(err))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment