Last active
December 17, 2024 16:08
-
-
Save luketn/f976954a9002f659ba9a746ca6edd005 to your computer and use it in GitHub Desktop.
A CloudFormation template (and the CDK stack that generates it) that creates a Kinesis stream and a Firehose delivery stream delivering to S3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CloudFormation template: a Kinesis data stream whose records are delivered
# to an S3 bucket by a Kinesis Firehose delivery stream, with delivery errors
# logged to CloudWatch Logs.
Resources:
  # Source Kinesis data stream.
  Stream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 2
      Name: request-logging
      RetentionPeriodHours: 24
      # KMS encryption is applied unless the stack is deployed to one of the
      # regions matched by the condition declared at the bottom of the template.
      StreamEncryption:
        Fn::If:
          - AwsCdkKinesisEncryptedStreamsUnsupportedRegions
          - Ref: AWS::NoValue
          - EncryptionType: KMS
            KeyId: alias/aws/kinesis
      Tags:
        - Key: Name
          Value: examples-dev-v1-stream

  # Destination bucket; all public access is blocked and the bucket is
  # retained when the resource is replaced or the stack is deleted.
  RequestLoggingS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: ifm-examples-request-logging
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
    UpdateReplacePolicy: Retain
    DeletionPolicy: Retain

  # Log group referenced by the delivery stream's CloudWatchLoggingOptions.
  RequestLoggingDeliveryStreamLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/kinesisfirehose/request-logging
      RetentionInDays: 14
    UpdateReplacePolicy: Retain
    DeletionPolicy: Retain

  # Log stream inside the group above, referenced by the delivery stream.
  RequestLoggingDeliveryStreamLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName:
        Ref: RequestLoggingDeliveryStreamLogGroup
      LogStreamName: S3Delivery
    UpdateReplacePolicy: Retain
    DeletionPolicy: Retain

  # Role assumed by Firehose: read from the stream, write to the bucket,
  # and put log events into the log group.
  RequestLoggingDeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
        Version: "2012-10-17"
      Policies:
        - PolicyDocument:
            Statement:
              # Read access to the source stream.
              - Action:
                  - kinesis:DescribeStream
                  - kinesis:DescribeStreamSummary
                  - kinesis:GetRecords
                  - kinesis:GetShardIterator
                  - kinesis:ListShards
                  - kinesis:SubscribeToShard
                Effect: Allow
                Resource:
                  Fn::GetAtt:
                    - Stream
                    - Arn
              # Object and bucket access on the destination bucket and its keys.
              - Action:
                  - s3:GetObject*
                  - s3:GetBucket*
                  - s3:List*
                  - s3:DeleteObject*
                  - s3:PutObject*
                  - s3:Abort*
                Effect: Allow
                Resource:
                  - Fn::GetAtt:
                      - RequestLoggingS3Bucket
                      - Arn
                  - Fn::Join:
                      - ""
                      - - Fn::GetAtt:
                            - RequestLoggingS3Bucket
                            - Arn
                        - /*
              # Permission to write delivery log events.
              - Action: logs:PutLogEvents
                Effect: Allow
                Resource:
                  Fn::GetAtt:
                    - RequestLoggingDeliveryStreamLogGroup
                    - Arn
            Version: "2012-10-17"
          PolicyName: allow-s3-kinesis-logs
      Tags:
        - Key: Name
          Value: examples-dev-v1-requestloggingdeliverystreamrole

  # Firehose delivery stream: Kinesis stream in, GZIP objects in S3 out.
  RequestLoggingDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: request-logging
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN:
          Fn::GetAtt:
            - Stream
            - Arn
        RoleARN:
          Fn::GetAtt:
            - RequestLoggingDeliveryStreamRole
            - Arn
      S3DestinationConfiguration:
        BucketARN:
          Fn::GetAtt:
            - RequestLoggingS3Bucket
            - Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 10
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName:
            Ref: RequestLoggingDeliveryStreamLogGroup
          LogStreamName:
            Ref: RequestLoggingDeliveryStreamLogStream
        CompressionFormat: GZIP
        ErrorOutputPrefix: errors/
        # Hive-style key=value path segments (year/month/day/hour).
        Prefix: data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        RoleARN:
          Fn::GetAtt:
            - RequestLoggingDeliveryStreamRole
            - Arn

Conditions:
  # True when deploying to cn-north-1 or cn-northwest-1; used above to omit
  # the StreamEncryption property in those regions.
  AwsCdkKinesisEncryptedStreamsUnsupportedRegions:
    Fn::Or:
      - Fn::Equals:
          - Ref: AWS::Region
          - cn-north-1
      - Fn::Equals:
          - Ref: AWS::Region
          - cn-northwest-1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Construct, RemovalPolicy, Stack, StackProps} from '@aws-cdk/core'; | |
import {Stream} from "@aws-cdk/aws-kinesis"; | |
import {BlockPublicAccess, Bucket} from "@aws-cdk/aws-s3"; | |
import * as logs from "@aws-cdk/aws-logs"; | |
import {RetentionDays} from "@aws-cdk/aws-logs"; | |
import {CfnDeliveryStream} from "@aws-cdk/aws-kinesisfirehose"; | |
import {Effect, PolicyDocument, PolicyStatement, Role, ServicePrincipal} from "@aws-cdk/aws-iam"; | |
/**
 * Describes the Kinesis stream a {@link KinesisStreamStack} creates, plus any
 * S3 destinations that should be fed from it.
 */
export interface KinesisStreamDefinition {
  /** Name assigned to the Kinesis stream. */
  name: string;

  /**
   * Number of shards in the stream — the key capacity setting for Kinesis.
   *
   * Estimate your data rate up front, but note that the shard count can be
   * raised or lowered later. Sizing rule from the AWS documentation:
   *
   *   number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)
   *
   * https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-streams.html#how-do-i-size-a-stream
   */
  shardCount: number;

  /** Optional S3 destinations; when omitted only the stream itself is created. */
  s3Destinations?: Array<KinesisStreamS3Destination>;
}
/**
 * One S3 destination for the stream: a bucket plus the Firehose delivery
 * stream that writes into it.
 */
export interface KinesisStreamS3Destination {
  /**
   * ID for the S3 bucket resource. It is also used as the prefix of the
   * construct IDs of the other resources created for this destination.
   */
  id: string;

  /** Name of the S3 bucket that receives the data. */
  bucketName: string;

  /** Name of the Firehose delivery stream (also used in its log group name). */
  firehoseName: string;

  /** Buffering interval in seconds. Defaults to 300s (5 minutes). */
  intervalInSeconds?: number;

  /** Buffering size in MB. Defaults to 10. */
  sizeInMBs?: number;

  /** Removal policy for the bucket; RETAIN is used when this is not set. */
  removalPolicy?: RemovalPolicy;
}
/** Pairs a stack ID with the definition of the Kinesis stream to create. */
export interface KinesisStreamStackConfig {
  /** Stack ID to use for the Kinesis stream stack. */
  id: string;

  /** Details of the stream (and its optional destinations). */
  definition: KinesisStreamDefinition;
}
/**
 * A Kinesis stream, optionally with one or more S3 Firehose destinations,
 * which can be used across the account for cross-product purposes.
 *
 * For every entry in `definition.s3Destinations` the stack creates:
 *  - an S3 bucket with all public access blocked,
 *  - a CloudWatch log group and log stream for delivery logging,
 *  - an IAM role assumable by Firehose with access to the stream, bucket and log group,
 *  - a Firehose delivery stream reading from the Kinesis stream and writing GZIP objects to the bucket.
 */
export class KinesisStreamStack extends Stack {
  /**
   * @param scope parent construct
   * @param id stack ID
   * @param props standard stack properties
   * @param definition the stream to create and its optional S3 destinations
   */
  constructor(scope: Construct, id: string, props: StackProps, definition: KinesisStreamDefinition) {
    super(scope, id, props);

    const stream = new Stream(this, "Stream", {
      streamName: definition.name,
      shardCount: definition.shardCount,
    });

    if (definition.s3Destinations) {
      for (const s3Destination of definition.s3Destinations) {
        const bucket = new Bucket(this, `${s3Destination.id}S3Bucket`, {
          bucketName: s3Destination.bucketName,
          removalPolicy: s3Destination.removalPolicy || RemovalPolicy.RETAIN,
          blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
        });

        /**
         * A CloudWatch log group and stream written to when there are failures experienced by the delivery stream.
         */
        const logGroup = new logs.LogGroup(this, `${s3Destination.id}DeliveryStreamLogGroup`, {
          logGroupName: `/aws/kinesisfirehose/${s3Destination.firehoseName}`,
          retention: RetentionDays.TWO_WEEKS,
          removalPolicy: RemovalPolicy.RETAIN,
        });
        const logStream = new logs.LogStream(this, `${s3Destination.id}DeliveryStreamLogStream`, {
          logGroup: logGroup,
          logStreamName: 'S3Delivery',
          removalPolicy: RemovalPolicy.RETAIN,
        });

        // Role Firehose assumes: read the source stream, write the bucket, emit delivery logs.
        const deliveryStreamRole = new Role(this, `${s3Destination.id}DeliveryStreamRole`, {
          assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
          inlinePolicies: {
            'allow-s3-kinesis-logs': new PolicyDocument({
              statements: [
                new PolicyStatement({
                  effect: Effect.ALLOW,
                  actions: [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:SubscribeToShard",
                  ],
                  resources: [stream.streamArn],
                }),
                new PolicyStatement({
                  effect: Effect.ALLOW,
                  actions: [
                    "s3:GetObject*",
                    "s3:GetBucket*",
                    "s3:List*",
                    "s3:DeleteObject*",
                    "s3:PutObject*",
                    "s3:Abort*",
                  ],
                  resources: [
                    bucket.bucketArn,
                    bucket.bucketArn + "/*",
                  ],
                }),
                new PolicyStatement({
                  effect: Effect.ALLOW,
                  actions: [
                    "logs:PutLogEvents",
                  ],
                  resources: [
                    logGroup.logGroupArn,
                  ],
                }),
              ],
            }),
          },
        });

        // Apply the defaults documented on KinesisStreamS3Destination explicitly, so that
        // `undefined` is never passed into bufferingHints. An explicit undefined check is
        // used (rather than `||`) so a caller-supplied 0 is not silently replaced.
        const intervalInSeconds =
          s3Destination.intervalInSeconds !== undefined ? s3Destination.intervalInSeconds : 300;
        const sizeInMBs =
          s3Destination.sizeInMBs !== undefined ? s3Destination.sizeInMBs : 10;

        //Note the Hive style partitioning used in the prefix. Allows auto-generated partitions in Athena with MSCK REPAIR TABLE table
        //Ref: https://aws.amazon.com/premiumsupport/knowledge-center/athena-create-use-partitioned-tables/
        new CfnDeliveryStream(this, `${s3Destination.id}DeliveryStream`, {
          deliveryStreamName: s3Destination.firehoseName,
          deliveryStreamType: 'KinesisStreamAsSource',
          s3DestinationConfiguration: {
            bucketArn: bucket.bucketArn,
            roleArn: deliveryStreamRole.roleArn,
            bufferingHints: {
              intervalInSeconds: intervalInSeconds,
              sizeInMBs: sizeInMBs,
            },
            compressionFormat: 'GZIP',
            prefix: 'data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
            errorOutputPrefix: 'errors/',
            cloudWatchLoggingOptions: {
              enabled: true,
              logGroupName: logGroup.logGroupName,
              logStreamName: logStream.logStreamName,
            },
          },
          kinesisStreamSourceConfiguration: {
            kinesisStreamArn: stream.streamArn,
            roleArn: deliveryStreamRole.roleArn,
          },
        });
      }
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Synthesize the KinesisStreamRequestLoggingV1 stack to a CloudFormation YAML file,
# with path metadata and version reporting switched off.
cdk synth KinesisStreamRequestLoggingV1 --path-metadata false --version-reporting false > KinesisStreamRequestLoggingV1.yml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Put one test record on the request-logging stream.
# The --data value is the base64 encoding of {"user": "bob"} followed by a newline.
# NOTE(review): this assumes an AWS CLI that takes blob arguments as base64 (the v2 default) - confirm for your CLI version.
aws kinesis put-record --region ap-southeast-2 --stream-name request-logging --data "eyJ1c2VyIjogImJvYiJ9Cg==" --partition-key 123
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment