Skip to content

Instantly share code, notes, and snippets.

@pauloapi
Last active October 10, 2022 17:08
Show Gist options
  • Save pauloapi/abed07138690d37d170a6cf15b40d749 to your computer and use it in GitHub Desktop.
Save pauloapi/abed07138690d37d170a6cf15b40d749 to your computer and use it in GitHub Desktop.
AWS Lambda: Determine Event Source from event object. Based on this javascript gist https://gist.github.com/jeshan/52cb021fd20d871c56ad5ce6d2654d7b
def get_lambda_event_source(self, event: dict):
if 'pathParameters' in event and 'proxy' in event['pathParameters']:
return 'api_gateway_aws_proxy'
if 'requestContext' in event and 'resourceId' in event['requestContext']:
return 'api_gateway_http'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:s3':
return 's3'
elif 'Records' in event and len(event['Records']) > 0 and 'EventSource' in event['Records'][0] and event['Records'][0]['EventSource'] == 'aws:sns':
return 'sns'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:dynamodb':
return 'dynamo_db'
elif 'Records' in event and len(event['Records']) > 0 and 'cf' in event['Records'][0]:
return 'cloudfront'
elif 'source' in event and event['source'] == 'aws.events':
return 'scheduled_event'
elif 'awslogs' in event and 'data' in event['awslogs']:
return 'cloud_watch_logs'
elif 'authorizationToken' in event and event['authorizationToken'] == "incoming-client-token":
return 'api_gateway_authorizer'
elif 'configRuleId' in event and 'configRuleName' in event and 'configRuleArn' in event:
return 'aws_config'
elif 'StackId' in event and 'RequestType' in event and 'ResourceType' in event:
return 'cloud_formation'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:codecommit':
return 'code_commit'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:ses':
return 'ses'
elif 'Records' in event and len(event['Records']) > 0 and 'eventSource' in event['Records'][0] and event['Records'][0]['eventSource'] == 'aws:kinesis':
return 'kinesis'
elif 'records' in event and len(event['Records']) > 0 and 'approximateArrivalTimestamp' in event['records'][0]:
return 'kinesis_firehose'
elif 'records' in event and len(event['Records']) > 0 and 'deliveryStreamArn' in event and event['deliveryStreamArn'] is str and event['deliveryStreamArn'].startswith('arn:aws:kinesis:'):
return 'kinesis_firehose'
elif 'eventType' in event and event['eventType'] == 'SyncTrigger' and 'identityId' in event and 'identityPoolId' in event:
return 'cognito_sync_trigger'
elif 'operation' in event and 'message' in event:
return 'is_mobile_backend'
@gpmilliken
Copy link

You seem to be missing SQS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment