Last active
January 29, 2020 16:15
-
-
Save jbcurtin/5e5a6924f95de22f8eee6d60cfa593f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import boto3 | |
import logging | |
import os | |
import shutil | |
import zipfile | |
from botocore.errorfactory import ClientError | |
# Console handler shared by every logger in the process; each record is
# rendered as "<time> - <logger name> - <level> - <message>".
sysHandler = logging.StreamHandler()
sysHandler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)

# Configure the root logger (empty name) first: INFO threshold, console output.
logger = logging.getLogger('')
logger.setLevel(logging.INFO)
logger.addHandler(sysHandler)

# Rebind ``logger`` to the logger the rest of this script uses, named after
# this file's path.
logger = logging.getLogger(__file__)
# --- Local build layout -----------------------------------------------------
# Everything is produced under ./build relative to the current directory.
WORKING_DIR = os.path.join(os.getcwd(), 'build')
# Staging directory whose contents are zipped.
BUILD_DIR = os.path.join(WORKING_DIR, 'boto3-example')
# The archive sits next to the staging directory: <WORKING_DIR>/boto3-example.zip
ARCHIVE_PATH = BUILD_DIR + '.zip'

# --- S3 location the archive is uploaded to ---------------------------------
S3_BUCKET = 'lambda-function-sandbox'
S3_KEY = 'lambda_codes.zip'

# --- Lambda function settings -----------------------------------------------
FUNCTION_NAME = 'lambda-boto3-example'
FUNCTION_RUNTIME = 'python3.8'
# Placeholder ARN: substitute a real account id and role name before running.
FUNCTION_ROLE_ARN = 'arn:aws:iam::***:role/role-name'
def generate_zip():
    """Build the Lambda deployment archive at ``ARCHIVE_PATH``.

    Steps:

    1. Recreate ``BUILD_DIR`` from scratch (any existing tree is removed).
    2. Write a minimal ``lambda_handler.py`` exposing ``handler(event, context)``.
    3. Store every file found under ``BUILD_DIR`` in a new zip archive, using
       paths relative to ``BUILD_DIR`` and skipping ``.pyc`` files and
       ``__pycache__`` directories.

    Each archive entry is written with ``create_system = 3`` (Unix) and
    ``0o755`` placed in the upper 16 bits of ``external_attr``, so the
    permission bits do not depend on the local file system.

    Returns:
        None. The archive is written to ``ARCHIVE_PATH``.

    Raises:
        OSError: if the build directory or archive cannot be created/written.
    """
    logger.info(f'Generating Archive[{ARCHIVE_PATH}]')
    compression_method = zipfile.ZIP_STORED

    # Start from an empty build directory so stale files never leak into
    # the archive.
    if os.path.exists(BUILD_DIR):
        shutil.rmtree(BUILD_DIR)
    os.makedirs(BUILD_DIR)

    lambda_handler_path = os.path.join(BUILD_DIR, 'lambda_handler.py')
    with open(lambda_handler_path, 'w') as stream:
        stream.write(
            '#!/usr/bin/env python\n'
            'def handler(event=None, context=None):\n'
            '    print("Hello Lambda")\n'
        )

    with zipfile.ZipFile(ARCHIVE_PATH, 'w', compression_method) as archive:
        for root, dirs, files in os.walk(BUILD_DIR):
            # Prune in place: os.walk (top-down) will then not descend into
            # bytecode cache directories at all.
            dirs[:] = [name for name in dirs if name != '__pycache__']

            for filename in files:
                if filename.endswith('.pyc'):
                    continue

                logger.info(f'Adding file[{filename}] to Archive')
                abs_filename = os.path.join(root, filename)
                if filename.endswith('.py'):
                    os.chmod(abs_filename, 0o755)

                # Archive member name is the path relative to the build root.
                relative_filepath = os.path.relpath(abs_filename, BUILD_DIR)
                zip_info = zipfile.ZipInfo(relative_filepath)
                zip_info.create_system = 3  # 3 == Unix
                zip_info.external_attr = 0o755 << 16
                with open(abs_filename, 'rb') as file_stream:
                    archive.writestr(zip_info, file_stream.read(), compression_method)
def upload_s3_object():
    """Upload the archive at ``ARCHIVE_PATH`` to ``S3_BUCKET`` under ``S3_KEY``.

    Uses a default-configured boto3 S3 client. Returns None; any error raised
    by boto3 propagates to the caller.
    """
    client = boto3.client('s3')
    logger.info(f'Uploading S3Archive[{ARCHIVE_PATH}] to S3[{S3_KEY}]')
    client.upload_file(
        Filename=ARCHIVE_PATH,
        Bucket=S3_BUCKET,
        Key=S3_KEY,
    )
def rebuild_lambda():
    """Create the Lambda function from the uploaded S3 archive, replacing any
    existing function of the same name.

    If ``FUNCTION_NAME`` already exists it is deleted first; the function is
    then created with code taken from ``S3_BUCKET``/``S3_KEY``.

    Only a "resource not found" response from ``get_function`` is treated as
    "the function does not exist". Any other client error (for example a
    permissions or throttling failure) propagates instead of being mistaken
    for a missing function.

    Returns:
        None.

    Raises:
        botocore.exceptions.ClientError: for any AWS API failure other than
            the function not existing during the initial lookup.
    """
    lambda_client = boto3.client('lambda')

    try:
        lambda_client.get_function(FunctionName=FUNCTION_NAME)
    except lambda_client.exceptions.ResourceNotFoundException:
        # Nothing to delete; fall through to creation.
        pass
    else:
        logger.info(f'Deleting Lambda Function[{FUNCTION_NAME}]')
        lambda_client.delete_function(FunctionName=FUNCTION_NAME)

    logger.info(f'Creating Lambda Function[{FUNCTION_NAME}]')
    lambda_client.create_function(
        FunctionName=FUNCTION_NAME,
        Runtime=FUNCTION_RUNTIME,
        MemorySize=256,
        Role=FUNCTION_ROLE_ARN,
        Handler='lambda_handler.handler',
        Code={
            'S3Bucket': S3_BUCKET,
            'S3Key': S3_KEY,
        },
        Timeout=900,  # 15 minutes
        Environment={
            'Variables': {},  # ENVVars
        },
        Layers=[],
    )
# Deployment pipeline, run in order: build the archive, push it to S3, then
# (re)create the Lambda function from it. A failure in one step stops the rest.
for _step in (generate_zip, upload_s3_object, rebuild_lambda):
    _step()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment