Skip to content

Instantly share code, notes, and snippets.

@shafayeatsumit
Created December 26, 2017 12:50
Show Gist options
  • Save shafayeatsumit/36b072b028c71aa11a29d7193dcc4dbf to your computer and use it in GitHub Desktop.
Save shafayeatsumit/36b072b028c71aa11a29d7193dcc4dbf to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import os
import subprocess
import zipfile
import sys
import getopt
import shutil
"""
Script will create an AWS Lambda function deployment.
It expects there to be a deployments directory and it will create a
deployment of the form:
deployment_n
where n is incremented for each deployment based on the existing deployment
directories.
If the AWS Lambda function has dependencies, those dependencies are expected
to be in the requirements-test2.txt file.
The implementation files are expected to be in the root project directory, and
this command does not currently support deeply nested file structures.
"""
# Directory that receives the numbered deployment folders and zip files.
# main() replaces this with <root_project_dir>/deployments before use.
root_deployments_dir = './deployments'

# Root directory of the project being packaged. It is left unset here so
# that main() applies its documented fallback (the -r/--root option, then
# the PWD environment variable, then the current working directory)
# instead of a path that only exists on one developer's machine.
root_project_dir = None

# Copy jobs for the deployment. main() appends one {"from": ..., "to": ...}
# dict per included file. Only the files listed here, and the libraries in
# the requirements-test2.txt file, will be included in the deployment.
deployment_files = []
def _read_test_requirements():
    """
    Read the test requirements file located in the project root.

    :return: the lines of requirements-test2.txt (line endings included),
             or None when the file does not exist
    """
    requirements_path = os.path.join(root_project_dir, "requirements-test2.txt")
    if os.path.exists(requirements_path):
        with open(requirements_path, 'r') as handle:
            return handle.readlines()
    # Missing file: report it and let the caller skip the install step.
    print("Requirements file doesn't exist")
    return None
def _read_requirements():
    """
    Read the standard requirements file located in the project root.

    :return: the lines of requirements-test2.txt (line endings included),
             or None when the file does not exist
    """
    requirements_path = os.path.join(root_project_dir, "requirements-test2.txt")
    if os.path.exists(requirements_path):
        with open(requirements_path, 'r') as handle:
            return handle.readlines()
    # Missing file: explain how to generate it and let the caller carry on.
    print("WARNING: A 'requirements-test2.txt' file was not found and at a minimum it is expected that pyalexa-skill and its dependents would be in there.")
    print("run: pip freeze > requirements-test2.txt to generate this file.")
    return None
def _get_immediate_subdirectories(a_dir):
    """
    List the names of the directories located directly inside a_dir.

    :param a_dir: path of the directory to inspect
    :return: list of sub-directory names (names only, not full paths)
    """
    subdirectories = []
    for entry in os.listdir(a_dir):
        # Plain files are ignored; only directory entries are reported.
        if os.path.isdir(os.path.join(a_dir, entry)):
            subdirectories.append(entry)
    return subdirectories
def _mkdirp(directory):
    """
    Create directory, including any missing parents, unless it already
    exists as a directory.

    :param directory: path of the directory to create
    """
    if os.path.isdir(directory):
        # Already there, nothing to do.
        return
    os.makedirs(directory)
def _make_deployment_dir():
    """
    Create the next numbered deployment directory under root_deployments_dir.

    Existing sub-directories named <prefix>_<n> are scanned for the highest
    n, and the new directory is named deployment_<highest + 1>. When no
    numbered directory exists the first one is deployment_1.

    :return: tuple (path of the new deployment directory, its name)
    """
    _mkdirp(root_deployments_dir)
    highest_number = 0
    for dir_name in _get_immediate_subdirectories(root_deployments_dir):
        name_parts = dir_name.split("_")
        if len(name_parts) != 2:
            continue
        try:
            number = int(name_parts[1])
        except ValueError:
            # Unrelated directories such as "old_backup" must not abort the
            # deployment; they simply do not take part in the numbering.
            continue
        if number > highest_number:
            highest_number = number
    deployment_name = "deployment_{0}".format(highest_number + 1)
    new_deployment_dir_path = os.path.join(root_deployments_dir, deployment_name)
    if not os.path.exists(new_deployment_dir_path):
        os.mkdir(new_deployment_dir_path)
    return (new_deployment_dir_path, deployment_name)
def _install_test_requirements(deployment_requirements, deployment_dir):
    """
    pip install -i https://testpypi.python.org/pypi <requirements line> -t <deployment_dir>

    Each requirement line is installed with its own pip invocation. Blank
    lines and comment lines are skipped, and nothing is done when
    deployment_dir does not exist.

    :param deployment_requirements: iterable of requirement lines, e.g. the
                                    result of readlines() on a requirements file
    :param deployment_dir: existing directory that pip installs into
    :return: None
    """
    if not os.path.exists(deployment_dir):
        return
    for requirement in deployment_requirements:
        requirement = requirement.strip()
        # A blank line would make pip fail with "no requirement given".
        if not requirement or requirement.startswith('#'):
            continue
        # Build the argument list piece by piece. Only the requirement line
        # itself is split on whitespace, so a target directory containing
        # spaces is still passed to pip as a single argument.
        # NOTE(review): the index URL is kept exactly as before; confirm it
        # is still the intended test index.
        cmd = (["pip", "install", "-i", "https://testpypi.python.org/pypi"]
               + requirement.split()
               + ["-t", deployment_dir])
        return_code = subprocess.call(cmd, shell=False)
        if return_code != 0:
            print("WARNING: pip install failed for [{0}] (exit code {1})".format(requirement, return_code))
def _install_requirements(deployment_requirements, deployment_dir):
    """
    pip install <requirements line> -t <deployment_dir>

    Each requirement line is installed with its own pip invocation. Blank
    lines and comment lines are skipped, and nothing is done when
    deployment_dir does not exist.

    :param deployment_requirements: iterable of requirement lines, e.g. the
                                    result of readlines() on a requirements file
    :param deployment_dir: existing directory that pip installs into
    :return: None
    """
    if not os.path.exists(deployment_dir):
        return
    for requirement in deployment_requirements:
        requirement = requirement.strip()
        # A blank line would make pip fail with "no requirement given".
        if not requirement or requirement.startswith('#'):
            continue
        # Build the argument list piece by piece. Only the requirement line
        # itself is split on whitespace, so a target directory containing
        # spaces is still passed to pip as a single argument.
        cmd = ["pip", "install"] + requirement.split() + ["-t", deployment_dir]
        return_code = subprocess.call(cmd, shell=False)
        if return_code != 0:
            print("WARNING: pip install failed for [{0}] (exit code {1})".format(requirement, return_code))
def _copy_deployment_files(deployment_data):
    """
    Copy every listed file into the deployment directory.

    The copy is best effort: a missing source file, a malformed entry or a
    failing copy is reported with a warning and the remaining entries are
    still processed.

    :param deployment_data: iterable of dicts with the keys 'from' (source
                            path) and 'to' (destination path)
    :return: None
    """
    for item in deployment_data:
        try:
            source, target = item['from'], item['to']
        except KeyError as err:
            print("WARNING: Skipping malformed deployment entry, missing key {0}".format(err))
            continue
        if not os.path.exists(source):
            # Previously this condition raised an error that was swallowed
            # straight away; report it so a missing file is noticed.
            print("WARNING: Deployment file not found [{0}]".format(source))
            continue
        try:
            shutil.copy2(source, target)
        except (IOError, OSError, shutil.Error) as err:
            print("WARNING: Could not copy [{0}] to [{1}]: {2}".format(source, target, err))
def zipdir(dirPath=None, zipFilePath=None, includeDirInZip=False):
    """
    Create a zip archive from the contents of a directory.

    Keyword arguments:
    dirPath -- string path to the directory to archive. This is the only
        required argument. It can be absolute or relative, and a trailing
        path separator is ignored.
    zipFilePath -- string path to the output zip file. This can be an absolute
        or relative path. An existing file at that path is overwritten.
        (default is computed as dirPath + ".zip")
    includeDirInZip -- boolean indicating whether the top level directory should
        be included in the archive or omitted. (default False)

    Archive member names keep the case of the files on disk and always use
    forward slashes. Empty sub-directories are stored as directory entries.

    Raises OSError when dirPath does not point to a directory.
    """
    if not dirPath or not os.path.isdir(dirPath):
        raise OSError("dirPath argument must point to a directory. "
                      "'%s' does not." % dirPath)
    # Drop a trailing separator so that the directory name and the default
    # zip path are derived from the directory itself.
    dirPath = os.path.normpath(dirPath)
    if not zipFilePath:
        zipFilePath = dirPath + ".zip"
    dirToZip = os.path.basename(os.path.abspath(dirPath))

    # Little nested function to prepare the proper archive path
    def archiveName(path):
        relative = os.path.relpath(path, dirPath)
        if includeDirInZip:
            if relative == os.curdir:
                relative = dirToZip
            else:
                relative = os.path.join(dirToZip, relative)
        # Zip member names use "/" regardless of the host platform.
        return relative.replace(os.path.sep, "/")

    # The context manager closes the archive even if adding a file fails.
    with zipfile.ZipFile(zipFilePath, "w",
                         compression=zipfile.ZIP_DEFLATED) as outFile:
        for (archiveDirPath, dirNames, fileNames) in os.walk(dirPath):
            for fileName in fileNames:
                filePath = os.path.join(archiveDirPath, fileName)
                outFile.write(filePath, archiveName(filePath))
            # Make sure we get empty directories as well
            if not fileNames and not dirNames:
                emptyDirName = archiveName(archiveDirPath)
                # An empty top level directory that is omitted from the
                # archive has no entry to write.
                if emptyDirName != os.curdir:
                    outFile.writestr(zipfile.ZipInfo(emptyDirName + "/"), "")
def make_target_dirs(target_paths):
    """
    Create the parent directory of every target path that does not exist yet.

    :param target_paths: iterable of file paths whose parent directories
                         are needed
    :return: None
    """
    for dirname in set(os.path.dirname(p) for p in target_paths):
        # A bare file name has no directory component; os.makedirs('')
        # would raise, and there is nothing to create in that case.
        if dirname and not os.path.isdir(dirname):
            os.makedirs(dirname)
def _reserve_deployment_number(deployments_dir):
    """
    Return the next unused deployment number and store it in the counter file.

    The counter lives in <deployments_dir>/.deployment_number.txt. Numbers
    for which a deployment_<n> directory or deployment_<n>.zip file already
    exists are skipped.

    :param deployments_dir: directory holding the deployments and the counter
    :return: the reserved deployment number
    """
    counter_path = os.path.join(deployments_dir, '.deployment_number.txt')
    if not os.path.isfile(counter_path):
        # first time - be safe and start at deployment 100
        with open(counter_path, 'w') as f:
            f.write("100\n")
    with open(counter_path, 'r+') as f:
        try:
            number = int(f.readline().strip()) + 1
        except ValueError:
            # Unreadable counter content: start counting again from 1.
            number = 1
        # The candidate paths are rebuilt for every number tried, so the
        # loop ends at the first number that is actually free.
        while (os.path.exists(os.path.join(deployments_dir, "deployment_{0}".format(number)))
               or os.path.exists(os.path.join(deployments_dir, "deployment_{0}.zip".format(number)))):
            number = number + 1
        f.seek(0)
        # Truncate so no stale characters of a longer old value remain.
        f.truncate()
        f.write("{0}\n".format(number))
    return number


def main(argv):
    """
    Build a deployment zip for the project.

    The include files are copied into a fresh deployment directory, the
    requirements are installed into it with pip, the directory is zipped
    to deployments/deployment_<n>.zip and then removed.

    :param argv: command line arguments without the program name; supports
                 -h, -r/--root <root project dir> and -i/--include <files>
    :raises ValueError: when the include list is empty
    """
    global root_deployments_dir, root_project_dir
    include_files = 'email_test.py, package.py'
    try:
        opts, _ = getopt.getopt(argv, "hr:i:", ["root=", "include="])
    except getopt.GetoptError:
        print('create_aws_lambda.py -r <root project dir> -i <include files>')
        print('if -r option not supplied it will look for PWD environment variable')
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            print('create_aws_lambda.py -r <root project dir> -i <include files>')
            print('if -r option not supplied it will look for PWD environment variable')
            print('<include files> are relative to root project dir')
            sys.exit()
        elif opt in ("-r", "--root"):
            root_project_dir = arg
        elif opt in ("-i", "--include"):
            # incase the options are surrounded by quotes
            include_files = arg.replace("'", '')
    if not root_project_dir:
        # No -r given: use PWD, or the working directory when PWD is unset.
        root_project_dir = os.environ.get("PWD") or os.getcwd()
    if not include_files:
        raise ValueError("Must supply -i or --include option")
    root_deployments_dir = os.path.join(root_project_dir, 'deployments')
    (deployment_dir, deployment_name) = _make_deployment_dir()
    for include_file in include_files.split(","):
        include_file = include_file.strip()
        if not include_file:
            # A stray comma would otherwise name the project root itself.
            continue
        d = {
            "from": os.path.join(root_project_dir, include_file),
            "to": os.path.join(deployment_dir, include_file)
        }
        make_target_dirs([d['to']])
        deployment_files.append(d)
    _copy_deployment_files(deployment_files)
    # NOTE(review): in this file _read_requirements() and
    # _read_test_requirements() both open requirements-test2.txt, so every
    # listed package is installed twice (default index, then the test
    # index) - confirm this is intended.
    # standard requirements file
    install_requirements = _read_requirements()
    if install_requirements:
        _install_requirements(install_requirements, deployment_dir)
    # test requirements file (requirements-test2.txt)
    install_requirements = _read_test_requirements()
    if install_requirements:
        _install_test_requirements(install_requirements, deployment_dir)
    next_deployment_number = _reserve_deployment_number(root_deployments_dir)
    deployment_zip_filename = os.path.join(root_deployments_dir, "deployment_{0}.zip".format(next_deployment_number))
    zipdir(deployment_dir, deployment_zip_filename)
    # The unpacked directory is only a staging area for the zip file.
    shutil.rmtree(deployment_dir, ignore_errors=True)
    print("Created deployment zip file: {0}".format(deployment_zip_filename))


if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment