Skip to content

Instantly share code, notes, and snippets.

@allenmichael
Last active January 8, 2023 22:22
Show Gist options
  • Save allenmichael/b15bee68acf8ecfdfcc409b32a4e4ce0 to your computer and use it in GitHub Desktop.
Save allenmichael/b15bee68acf8ecfdfcc409b32a4e4ce0 to your computer and use it in GitHub Desktop.

Streaming API Gateway input to Kinesis Firehose to S3

This CloudFormation template creates an API Gateway REST API that forwards any JSON payload sent in a PUT request to Kinesis Firehose, which in turn delivers the data to S3.

Retrieve the API Gateway URL after CloudFormation deployment

aws cloudformation describe-stacks --stack-name <STACK_NAME> --query 'Stacks[].Outputs[].[OutputKey,OutputValue]'

Test the endpoint

curl -X PUT -H 'Content-Type: application/json' -d '{"event":"sent"}' https://<API_ID>.execute-api.<REGION>.amazonaws.com/prod

Template with added data processor

The second CloudFormation template adds a data-processing Lambda function. The sample Lambda code returns the data unaltered and is for demonstration purposes only.

exports.lambdaHandler = async (event, context) => {
console.log(event.records);
const output = event.records.map((record) => ({
recordId: record.recordId,
result: 'Ok',
data: record.data,
}));
console.log(`Processing completed. Successful records ${output.length}.`);
return { records: output };
};
{
"name": "firehose-processor",
"version": "1.0.0",
"description": "",
"main": "app.js",
"keywords": [],
"author": "",
"license": "Apache-2.0"
}
# First template: API Gateway -> Kinesis Firehose -> S3 (no data processor).
AWSTemplateFormatVersion: "2010-09-09"
# SAM transform; this template declares an AWS::Serverless::Api resource.
Transform: AWS::Serverless-2016-10-31
# All resources of the stack are declared under this key.
Resources:
# Kinesis Firehose delivery stream: ingests records in real time and
# stores them in the Amazon S3 bucket declared in this template.
FireHoseToS3:
  Type: AWS::KinesisFirehose::DeliveryStream
  # Wait for the delivery policy so the role is usable when the stream is created.
  DependsOn:
    - FirehoseDeliveryPolicy
  Properties:
    ExtendedS3DestinationConfiguration:
      RoleARN: !GetAtt FirehoseDeliveryRole.Arn
      BucketARN: !Sub "arn:aws:s3:::${DataDestinationBucket}"
      # Key prefix under which delivered objects are written.
      Prefix: firehose/
      CompressionFormat: UNCOMPRESSED
      BufferingHints:
        IntervalInSeconds: "60"
        SizeInMBs: "50"
# Destination S3 bucket for the delivered records; versioning is enabled.
DataDestinationBucket:
  Type: AWS::S3::Bucket
  Properties:
    VersioningConfiguration: { Status: Enabled }
# IAM role assumed by the Firehose delivery stream declared in this template.
FirehoseDeliveryRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      # Quoted: a bare 2012-10-17 is typed as a date by YAML 1.1 parsers,
      # while the policy language expects the version as a string.
      Version: "2012-10-17"
      Statement:
        - Sid: ""
          Effect: Allow
          Principal:
            Service: firehose.amazonaws.com
          Action: "sts:AssumeRole"
          # Only honour assume-role calls whose external ID is this account's ID.
          Condition:
            StringEquals:
              "sts:ExternalId": !Ref "AWS::AccountId"
# IAM policy attached to the Firehose role: gives the delivery stream the
# ability to deliver records to the S3 bucket created by this template.
FirehoseDeliveryPolicy:
  Type: AWS::IAM::Policy
  DependsOn:
    - DataDestinationBucket
  Properties:
    PolicyName: firehose_delivery_policy
    Roles:
      - !Ref FirehoseDeliveryRole
    PolicyDocument:
      # Quoted so YAML keeps the policy version a string rather than a date.
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Action:
            - "s3:AbortMultipartUpload"
            - "s3:GetBucketLocation"
            - "s3:GetObject"
            - "s3:ListBucket"
            - "s3:ListBucketMultipartUploads"
            - "s3:PutObject"
          Resource:
            # The bucket itself, for the bucket-level actions.
            - !Sub "arn:aws:s3:::${DataDestinationBucket}"
            # Every object in the bucket. The "/" is required: "<bucket>*"
            # would also match any other bucket whose name merely starts
            # with this bucket's name.
            - !Sub "arn:aws:s3:::${DataDestinationBucket}/*"
# Execution role that API Gateway assumes for the REST API in this template.
ProcessingApiRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      # Quoted so YAML keeps the policy version a string rather than a date.
      Version: "2012-10-17"
      Statement:
        - Sid: ""
          Effect: Allow
          Principal:
            Service: apigateway.amazonaws.com
          Action: "sts:AssumeRole"
# IAM policy attached to the API role: permits the streaming API to call the
# Kinesis Firehose PutRecord action on the delivery stream (AWS service proxy).
ProcessingApiPolicy:
  Type: AWS::IAM::Policy
  DependsOn:
    - ProcessingApiRole
  Properties:
    PolicyName: api_gateway_firehose_proxy_role
    Roles:
      - !Ref ProcessingApiRole
    PolicyDocument:
      # Quoted so YAML keeps the policy version a string rather than a date.
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Action:
            - "firehose:PutRecord"
          # Scoped to the one delivery stream declared in this template.
          Resource: !GetAtt FireHoseToS3.Arn
# REST API that acts as an AWS service proxy to the Kinesis Firehose
# PutRecord API: a PUT on "/" is forwarded to the delivery stream.
ProcessingApi:
  Type: AWS::Serverless::Api
  DependsOn:
    - FireHoseToS3
    - ProcessingApiRole
  Properties:
    EndpointConfiguration: REGIONAL
    StageName: prod
    DefinitionBody:
      # Must be the string "2.0"; unquoted, YAML parses it as a float.
      swagger: "2.0"
      info:
        title: !Ref "AWS::StackName"
      paths:
        "/":
          put:
            consumes:
              - "application/json"
            produces:
              - "application/json"
            responses:
              # A Swagger 2.0 response object is keyed by status code and
              # requires a description; it has no statusCode field.
              "200":
                description: "200 response"
            x-amazon-apigateway-integration:
              type: AWS
              # The Firehose action is invoked with POST although clients send PUT.
              httpMethod: POST
              connectionType: INTERNET
              credentials: !GetAtt ProcessingApiRole.Arn
              uri: !Sub "arn:aws:apigateway:${AWS::Region}:firehose:action/PutRecord"
              requestParameters:
                integration.request.header.Content-Type: "'application/x-amz-json-1.1'"
              passthroughBehavior: WHEN_NO_TEMPLATES
              # The request template transforms the incoming JSON payload
              # into the request object structure that the Kinesis Firehose
              # PutRecord API requires (payload base64-encoded under Record.Data).
              # !Sub only replaces ${...}; the $util/$input references are
              # left intact for API Gateway's mapping-template engine.
              requestTemplates:
                application/json: !Sub |-
                  {"DeliveryStreamName": "${FireHoseToS3}", "Record": {"Data": "$util.base64Encode($input.json('$'))"}}
              responses:
                default:
                  # Integration status codes are strings.
                  statusCode: "200"
# Stack outputs.
Outputs:
  StreamingApiEndpoint:
    Description: The endpoint for the REST API created with API Gateway
    # Invoke URL of the "prod" stage: https://<api-id>.execute-api.<region>.amazonaws.com/prod
    Value: !Sub "https://${ProcessingApi}.execute-api.${AWS::Region}.amazonaws.com/prod"
# Second, separate template: same pipeline plus a data-processing Lambda.
AWSTemplateFormatVersion: "2010-09-09"
# SAM transform; this template declares AWS::Serverless::Api and
# AWS::Serverless::Function resources.
Transform: AWS::Serverless-2016-10-31
# All resources of the stack are declared under this key.
Resources:
# Kinesis Firehose delivery stream that runs each batch of records through
# the DataProcessor Lambda before writing to the S3 bucket.
FireHoseToS3:
  Type: AWS::KinesisFirehose::DeliveryStream
  # Both the delivery policy and the processor function must exist first.
  DependsOn:
    - FirehoseDeliveryPolicy
    - DataProcessor
  Properties:
    ExtendedS3DestinationConfiguration:
      RoleARN: !GetAtt FirehoseDeliveryRole.Arn
      BucketARN: !Sub "arn:aws:s3:::${DataDestinationBucket}"
      # Key prefix under which delivered objects are written.
      Prefix: firehose/
      CompressionFormat: UNCOMPRESSED
      BufferingHints:
        IntervalInSeconds: "60"
        SizeInMBs: "50"
      ProcessingConfiguration:
        Enabled: "true"
        Processors:
          - Type: Lambda
            Parameters:
              - ParameterName: LambdaArn
                ParameterValue: !GetAtt DataProcessor.Arn
# Destination S3 bucket for the delivered records; versioning is enabled.
DataDestinationBucket:
  Type: AWS::S3::Bucket
  Properties:
    VersioningConfiguration: { Status: Enabled }
# IAM role assumed by the Firehose delivery stream declared in this template.
FirehoseDeliveryRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      # Quoted: a bare 2012-10-17 is typed as a date by YAML 1.1 parsers,
      # while the policy language expects the version as a string.
      Version: "2012-10-17"
      Statement:
        - Sid: ""
          Effect: Allow
          Principal:
            Service: firehose.amazonaws.com
          Action: "sts:AssumeRole"
          # Only honour assume-role calls whose external ID is this account's ID.
          Condition:
            StringEquals:
              "sts:ExternalId": !Ref "AWS::AccountId"
# IAM policy attached to the Firehose role: lets the delivery stream write to
# the destination bucket and invoke the DataProcessor Lambda.
FirehoseDeliveryPolicy:
  Type: AWS::IAM::Policy
  DependsOn:
    - DataProcessor
    - DataDestinationBucket
  Properties:
    PolicyName: firehose_delivery_policy
    Roles:
      - !Ref FirehoseDeliveryRole
    PolicyDocument:
      # Quoted so YAML keeps the policy version a string rather than a date.
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Action:
            - "s3:AbortMultipartUpload"
            - "s3:GetBucketLocation"
            - "s3:GetObject"
            - "s3:ListBucket"
            - "s3:ListBucketMultipartUploads"
            - "s3:PutObject"
          Resource:
            # The bucket itself, for the bucket-level actions.
            - !Sub "arn:aws:s3:::${DataDestinationBucket}"
            # Every object in the bucket. The "/" is required: "<bucket>*"
            # would also match any other bucket whose name merely starts
            # with this bucket's name.
            - !Sub "arn:aws:s3:::${DataDestinationBucket}/*"
        # Allows the stream to call the record-transformation function.
        - Effect: Allow
          Action:
            - "lambda:InvokeFunction"
          Resource: !GetAtt DataProcessor.Arn
# Lambda function the delivery stream invokes to process records before
# they are delivered. Code is packaged from this directory (app.js).
DataProcessor:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: .
    Handler: app.lambdaHandler
    # nodejs8.10 has been retired by Lambda and can no longer be used to
    # create functions; the handler is plain CommonJS and runs unchanged
    # on a current Node.js runtime.
    Runtime: nodejs22.x
    # SAM's default timeout is 3 seconds; Firehose guidance for
    # transformation functions is a timeout of one minute or more.
    Timeout: 60
# Resource-based permission letting the Firehose service invoke the
# processor, limited to this account and to this delivery stream.
DataProcessorPermission:
  Type: AWS::Lambda::Permission
  Properties:
    FunctionName: !Ref DataProcessor
    Action: "lambda:InvokeFunction"
    Principal: firehose.amazonaws.com
    SourceArn: !GetAtt FireHoseToS3.Arn
    SourceAccount: !Ref "AWS::AccountId"
# Execution role that API Gateway assumes for the REST API in this template.
ProcessingApiRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      # Quoted so YAML keeps the policy version a string rather than a date.
      Version: "2012-10-17"
      Statement:
        - Sid: ""
          Effect: Allow
          Principal:
            Service: apigateway.amazonaws.com
          Action: "sts:AssumeRole"
# IAM policy attached to the API role: permits the streaming API to call the
# Kinesis Firehose PutRecord action on the delivery stream.
ProcessingApiPolicy:
  Type: AWS::IAM::Policy
  DependsOn:
    - ProcessingApiRole
  Properties:
    PolicyName: api_gateway_firehose_proxy_role
    Roles:
      - !Ref ProcessingApiRole
    PolicyDocument:
      # Quoted so YAML keeps the policy version a string rather than a date.
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Action:
            - "firehose:PutRecord"
          # Scoped to the one delivery stream declared in this template.
          Resource: !GetAtt FireHoseToS3.Arn
# REST API that acts as an AWS service proxy to the Kinesis Firehose
# PutRecord API: a PUT on "/" is forwarded to the delivery stream.
ProcessingApi:
  Type: AWS::Serverless::Api
  DependsOn:
    - FireHoseToS3
    - ProcessingApiRole
  Properties:
    EndpointConfiguration: REGIONAL
    StageName: prod
    DefinitionBody:
      # Must be the string "2.0"; unquoted, YAML parses it as a float.
      swagger: "2.0"
      info:
        title: !Ref "AWS::StackName"
      paths:
        "/":
          put:
            consumes:
              - "application/json"
            produces:
              - "application/json"
            responses:
              # A Swagger 2.0 response object is keyed by status code and
              # requires a description; it has no statusCode field.
              "200":
                description: "200 response"
            x-amazon-apigateway-integration:
              type: AWS
              # The Firehose action is invoked with POST although clients send PUT.
              httpMethod: POST
              connectionType: INTERNET
              credentials: !GetAtt ProcessingApiRole.Arn
              uri: !Sub "arn:aws:apigateway:${AWS::Region}:firehose:action/PutRecord"
              requestParameters:
                integration.request.header.Content-Type: "'application/x-amz-json-1.1'"
              passthroughBehavior: WHEN_NO_TEMPLATES
              # Wraps the incoming JSON payload in the request object that
              # PutRecord expects (payload base64-encoded under Record.Data).
              # !Sub only replaces ${...}; the $util/$input references are
              # left intact for API Gateway's mapping-template engine.
              requestTemplates:
                application/json: !Sub |-
                  {"DeliveryStreamName": "${FireHoseToS3}", "Record": {"Data": "$util.base64Encode($input.json('$'))"}}
              responses:
                default:
                  # Integration status codes are strings.
                  statusCode: "200"
# Stack outputs.
Outputs:
  StreamingApiEndpoint:
    Description: The endpoint for the REST API created with API Gateway
    # Invoke URL of the "prod" stage: https://<api-id>.execute-api.<region>.amazonaws.com/prod
    Value: !Sub "https://${ProcessingApi}.execute-api.${AWS::Region}.amazonaws.com/prod"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment