Skip to content

Instantly share code, notes, and snippets.

@luketn
Last active December 17, 2024 16:08
Show Gist options
  • Save luketn/f976954a9002f659ba9a746ca6edd005 to your computer and use it in GitHub Desktop.
Save luketn/f976954a9002f659ba9a746ca6edd005 to your computer and use it in GitHub Desktop.
A CloudFormation template (and the CDK stack that generates it) to create a Kinesis stream and a Firehose delivery stream delivering to S3
# CloudFormation template for a Kinesis stream ("request-logging") with a Firehose
# delivery stream that writes GZIP-compressed, Hive-partitioned objects to S3.
# NOTE(review): this appears to be the `cdk synth` output of the KinesisStreamStack
# CDK class in this gist - prefer regenerating it over editing it by hand.
Resources:
  # The source Kinesis stream. KMS encryption is applied unless the
  # AwsCdkKinesisEncryptedStreamsUnsupportedRegions condition holds.
  Stream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 2
      Name: request-logging
      RetentionPeriodHours: 24
      StreamEncryption:
        Fn::If:
          - AwsCdkKinesisEncryptedStreamsUnsupportedRegions
          - Ref: AWS::NoValue
          - EncryptionType: KMS
            KeyId: alias/aws/kinesis
      Tags:
        - Key: Name
          Value: examples-dev-v1-stream
  # Destination bucket; all public access is blocked and the bucket is retained on delete/replace.
  RequestLoggingS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: ifm-examples-request-logging
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
    UpdateReplacePolicy: Retain
    DeletionPolicy: Retain
  # Log group and log stream referenced by the delivery stream's CloudWatchLoggingOptions.
  RequestLoggingDeliveryStreamLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/kinesisfirehose/request-logging
      RetentionInDays: 14
    UpdateReplacePolicy: Retain
    DeletionPolicy: Retain
  RequestLoggingDeliveryStreamLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName:
        Ref: RequestLoggingDeliveryStreamLogGroup
      LogStreamName: S3Delivery
    UpdateReplacePolicy: Retain
    DeletionPolicy: Retain
  # Role assumed by Firehose: read from the Kinesis stream, read/write the bucket,
  # and put log events into the log group.
  RequestLoggingDeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
        Version: "2012-10-17"
      Policies:
        - PolicyDocument:
            Statement:
              - Action:
                  - kinesis:DescribeStream
                  - kinesis:DescribeStreamSummary
                  - kinesis:GetRecords
                  - kinesis:GetShardIterator
                  - kinesis:ListShards
                  - kinesis:SubscribeToShard
                Effect: Allow
                Resource:
                  Fn::GetAtt:
                    - Stream
                    - Arn
              - Action:
                  - s3:GetObject*
                  - s3:GetBucket*
                  - s3:List*
                  - s3:DeleteObject*
                  - s3:PutObject*
                  - s3:Abort*
                Effect: Allow
                Resource:
                  - Fn::GetAtt:
                      - RequestLoggingS3Bucket
                      - Arn
                  - Fn::Join:
                      - ""
                      - - Fn::GetAtt:
                            - RequestLoggingS3Bucket
                            - Arn
                        - /*
              - Action: logs:PutLogEvents
                Effect: Allow
                Resource:
                  Fn::GetAtt:
                    - RequestLoggingDeliveryStreamLogGroup
                    - Arn
            Version: "2012-10-17"
          PolicyName: allow-s3-kinesis-logs
      Tags:
        - Key: Name
          Value: examples-dev-v1-requestloggingdeliverystreamrole
  # Firehose delivery stream: Kinesis stream as source, S3 bucket as destination.
  RequestLoggingDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: request-logging
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN:
          Fn::GetAtt:
            - Stream
            - Arn
        RoleARN:
          Fn::GetAtt:
            - RequestLoggingDeliveryStreamRole
            - Arn
      S3DestinationConfiguration:
        BucketARN:
          Fn::GetAtt:
            - RequestLoggingS3Bucket
            - Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 10
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName:
            Ref: RequestLoggingDeliveryStreamLogGroup
          LogStreamName:
            Ref: RequestLoggingDeliveryStreamLogStream
        CompressionFormat: GZIP
        ErrorOutputPrefix: errors/
        # Hive-style key=value partitions (year/month/day/hour) under data/.
        Prefix: data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
        RoleARN:
          Fn::GetAtt:
            - RequestLoggingDeliveryStreamRole
            - Arn
Conditions:
  # True in cn-north-1 and cn-northwest-1, where the stream is created without StreamEncryption.
  AwsCdkKinesisEncryptedStreamsUnsupportedRegions:
    Fn::Or:
      - Fn::Equals:
          - Ref: AWS::Region
          - cn-north-1
      - Fn::Equals:
          - Ref: AWS::Region
          - cn-northwest-1
import {Construct, RemovalPolicy, Stack, StackProps} from '@aws-cdk/core';
import {Stream} from "@aws-cdk/aws-kinesis";
import {BlockPublicAccess, Bucket} from "@aws-cdk/aws-s3";
import * as logs from "@aws-cdk/aws-logs";
import {RetentionDays} from "@aws-cdk/aws-logs";
import {CfnDeliveryStream} from "@aws-cdk/aws-kinesisfirehose";
import {Effect, PolicyDocument, PolicyStatement, Role, ServicePrincipal} from "@aws-cdk/aws-iam";
/**
 * Describes the Kinesis stream to create and, optionally, the S3 destinations it is delivered to.
 */
export interface KinesisStreamDefinition {
    /** Name given to the Kinesis stream. */
    name: string;

    /**
     * Number of shards for the stream - the critical sizing property for Kinesis Streams.
     *
     * Estimate the data volume per second up front where you can, but the count can be
     * increased or decreased later.
     *
     * The AWS documentation gives this sizing rule:
     *   number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)
     * https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-streams.html#how-do-i-size-a-stream
     */
    shardCount: number;

    /** S3 destinations to deliver the stream to; when omitted, only the stream itself is created. */
    s3Destinations?: KinesisStreamS3Destination[];
}
/**
 * One S3 delivery target for the stream: a bucket plus the Firehose delivery stream that fills it.
 */
export interface KinesisStreamS3Destination {
    /**
     * ID prefix for this destination's resources: the S3 bucket, log group, log stream,
     * role and delivery stream construct IDs are all derived from it.
     */
    id: string;

    /** Name of the S3 bucket that receives the delivered data. */
    bucketName: string;

    /** Name of the Firehose delivery stream; also used in the log group name `/aws/kinesisfirehose/<firehoseName>`. */
    firehoseName: string;

    /**
     * Firehose buffering interval in seconds. Intended default: 300s (5 minutes).
     * NOTE(review): confirm the consuming stack applies this default when the value is omitted.
     */
    intervalInSeconds?: number;

    /**
     * Firehose buffering size in MBs. Intended default: 10.
     * NOTE(review): confirm the consuming stack applies this default when the value is omitted.
     */
    sizeInMBs?: number;

    /** Removal policy for the bucket; the stack falls back to `RemovalPolicy.RETAIN` when omitted. */
    removalPolicy?: RemovalPolicy;
}
/**
 * Pairs a stack ID with the definition of the Kinesis stream that stack should contain.
 */
export interface KinesisStreamStackConfig {
    /** The stack ID for the Kinesis Stream. */
    id: string;

    /** The stream details: name, shard count and optional S3 destinations. */
    definition: KinesisStreamDefinition;
}
/**
 * A Kinesis stream, optionally with one or more S3 Firehose destinations, which can be used across
 * the account for cross-product purposes.
 *
 * For every entry in `definition.s3Destinations` the stack creates an S3 bucket, a CloudWatch log
 * group and log stream, an IAM role assumable by Firehose, and a delivery stream that reads from
 * the Kinesis stream and writes GZIP-compressed objects to the bucket.
 */
export class KinesisStreamStack extends Stack {
    /**
     * @param scope Parent construct of this stack.
     * @param id Stack ID.
     * @param props Standard stack properties, forwarded to `Stack`.
     * @param definition Stream name, shard count and optional S3 destinations.
     */
    constructor(scope: Construct, id: string, props: StackProps, definition: KinesisStreamDefinition) {
        super(scope, id, props);

        // Defaults documented on KinesisStreamS3Destination. They are applied here explicitly because
        // the Firehose API expects both buffering hints to be set together, and its own default
        // size (5) differs from the documented one (10).
        const defaultIntervalInSeconds = 300;
        const defaultSizeInMBs = 10;

        const stream = new Stream(this, "Stream", {
            streamName: definition.name,
            shardCount: definition.shardCount,
        });

        for (const s3Destination of definition.s3Destinations ?? []) {
            // Destination bucket: never publicly accessible, retained unless the caller says otherwise.
            const bucket = new Bucket(this, `${s3Destination.id}S3Bucket`, {
                bucketName: s3Destination.bucketName,
                removalPolicy: s3Destination.removalPolicy ?? RemovalPolicy.RETAIN,
                blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
            });

            /**
             * A CloudWatch log group and stream written to when there are failures experienced by the delivery stream.
             */
            const logGroup = new logs.LogGroup(this, `${s3Destination.id}DeliveryStreamLogGroup`, {
                logGroupName: `/aws/kinesisfirehose/${s3Destination.firehoseName}`,
                retention: RetentionDays.TWO_WEEKS,
                removalPolicy: RemovalPolicy.RETAIN
            });
            const logStream = new logs.LogStream(this, `${s3Destination.id}DeliveryStreamLogStream`, {
                logGroup: logGroup,
                logStreamName: 'S3Delivery',
                removalPolicy: RemovalPolicy.RETAIN
            });

            // Role assumed by Firehose: read the source stream, read/write the bucket, write error logs.
            const deliveryStreamRole = new Role(this, `${s3Destination.id}DeliveryStreamRole`, {
                assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
                inlinePolicies: {
                    'allow-s3-kinesis-logs': new PolicyDocument({
                        statements: [
                            new PolicyStatement({
                                effect: Effect.ALLOW,
                                actions: [
                                    "kinesis:DescribeStream",
                                    "kinesis:DescribeStreamSummary",
                                    "kinesis:GetRecords",
                                    "kinesis:GetShardIterator",
                                    "kinesis:ListShards",
                                    "kinesis:SubscribeToShard"
                                ],
                                resources: [stream.streamArn]
                            }),
                            new PolicyStatement({
                                effect: Effect.ALLOW,
                                actions: [
                                    "s3:GetObject*",
                                    "s3:GetBucket*",
                                    "s3:List*",
                                    "s3:DeleteObject*",
                                    "s3:PutObject*",
                                    "s3:Abort*"
                                ],
                                resources: [
                                    bucket.bucketArn,
                                    bucket.bucketArn + "/*"
                                ]
                            }),
                            new PolicyStatement({
                                effect: Effect.ALLOW,
                                actions: [
                                    "logs:PutLogEvents"
                                ],
                                resources: [
                                    logGroup.logGroupArn
                                ]
                            })
                        ]
                    })
                }
            });

            // Note the Hive style partitioning used in the prefix. Allows auto-generated partitions in Athena with MSCK REPAIR TABLE table
            // Ref: https://aws.amazon.com/premiumsupport/knowledge-center/athena-create-use-partitioned-tables/
            // The construct registers itself with this stack on creation, so no reference needs to be kept.
            new CfnDeliveryStream(this, `${s3Destination.id}DeliveryStream`, {
                deliveryStreamName: s3Destination.firehoseName,
                deliveryStreamType: 'KinesisStreamAsSource',
                s3DestinationConfiguration: {
                    bucketArn: bucket.bucketArn,
                    roleArn: deliveryStreamRole.roleArn,
                    bufferingHints: {
                        // Always emit both hints, falling back to the documented defaults.
                        intervalInSeconds: s3Destination.intervalInSeconds ?? defaultIntervalInSeconds,
                        sizeInMBs: s3Destination.sizeInMBs ?? defaultSizeInMBs
                    },
                    compressionFormat: 'GZIP',
                    prefix: 'data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/',
                    errorOutputPrefix: 'errors/',
                    cloudWatchLoggingOptions: {
                        enabled: true,
                        logGroupName: logGroup.logGroupName,
                        logStreamName: logStream.logStreamName
                    }
                },
                kinesisStreamSourceConfiguration: {
                    kinesisStreamArn: stream.streamArn,
                    roleArn: deliveryStreamRole.roleArn
                },
            });
        }
    }
}
# Synthesize the KinesisStreamRequestLoggingV1 stack into a CloudFormation template file,
# with path metadata and version reporting switched off.
cdk synth KinesisStreamRequestLoggingV1 \
    --path-metadata false \
    --version-reporting false \
    > KinesisStreamRequestLoggingV1.yml

# Put a test record on the stream. The --data payload is base64 for {"user": "bob"} plus a trailing newline.
aws kinesis put-record \
    --region ap-southeast-2 \
    --stream-name request-logging \
    --partition-key 123 \
    --data "eyJ1c2VyIjogImJvYiJ9Cg=="
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment