Created
December 16, 2024 08:59
-
-
Save Tehnix/baf1372e43588152789f0271f20795d6 to your computer and use it in GitHub Desktop.
CDK for IoT Data Ingestion
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as cdk from 'aws-cdk-lib'; | |
import * as sqs from 'aws-cdk-lib/aws-sqs'; | |
import { Construct } from 'constructs'; | |
interface StackProps extends cdk.StackProps { | |
/** | |
* The name of the queue. | |
*/ | |
readonly queueName: string; | |
/** | |
* Timeout of processing a single message. | |
* | |
* After dequeuing, the processor has this much time to handle the message and delete it from the queue | |
* before it becomes visible again for dequeueing by another processor. | |
* | |
* Values must be from 0 to 43200 seconds (12 hours). If you don't specify a value, AWS CloudFormation | |
* uses the default value of 30 seconds. | |
*/ | |
readonly visibilityTimeout: cdk.Duration; | |
/** | |
* How long items should be retained on the queue (in seconds). | |
* | |
* You can specify an integer value from 60 seconds (1 minute) to 1209600 seconds (14 days). The | |
* default value is 345600 seconds (4 days). | |
*/ | |
readonly retentionPeriod: cdk.Duration; | |
/** | |
* Send messages to this queue if they were unsuccessfully dequeued a number of times. | |
*/ | |
readonly deadLetterQueue?: sqs.DeadLetterQueue; | |
/** | |
* The billing group to associate with this stack. | |
*/ | |
readonly billingGroup: string; | |
} | |
export class Stack extends cdk.Stack { | |
public readonly queue: sqs.Queue; | |
constructor(scope: Construct, id: string, props: StackProps) { | |
super(scope, id, props); | |
// Set up and configure the SQS queue. | |
const queue = new sqs.Queue(this, 'Queue', { | |
queueName: props.queueName, | |
visibilityTimeout: props.visibilityTimeout, | |
retentionPeriod: props.retentionPeriod, | |
deadLetterQueue: props.deadLetterQueue, | |
}); | |
this.queue = queue; | |
// Tag the queue with the billing group and queue name. | |
cdk.Tags.of(queue).add( | |
'billing', | |
`${props.billingGroup}-sqs-${props.queueName}` | |
); | |
cdk.Tags.of(queue).add('billing-group', `${props.billingGroup}`); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as cdk from 'aws-cdk-lib'; | |
import * as iam from 'aws-cdk-lib/aws-iam'; | |
import * as iot from 'aws-cdk-lib/aws-iot'; | |
import * as sqs from 'aws-cdk-lib/aws-sqs'; | |
import { Construct } from 'constructs'; | |
interface StackProps extends cdk.StackProps { | |
queue: sqs.Queue; | |
ruleName: string; | |
topic: string; | |
description: string; | |
/** | |
* The billing group to associate with this stack. | |
*/ | |
readonly billingGroup: string; | |
} | |
export class Stack extends cdk.Stack { | |
constructor(scope: Construct, id: string, props: StackProps) { | |
super(scope, id, props); | |
// Adjust queue permissions to allow IoT to send messages to it. | |
const queueRole = new iam.Role(this, 'iot-sqs-role', { | |
assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'), | |
}); | |
queueRole.addToPolicy( | |
new iam.PolicyStatement({ | |
resources: [props.queue.queueArn], | |
actions: ['sqs:SendMessage'], | |
}) | |
); | |
// Connect an IoT topic to the queue. | |
const rule = new iot.CfnTopicRule(this, 'TopicRule', { | |
ruleName: props.ruleName, | |
topicRulePayload: { | |
actions: [ | |
{ | |
sqs: { | |
queueUrl: props.queue.queueUrl, | |
roleArn: queueRole.roleArn, | |
useBase64: false, | |
}, | |
}, | |
], | |
description: props.description, | |
sql: `SELECT * FROM "${props.topic}"`, | |
awsIotSqlVersion: '2016-03-23', | |
ruleDisabled: false, | |
}, | |
}); | |
cdk.Tags.of(rule).add( | |
'billing', | |
`${props.billingGroup}-iot-rule-${props.ruleName.toLocaleLowerCase()}` | |
); | |
cdk.Tags.of(rule).add('billing-group', `${props.billingGroup}`); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as cdk from 'aws-cdk-lib'; | |
import * as lambda from 'aws-cdk-lib/aws-lambda'; | |
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources'; | |
import * as sqs from 'aws-cdk-lib/aws-sqs'; | |
import { Construct } from 'constructs'; | |
interface StackProps extends cdk.StackProps { | |
readonly functionName: string; | |
readonly queue: sqs.Queue; | |
readonly assets: string; | |
readonly handler: string; | |
readonly memorySize: number; | |
readonly timeout?: cdk.Duration; | |
/** | |
* The billing group to associate with this stack. | |
*/ | |
readonly billingGroup: string; | |
/** | |
* The maximum number of records in each batch that Lambda pulls from your stream or | |
* queue and sends to your function. | |
* | |
* NOTE: Can at most be 10,000 messages. | |
*/ | |
readonly batchSize: number; | |
/** | |
* The maximum amount of time, in seconds, to spends gathering records before invoking | |
* the function. | |
* | |
* Defaults to 0 seconds (no timeout). | |
* | |
* NOTE: Can at most be 300 seconds (5 minutes). | |
*/ | |
readonly maxBatchingWindow?: cdk.Duration; | |
} | |
export class Stack extends cdk.Stack { | |
constructor(scope: Construct, id: string, props: StackProps) { | |
super(scope, id, props); | |
// Our lambda function and configuration. | |
const lambdaFn = new lambda.Function(this, 'Handler', { | |
functionName: props.functionName, | |
runtime: lambda.Runtime.NODEJS_20_X, | |
handler: props.handler, | |
timeout: props.timeout ?? cdk.Duration.seconds(10), | |
memorySize: props.memorySize, | |
code: lambda.Code.fromAsset(props.assets), | |
// TODO: Lower the log retention (or disable it) to reduce costs. | |
logRetention: cdk.aws_logs.RetentionDays.ONE_WEEK, | |
tracing: lambda.Tracing.DISABLED, | |
}); | |
cdk.Tags.of(lambdaFn).add( | |
'billing', | |
`${props.billingGroup}-lambda-${props.functionName}` | |
); | |
cdk.Tags.of(lambdaFn).add('billing-group', `${props.billingGroup}`); | |
// Define the SQS queue as an event source for the Lambda function. | |
const eventSource = new lambdaEventSources.SqsEventSource(props.queue, { | |
batchSize: props.batchSize, | |
maxBatchingWindow: props.maxBatchingWindow, | |
}); | |
// Tie the queue to the Lambda function. | |
lambdaFn.addEventSource(eventSource); | |
// Allow the Lambda to read from the queue. | |
props.queue.grantConsumeMessages(lambdaFn); | |
// Run 6:00 PM UTC every Monday through Friday | |
// See https://docs.aws.amazon.com/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html | |
// const rule = new events.Rule(this, "Rule", { | |
// schedule: events.Schedule.expression("cron(0 18 ? * MON-FRI *)"), | |
// }); | |
// rule.addTarget(new eventsTargets.LambdaFunction(lambdaFn)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as cdk from 'aws-cdk-lib'; | |
import { Construct } from 'constructs'; | |
import * as sqs from '@/services/_lib/sqs'; | |
import * as iot from './iot-rule-to-sqs'; | |
import * as lambda from './lambda'; | |
interface StackProps extends cdk.StackProps {} | |
export class Stack extends cdk.Stack { | |
constructor(scope: Construct, id: string, props: StackProps) { | |
super(scope, id, props); | |
// Setup our SQS queues. | |
const slowQueue = new sqs.Stack(this, 'SlowQueue', { | |
...props, | |
queueName: 'slow-data-ingest', | |
visibilityTimeout: cdk.Duration.seconds(300), | |
retentionPeriod: cdk.Duration.days(14), | |
billingGroup: 'data-ingest', | |
}); | |
const fastQueue = new sqs.Stack(this, 'FastQueue', { | |
...props, | |
queueName: 'fast-data-ingest', | |
visibilityTimeout: cdk.Duration.seconds(60), | |
retentionPeriod: cdk.Duration.days(14), | |
billingGroup: 'data-ingest', | |
}); | |
// Setup our IoT Rule Topics, pointing to our SQS queues. | |
new iot.Stack(this, 'SlowIoTData', { | |
...props, | |
// Example: $aws/rules/SlowIoTData/slow-data/sensor-1/energy | |
ruleName: 'SlowIoTData', | |
queue: slowQueue.queue, | |
topic: 'slow-data/#', | |
description: 'Slow data ingestion topic rule', | |
billingGroup: 'data-ingest', | |
}); | |
new iot.Stack(this, 'FastIoTData', { | |
...props, | |
// Example: $aws/rules/FastIoTData/fast-data/sensor-1/energy | |
ruleName: 'FastIoTData', | |
queue: fastQueue.queue, | |
topic: 'fast-data/#', | |
description: 'Fast data ingestion topic rule', | |
billingGroup: 'data-ingest', | |
}); | |
// Setup our Lambda function to process the batched/slow SQS queue. | |
new lambda.Stack(this, 'SlowDataLambda', { | |
...props, | |
functionName: 'ms-ingest-slow-data', | |
handler: 'slow.handler', | |
assets: 'artifacts/ms-ingest', | |
queue: slowQueue.queue, | |
memorySize: 1024, | |
timeout: cdk.Duration.seconds(30), | |
batchSize: 10000, | |
maxBatchingWindow: cdk.Duration.seconds(300), | |
billingGroup: 'data-ingest', | |
}); | |
// Setup our Lambda function to process the fast SQS queue, i.e. regular queue | |
// behavior. | |
new lambda.Stack(this, 'FastDataLambda', { | |
...props, | |
functionName: 'ms-ingest-fast-data', | |
handler: 'fast.handler', | |
assets: 'artifacts/ms-ingest', | |
queue: fastQueue.queue, | |
memorySize: 512, | |
timeout: cdk.Duration.seconds(5), | |
batchSize: 10, | |
billingGroup: 'data-ingest', | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment