Last active
July 23, 2019 14:58
-
-
Save lee-hodg/9525a23707750c9d2aee0ab1cda61447 to your computer and use it in GitHub Desktop.
Lambda to make requests asynchronously
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import sys | |
import os | |
import logging | |
import requests | |
from boto3 import client as boto3_client | |
from requests_futures.sessions import FuturesSession | |
from raygun4py import raygunprovider | |
# Root logger at INFO so handler progress messages are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Deployment environment name; it is used as the first path segment of the
# SSM parameter name below.
environment = os.environ.get('ENVIRONMENT', 'Production')

# Client used to invoke the callback Lambda function.
lambda_client = boto3_client('lambda')

# Client used to read configuration from SSM Parameter Store.
# NOTE(review): the original comment said "Use endpoint URL so that Lambda
# inside VPC can access SSM", but no endpoint_url is passed here - confirm
# whether one is required.
ssm_client = boto3_client('ssm')

# Fetch the Raygun API key (decrypted) at import time. Any failure is logged
# and the process exits, since the key is needed by the error reporting below.
try:
    raygun_api_key = ssm_client.get_parameter(
        Name=f'/{environment}/Raygun/apikey',
        WithDecryption=True,
    )['Parameter']['Value']
except Exception as exc:
    logger.error(f'ERROR: Could not get raygun api key from SSM: {exc}')
    sys.exit()
# Reporter for otherwise-unhandled exceptions; forwards them to Raygun.
def handle_exception(exc_type, exc_value, exc_traceback):
    """Send the given exception triple to Raygun.

    The signature matches what ``sys.excepthook`` is called with: the
    exception class, the exception instance and its traceback.
    """
    exc_info = (exc_type, exc_value, exc_traceback)
    raygunprovider.RaygunSender(raygun_api_key).send_exception(exc_info=exc_info)
# Install the Raygun reporter as the interpreter-wide hook for uncaught
# exceptions.
sys.excepthook = handle_exception

# Timeout applied to each individual outgoing request (passed as `timeout=`).
TIMEOUT = 10

# Shared session for asynchronous requests; its request calls hand back
# future-like objects whose `.result()` is awaited later.
session = FuturesSession()
def _report_current_exception():
    """Send the exception currently being handled to Raygun.

    Calls ``send_exception()`` without arguments, exactly as the inline call
    sites did, so it must only be used from inside an ``except`` block.
    """
    raygunprovider.RaygunSender(raygun_api_key).send_exception()


def lambda_handler(event, context):
    """
    Consumes from the "requests" queue, and calls back to the
    `callback_lambda_name` lambda function with the responses, passing along
    any `extra_data`.

    Each entry of ``event['Records']`` must carry a ``body`` holding a JSON
    object with these keys:

    * ``requests`` (required): list of request dicts. Each needs ``url`` and
      ``headers``; ``method`` defaults to ``'GET'`` and ``payload`` is
      optional. A request dict missing a required key is logged, reported
      and skipped - it does not abort the remaining requests.
    * ``callback_lambda_name`` (optional): function to invoke with the
      collected responses.
    * ``extra_data`` (optional, default ``{}``): forwarded untouched to the
      callback.

    All requests of a record are started before any response is awaited.
    Failed requests and responses for which ``raise_for_status()`` raises are
    logged, reported and left out of the callback payload. The callback is
    only invoked when at least one response was collected.

    :param event: Lambda event; only ``event['Records']`` is read.
    :param context: Lambda context (unused).
    :return: ``{'statusCode': 500, 'error': ...}`` as soon as a record has a
        missing/invalid JSON ``body`` or lacks ``requests`` (later records are
        then not processed); otherwise ``None``.
    """
    for record in event.get('Records', []):
        try:
            logger.info(record['body'])
            json_record_body = json.loads(record['body'])
        except (ValueError, KeyError) as exc:
            logger.error(str(exc))
            _report_current_exception()
            return {'statusCode': 500, 'error': str(exc)}

        # Callback optional
        callback_lambda_name = json_record_body.get('callback_lambda_name')

        try:
            # Each request should have a url, headers, method, payload
            request_dicts = json_record_body['requests']
        except KeyError:
            _report_current_exception()
            return {'statusCode': 500, 'error': 'requests is required'}

        # Optional
        extra_data = json_record_body.get('extra_data', {})

        # Start every request first so they all run in the background.
        futures = []
        for request_dict in request_dicts:
            try:
                # Required keys are read inside the try so that a malformed
                # request is skipped instead of raising out of the handler
                # (previously the url was read for logging before the try).
                url = request_dict['url']
                headers = request_dict['headers']
            except KeyError as kexc:
                logger.error(str(kexc))
                _report_current_exception()
                continue

            method = request_dict.get('method', 'GET')
            request_payload = request_dict.get('payload')

            logger.info('Making async request to url {}'.format(url))
            futures.append(session.request(method, url, headers=headers,
                                           data=request_payload, timeout=TIMEOUT))

        # Process the results
        response_payloads = []
        for future in futures:
            try:
                # Blocks until this response is done (the other requests keep
                # downloading in the background meanwhile).
                result = future.result()
            except requests.exceptions.RequestException as req_exc:
                _report_current_exception()
                logger.error(str(req_exc))
                continue

            # Prefer the decoded JSON body; fall back to the raw text.
            try:
                body = result.json()
            except ValueError:
                body = result.text

            logger.info(f'Response from {result.url} with status {result.status_code}')
            try:
                logger.info(f'Response from {result.url} has body {body}')
            except Exception:
                # Logging the body is best-effort only; never fail on it.
                pass

            try:
                result.raise_for_status()
            except Exception:
                logger.error(f'Non 200 status response from {result.url}')
                _report_current_exception()
                continue

            response_payloads.append({'status_code': result.status_code,
                                      'url': result.url,
                                      'headers': dict(result.headers),
                                      'body': body})

        if response_payloads and callback_lambda_name is not None:
            # Serialise once and reuse for both the log line and the invoke.
            callback_payload = json.dumps({'extra_data': extra_data,
                                           'response_payloads': response_payloads})
            logger.info('Calling back to {} with payload {}'.format(callback_lambda_name,
                                                                    callback_payload))
            lambda_client.invoke(FunctionName=callback_lambda_name,
                                 InvocationType='Event',
                                 Payload=callback_payload)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment