Created
April 1, 2020 11:59
-
-
Save mgd020/2f66d8ce49f2f29add50db82960abc71 to your computer and use it in GitHub Desktop.
Lambda wrapper for wsgi app
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import cgi | |
import codecs | |
import io | |
import sys | |
from urllib.parse import urlencode | |
from wsgiref.util import FileWrapper | |
from .wsgi import application | |
def lambda_handler(event, context): | |
# headers and params | |
try: | |
query_params = event.pop('multiValueQueryStringParameters') | |
headers = combine_headers( | |
(format_header(name), value) for name, values in event.pop('multiValueHeaders').items() for value in values | |
) | |
doseq = True | |
except KeyError: | |
query_params = event.pop('queryStringParameters') | |
headers = {format_header(name): value for name, value in event.pop('headers').items()} | |
doseq = False | |
# body content | |
try: | |
request_body = event.pop('body') | |
except KeyError: | |
request_body = b'' | |
else: | |
if event.pop('isBase64Encoded', False): | |
request_body = base64.b64decode(request_body) | |
else: | |
encoding = get_encoding(headers) | |
request_body = request_body.encode(encoding) | |
environ = { | |
'REQUEST_METHOD': event.pop('httpMethod'), | |
'PATH_INFO': event.pop('path'), | |
'QUERY_STRING': urlencode(query_params, doseq=doseq), | |
'SERVER_NAME': '1.0.0.127.in-addr.arpa', | |
'SERVER_PORT': headers['HTTP_X_FORWARDED_PORT'], | |
'SERVER_PROTOCOL': 'HTTP/1.1', | |
'REMOTE_ADDR': headers['HTTP_X_FORWARDED_FOR'].split(',')[0], | |
'wsgi.version': (1, 0), | |
'wsgi.url_scheme': headers['HTTP_X_FORWARDED_PROTO'], | |
'wsgi.input': io.BytesIO(request_body), | |
'wsgi.errors': sys.stderr, | |
'wsgi.multithread': True, | |
'wsgi.multiprocess': False, | |
'wsgi.run_once': False, | |
'wsgi.file_wrapper': FileWrapper, | |
'aws.lambda.event': event, | |
'aws.lambda.context': context, | |
} | |
environ.update(headers) | |
response = {} | |
response_body = io.BytesIO() | |
def start_response(status, response_headers, exc_info=None): | |
response['statusCode'] = int(status.split(' ', 1)[0]) | |
headers = combine_headers(response_headers) | |
if doseq: | |
response['multiValueHeaders'] = {name: [value] for name, value in headers.items()} | |
else: | |
response['headers'] = headers | |
return response_body.write | |
result = application(environ, start_response) | |
try: | |
for data in result: | |
response_body.write(data) | |
finally: | |
if hasattr(result, 'close'): | |
result.close() | |
response_body = response_body.getvalue() | |
if doseq: | |
content_type = (response['multiValueHeaders'].get('Content-Type') or [''])[0] | |
else: | |
content_type = response['headers'].get('Content-Type', '') | |
content_type = cgi.parse_header(content_type)[0].lower() | |
if content_type.startswith('text/') or ( | |
content_type in ('application/json', 'application/javascript', 'application/xml') | |
): | |
response['body'] = response_body.decode() | |
else: | |
response['body'] = base64.b64encode(response_body) | |
response['isBase64Encoded'] = True | |
return response | |
def format_header(string): | |
string = string.upper().replace('-', '_') | |
if string in ('CONTENT_TYPE', 'CONTENT_LENGTH'): | |
return string | |
return f'HTTP_{string}' | |
def get_encoding(headers): | |
try: | |
encoding = cgi.parse_header(headers['CONTENT_TYPE'])[1]['charset'] | |
codecs.lookup(encoding) | |
return encoding | |
except (KeyError, LookupError): | |
return 'utf-8' | |
def combine_headers(header_tuples): | |
headers = {} | |
for name, value in header_tuples: | |
try: | |
headers[name] += f',{value}' | |
except KeyError: | |
headers[name] = value | |
return headers |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment