Last active
September 14, 2018 22:57
-
-
Save antimatter15/cf5902bb596d6ab8fef3920ff4603643 to your computer and use it in GitHub Desktop.
Jupyter Magic to Invoke Cell as AWS Lambda
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Name of the single AWS Lambda function that every deployment and
# invocation in this module targets (passed as FunctionName everywhere).
FUNCTION_NAME = 'parallel_lambda'
# IAM role ARN passed as Role when the function is first created.
LAMBDA_ROLE = 'arn:aws:iam::972882471061:role/lambda_exec_role'
# Default for the --memory option of the %%lambdu magic.
DEFAULT_MEMORY = 128
# Default for the --timeout option of the %%lambdu magic (seconds).
DEFAULT_TIMEOUT = 30
# boto3 profile name used for every session created by this module.
AWS_PROFILE = 'paralambda'
# max_workers of the thread pool used to fan out parallel invocations.
NUM_THREADS = 1000
import boto3 | |
import subprocess | |
import json | |
import sys | |
import os | |
import zipfile | |
import hashlib | |
import base64 | |
import shutil | |
import argparse | |
import threading | |
from concurrent.futures import ThreadPoolExecutor | |
from IPython.core.magic import register_cell_magic | |
from tqdm import tqdm_notebook as tqdm | |
import pickle | |
import zlib | |
import io | |
import tempfile | |
import inspect | |
# Per-thread storage: invokeThread lazily stores one boto3 Lambda client
# per worker thread under the attribute `client`.
threadLocal = threading.local()
# Shared thread pool used by invoke() to run invocations concurrently.
executor = ThreadPoolExecutor(max_workers=NUM_THREADS)
# Module-level session/client used for the deployment and management calls
# (aliases, versions, function code).
session = boto3.Session(profile_name=AWS_PROFILE)
lambdaClient = session.client('lambda')
# Run configurations saved by `%%lambdu --name NAME`, keyed by NAME and
# consumed by map().
storedLambdas = {}
@register_cell_magic
def lambdu(line, cell):
    """Jupyter cell magic that runs the cell body on AWS Lambda.

    The magic line is parsed with argparse (positional arguments are PyPI
    dependencies, see the option help strings below). The runtime, memory,
    timeout and dependencies together determine an alias name; the package
    is deployed under that alias if it is not already there, and the cell
    source is then sent to it for execution.

    Args:
        line: The text following ``%%lambdu`` on the magic line.
        cell: The cell source code to execute remotely.

    Returns:
        None for the maintenance modes (``--clean_all``, ``--rm``) and when
        ``--name`` stores the configuration for later use with ``map``;
        otherwise whatever ``invoke`` returns (a single result, or a list
        of results when ``-n`` is greater than 1).

    Raises:
        SystemExit: raised by argparse when the magic line cannot be parsed.
    """
    parser = argparse.ArgumentParser(
        prog='lambdu',
        description='Jupyter cell magic to invoke cell on AWS Lambda')
    parser.add_argument('deps', type=str, nargs='*',
                        help='dependencies to be installed via PyPI')
    parser.add_argument('--memory', default=DEFAULT_MEMORY, type=int,
                        help='amount of memory in 64MB increments from 128 up to 3008')
    parser.add_argument('--timeout', default=DEFAULT_TIMEOUT, type=int,
                        help='lambda execution timeout in seconds up to 300 (5 minutes)')
    parser.add_argument('--no_install', action='store_true',
                        help='do not install dependencies if not found')
    parser.add_argument('--clean_all', action='store_true',
                        help='remove all deployed dependencies')
    parser.add_argument('--rm', action='store_true',
                        help='remove a specific')
    parser.add_argument('--reinstall', action='store_true',
                        help='uninstall and reinstall')
    parser.add_argument('--runtime', type=str, default='python3.6',
                        help='which runtime (python3.6, python2.7)')
    parser.add_argument('-n', type=int, default=1,
                        help='number of lambdas to invoke')
    parser.add_argument('--verbose', action='store_true',
                        help='show additional information from lambda invocation')
    parser.add_argument('--name', type=str,
                        help='name to store this lambda as')

    # split() without a separator collapses runs of whitespace, so an empty
    # magic line or doubled spaces no longer produce '' tokens that argparse
    # would treat as option values (e.g. "--memory  256").
    args = parser.parse_args(line.split())
    deps = [x for x in args.deps if x]
    box_config = {
        'requirements': deps,
        'memory': args.memory,
        'timeout': args.timeout,
        'runtime': args.runtime
    }
    alias = make_alias_name(box_config)

    if args.clean_all:
        remove_all_aliases()
        return

    if args.rm or args.reinstall:
        try:
            ali = lambdaClient.get_alias(FunctionName=FUNCTION_NAME, Name=alias)
        except lambdaClient.exceptions.ResourceNotFoundException:
            # Nothing deployed for this configuration: --rm is a no-op and
            # --reinstall simply falls through to a fresh install.
            ali = None
            print('Alias "%s" is not deployed; nothing to delete.' % alias)
        if ali is not None:
            lambdaClient.delete_alias(FunctionName=FUNCTION_NAME, Name=ali['Name'])
            lambdaClient.delete_function(FunctionName=FUNCTION_NAME,
                                         Qualifier=ali['FunctionVersion'])
            print('Deleted alias "%s".' % alias)
        if args.rm:
            return

    if not args.no_install and not lambda_exists(FUNCTION_NAME, alias):
        ensure_deps(box_config)
        print(("-" * 100) + "\n")

    run_config = {
        'alias': alias,
        'code': cell,
        'verbose': args.verbose
    }

    if args.name:
        # Store for later use through map(); nothing is invoked now.
        storedLambdas[args.name] = run_config
        return None

    if args.n > 1:
        return invoke(run_config, args.n)
    else:
        return invoke(run_config)
def install_lambda_deps(path, box_config):
    """Install the configured PyPI requirements into ``path`` using Docker.

    Runs pip from the ``lambci/lambda:build-<runtime>`` image with ``path``
    mounted at ``/var/task`` and echoes pip's output, skipping blank lines
    and "Downloading " lines.

    Args:
        path: Host directory that receives the installed packages.
        box_config: Dict with at least ``'requirements'`` (list of pip
            requirement strings) and ``'runtime'`` (e.g. ``'python3.6'``).

    Raises:
        subprocess.CalledProcessError: if the docker/pip command exits with
            a non-zero status, so a failed install is not packaged.
    """
    requirements = box_config['requirements']
    runtime = box_config['runtime']
    if not requirements:
        print("No dependencies to install...")
        return

    command = [
        'docker', 'run', '-t', '--rm',
        '-v', path + ':' + '/var/task',
        '--entrypoint', '/var/lang/bin/pip', 'lambci/lambda:build-' + runtime,
        'install', '--progress-bar=off', '-t', '/var/task'] + list(requirements)

    # stderr is merged into stdout so a single loop drains everything; reading
    # the two pipes one after the other can deadlock once the unread pipe's
    # buffer fills up.
    with subprocess.Popen(command,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT) as proc:
        for raw_line in proc.stdout:
            text = raw_line.decode('utf-8', errors='replace').strip()
            if text and 'Downloading ' not in text:
                print(text)

    # Leaving the with-block waits for the process, so returncode is set.
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, command)
# TODO: consider using https://github.com/ipython/ipython/blob/
# master/IPython/core/interactiveshell.py
# Based on: https://stackoverflow.com/a/47130538
#
# Source of the main.py that is packaged into the Lambda deployment zip.
# Its lambda_handler unpickles event['zpickle64'] into DATA, exposes
# event['index'] as INDEX, executes event['code'] and returns the value of
# the last expression both pretty-printed and (when possible) as a
# compressed, base64-encoded pickle.
# NOTE(review): the handler calls pickle.loads on the event payload; this is
# only safe as long as the function is invoked exclusively by trusted callers.
LAMBDA_TEMPLATE = """
import os
import sys
import ast
import json
import pprint
import pickle
import base64
import zlib

sys.path.append(os.path.join(os.path.dirname(__file__), 'python_lambda_deps'))

def is_json_serializable(out):
    try:
        json.dumps(out)
    except TypeError:
        return False
    return True

def pickle_serialize(out):
    try:
        data = base64.b64encode(zlib.compress(pickle.dumps(out))).decode('utf-8')
        if len(data) > 5 * 1024 * 1024:
            print('WARN/LAMBDU: Pickle serialization exceeds 5MB, not returning result.')
            return (False, None)
        return (True, data)
    except Exception:
        return (False, None)

def lambda_handler(event, context):
    globalenv = {
        'INDEX': event['index'],
        'DATA': pickle.loads(zlib.decompress(base64.b64decode(event['zpickle64'])))
    }
    result = my_exec(event['code'], globalenv, globalenv)
    output = {
        'machine': os.environ['AWS_LAMBDA_LOG_STREAM_NAME'],
        'pretty': pprint.pformat(result, indent=4),
    }
    pickle_serializable, pickle_data = pickle_serialize(result)
    if pickle_serializable:
        output['zpickle64'] = pickle_data
    return output

def my_exec(script, globals=None, locals=None):
    '''Execute a script and return the value of the last expression'''
    stmts = list(ast.iter_child_nodes(ast.parse(script)))
    if not stmts:
        return None
    if isinstance(stmts[-1], ast.Expr):
        # the last one is an expression and we will try to return the results
        # so we first execute the previous statements
        if len(stmts) > 1:
            # type_ignores is a required field of ast.Module on Python 3.8+;
            # without it compile() rejects the hand-built module
            exec(compile(ast.Module(body=stmts[:-1], type_ignores=[]),
                         filename="<ast>", mode="exec"), globals, locals)
        # then we eval the last one
        return eval(compile(ast.Expression(body=stmts[-1].value),
                            filename="<ast>", mode="eval"), globals, locals)
    else:
        # otherwise we just execute the entire code
        return exec(script, globals, locals)
"""
def zipdir(ziph, path, realpath):
    """Add every file found under ``realpath`` to the open zip ``ziph``.

    Archive names are built by prefixing ``path`` to each file's location
    relative to ``realpath`` and normalising the result.
    """
    for current_dir, _subdirs, filenames in os.walk(realpath):
        relative_dir = os.path.relpath(current_dir, realpath)
        for filename in filenames:
            source = os.path.join(current_dir, filename)
            archive_name = os.path.normpath(
                os.path.join(path, relative_dir, filename))
            ziph.write(source, archive_name)
def zipstr(ziph, path, contents):
    """Write ``contents`` into the open zip ``ziph`` as a member named ``path``.

    The member's external attributes are set to mode 0o555 shifted into the
    upper 16 bits.
    """
    member = zipfile.ZipInfo(filename=path)
    mode_bits = 0o555
    member.external_attr = mode_bits << 16
    ziph.writestr(member, contents)
def build_lambda_package(dep_path):
    """Build the deployment zip in memory and return its bytes.

    The archive holds the contents of ``dep_path`` under
    ``python_lambda_deps/`` plus ``main.py`` generated from LAMBDA_TEMPLATE.
    """
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
        zipdir(archive, 'python_lambda_deps/', dep_path)
        zipstr(archive, 'main.py', LAMBDA_TEMPLATE)
    return buffer.getvalue()
def create_or_update_alias(version, alias):
    """Point ``alias`` of FUNCTION_NAME at ``version``.

    Tries to update the alias first and creates it when the service reports
    that it does not exist. Returns the response of whichever call succeeded.
    """
    alias_args = {
        'FunctionName': FUNCTION_NAME,
        'Name': alias,
        'FunctionVersion': version,
    }
    try:
        response = lambdaClient.update_alias(**alias_args)
    except lambdaClient.exceptions.ResourceNotFoundException:
        response = lambdaClient.create_alias(**alias_args)
    return response
def create_or_update_lambda(zipfile, box_config):
    """Upload ``zipfile`` as a new published version of FUNCTION_NAME.

    If the function exists, its configuration (timeout, runtime, memory) is
    updated and the new code is published; otherwise the function is created
    with that configuration.

    Args:
        zipfile: Bytes of the deployment package. (The parameter name shadows
            the ``zipfile`` module inside this function; it is kept for
            interface compatibility.)
        box_config: Dict with ``'runtime'``, ``'memory'`` and ``'timeout'``.

    Returns:
        The response of ``update_function_code`` or ``create_function``;
        callers read its ``'Version'`` key.
    """
    runtime = box_config['runtime']
    memory = box_config['memory']
    timeout = box_config['timeout']
    try:
        lambdaClient.update_function_configuration(
            FunctionName=FUNCTION_NAME,
            Timeout=timeout,
            Runtime=runtime,
            MemorySize=memory
        )
        # A configuration update is applied asynchronously; uploading code
        # while it is still in progress can be rejected by the service with
        # ResourceConflictException. Wait for it when this botocore version
        # provides the waiter (older versions simply skip the wait).
        if 'function_updated' in lambdaClient.waiter_names:
            lambdaClient.get_waiter('function_updated').wait(
                FunctionName=FUNCTION_NAME)
        return lambdaClient.update_function_code(
            FunctionName=FUNCTION_NAME,
            ZipFile=zipfile,
            Publish=True
        )
    except lambdaClient.exceptions.ResourceNotFoundException:
        # First deployment: the function does not exist yet.
        return lambdaClient.create_function(
            FunctionName=FUNCTION_NAME,
            Runtime=runtime,
            MemorySize=memory,
            Timeout=timeout,
            Code={
                'ZipFile': zipfile
            },
            Handler='main.lambda_handler',
            Publish=True,
            Role=LAMBDA_ROLE,
            Description='Parallel Lambda Worker'
        )
def human_size(bytes, units=(' bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB')):
    """Return a human readable string representation of ``bytes``.

    The value is divided by 1024 (integer shift, so it is truncated) until
    it is below 1024 or the largest unit is reached, then suffixed with the
    matching unit.

    Args:
        bytes: Integer size. (The name shadows the builtin; it is kept for
            interface compatibility.)
        units: Sequence of unit suffixes, smallest first. Must be non-empty.

    Returns:
        A string such as ``'512 bytes'`` or ``'3MB'``. Values too large for
        the last unit are expressed in that unit instead of raising.
    """
    unit_index = 0
    last_index = len(units) - 1
    while bytes >= 1024 and unit_index < last_index:
        bytes >>= 10
        unit_index += 1
    return str(bytes) + units[unit_index]
def ensure_deps(box_config):
    """Deploy the Lambda package for ``box_config`` unless it already exists.

    Installs the requirements into a scratch directory, zips them together
    with the handler, uploads the package as a new function version and
    points the configuration's alias at it.

    Args:
        box_config: Dict with ``'requirements'``, ``'runtime'``, ``'memory'``
            and ``'timeout'``.
    """
    alias = make_alias_name(box_config)
    if lambda_exists(FUNCTION_NAME, alias):
        print("Alias '%s' already exists." % alias)
        return

    # NOTE(review): a fixed directory under the current working directory is
    # used as scratch space (presumably so Docker can mount it - confirm).
    path = os.path.join(os.getcwd(), 'temp_lambda_deps')
    if os.path.exists(path):
        shutil.rmtree(path)
    try:
        print("Installing %d dependencies..." % len(box_config['requirements']))
        install_lambda_deps(path, box_config)
        print("Building Lambda package...")
        package_contents = build_lambda_package(path)
    finally:
        # Remove the scratch directory even when install/build fails so a
        # half-populated directory is not left behind.
        if os.path.exists(path):
            shutil.rmtree(path)

    print("Uploading package to AWS (%s)..." % human_size(len(package_contents)))
    result = create_or_update_lambda(package_contents, box_config)
    create_or_update_alias(result['Version'], alias)
    print("Successfully deployed as '%s'." % alias)
def remove_all_aliases():
    """Delete every alias and every published version of FUNCTION_NAME.

    The ``$LATEST`` version is left in place. Both listings are read through
    paginators so that functions with more aliases or versions than fit in a
    single response page are cleaned up completely.
    """
    alias_pages = lambdaClient.get_paginator('list_aliases').paginate(
        FunctionName=FUNCTION_NAME)
    version_pages = lambdaClient.get_paginator('list_versions_by_function').paginate(
        FunctionName=FUNCTION_NAME)
    # Collect everything before deleting so that the listings are not
    # modified while they are still being paged through.
    aliases = [ali for page in alias_pages for ali in page['Aliases']]
    versions = [ver for page in version_pages for ver in page['Versions']]

    for ali in aliases:
        lambdaClient.delete_alias(FunctionName=FUNCTION_NAME, Name=ali['Name'])

    removed_versions = 0
    for ver in versions:
        if ver['Version'] == '$LATEST':
            continue
        lambdaClient.delete_function(FunctionName=FUNCTION_NAME, Qualifier=ver['Version'])
        removed_versions += 1

    print("Removed %d aliases, and %d versions" % (len(aliases), removed_versions))
def make_alias_name(box_config):
    """Derive the alias name that identifies a deployment configuration.

    The name has the form ``<runtime>-<memory>M-<timeout>s-<hash>-<reqs>``
    where ``<runtime>`` has its dots removed, ``<hash>`` is the first five
    hex digits of a SHA-256 over the sorted, lower-cased requirements and
    ``<reqs>`` is those requirements joined with ``_`` (at most 100
    characters, ``NONE`` when there are none).

    Characters in ``<reqs>`` other than lower-case ASCII letters, digits,
    ``-`` and ``_`` are replaced by ``_`` so that version specifiers such as
    ``numpy==1.15`` still give a name made only of characters Lambda accepts
    in alias names. The hash is computed from the unmodified requirement
    strings, so different pins still get different names.

    Args:
        box_config: Dict with ``'requirements'``, ``'runtime'``, ``'memory'``
            and ``'timeout'``.

    Returns:
        The alias name as a string.
    """
    requirements = sorted([x.lower() for x in set(box_config['requirements'])])
    h = hashlib.sha256(b'1')
    for req in requirements:
        h.update(req.encode('utf-8'))

    allowed = set('abcdefghijklmnopqrstuvwxyz0123456789-_')
    joined = '_'.join(requirements)[:100]
    reqs = ''.join(ch if ch in allowed else '_' for ch in joined)
    if reqs == '':
        reqs = 'NONE'

    return ('%s-%dM-%ds-%s-' % (
        box_config['runtime'].replace('.', ''),
        box_config['memory'],
        box_config['timeout'],
        h.hexdigest()[:5])) + reqs
def lambda_exists(name, alias):
    """Report whether function ``name`` can be invoked under ``alias``.

    Issues a DryRun invocation; returns False when the service answers with
    ResourceNotFoundException and True when the call succeeds.
    """
    try:
        lambdaClient.invoke(
            FunctionName=name,
            Qualifier=alias,
            InvocationType="DryRun",
        )
    except lambdaClient.exceptions.ResourceNotFoundException:
        found = False
    else:
        found = True
    return found
def invokeThread(info):
    """Invoke the Lambda once, synchronously, and return its decoded result.

    ``info`` is a task dict with the keys 'payload' (JSON string), 'alias'
    and 'verbose'. The tail of the invocation log is printed; the START/END/
    REPORT lines are only shown when ``info['verbose']`` is truthy.

    Returns the unpickled 'zpickle64' value when the response has one,
    otherwise its 'pretty' string, otherwise the decoded response itself
    (None when the response payload is null).
    """
    # One boto3 client per worker thread, created on first use.
    if not hasattr(threadLocal, 'client'):
        thread_session = boto3.session.Session(profile_name=AWS_PROFILE)
        threadLocal.client = thread_session.client('lambda')

    response = threadLocal.client.invoke(
        FunctionName=FUNCTION_NAME,
        InvocationType='RequestResponse',
        LogType='Tail',
        Payload=info['payload'],
        Qualifier=info['alias']
    )
    data = json.load(response['Payload'])

    # 'ALIAS: %s (%s)\n' % (info['alias'], data['machine']),
    aws_prefixes = ('START ', 'END ', 'REPORT ')
    log_text = base64.b64decode(response['LogResult']).decode('utf-8')
    for log_line in log_text.split('\n')[:-1]:
        if not log_line.startswith(aws_prefixes) or info['verbose']:
            print(log_line)

    if data is None:
        return None
    if 'zpickle64' in data:
        packed = base64.b64decode(data['zpickle64'])
        return pickle.loads(zlib.decompress(packed))
    if 'pretty' in data:
        return data['pretty']
    return data
def invoke(config, data=1):
    """Run the configured code on Lambda once per element of ``data``.

    Args:
        config: Run configuration with 'alias' and 'code' and optionally
            'verbose' (defaults to False).
        data: Either an int N, meaning N invocations that receive
            0..N-1 as their DATA, or any iterable whose elements are pickled
            and passed to one invocation each. Generators and other
            iterables without a length are accepted.

    Returns:
        The single result when there is exactly one element, otherwise a
        list of results in input order (invocations run on the shared
        thread pool with a progress bar).
    """
    if isinstance(data, int):
        data = range(data)
    # Materialise once so iterables without __len__ work and the element
    # count is known for the progress bar.
    items = list(data)
    count = len(items)

    tasks = [
        {
            'alias': config['alias'],
            'verbose': config.get('verbose', False),
            'payload': json.dumps({
                'code': config['code'],
                'index': index,
                'zpickle64': base64.b64encode(
                    zlib.compress(pickle.dumps(item))).decode('utf-8')
            })
        }
        for index, item in enumerate(items)
    ]

    if count == 1:
        return invokeThread(tasks[0])
    else:
        return list(tqdm(executor.map(invokeThread, tasks), total=count))
def map(fname, data):
    """Invoke the lambda stored under ``fname`` (via ``--name``) over ``data``.

    Looks the run configuration up in storedLambdas and hands it to invoke();
    an unknown name raises KeyError.
    """
    stored_config = storedLambdas[fname]
    return invoke(stored_config, data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment