Skip to content

Instantly share code, notes, and snippets.

@Tehnix
Created December 16, 2024 08:59
Show Gist options
  • Save Tehnix/baf1372e43588152789f0271f20795d6 to your computer and use it in GitHub Desktop.
Save Tehnix/baf1372e43588152789f0271f20795d6 to your computer and use it in GitHub Desktop.
CDK for IoT Data Ingestion
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import { Construct } from 'constructs';
/**
 * Properties for the stack that provisions a single, billing-tagged SQS queue.
 *
 * Extends the standard `cdk.StackProps`, which are forwarded to the base stack.
 */
interface StackProps extends cdk.StackProps {
/**
 * The name of the queue. Also used as the suffix of the queue's `billing` tag.
 */
readonly queueName: string;
/**
 * Timeout of processing a single message.
 *
 * After dequeuing, the processor has this much time to handle the message and delete it from the queue
 * before it becomes visible again for dequeueing by another processor.
 *
 * Values must be from 0 to 43200 seconds (12 hours). CloudFormation's own default is 30 seconds, but
 * this property is required here, so callers always supply an explicit value.
 */
readonly visibilityTimeout: cdk.Duration;
/**
 * How long messages should be retained on the queue.
 *
 * Valid values range from 60 seconds (1 minute) to 1209600 seconds (14 days). CloudFormation's own
 * default is 345600 seconds (4 days), but this property is required here, so callers always supply
 * an explicit value.
 */
readonly retentionPeriod: cdk.Duration;
/**
 * Send messages to this queue if they were unsuccessfully dequeued a number of times.
 *
 * Optional; when omitted, `undefined` is passed through to the queue construct.
 */
readonly deadLetterQueue?: sqs.DeadLetterQueue;
/**
 * The billing group to associate with this stack. Written to the queue's `billing-group` tag and
 * used as the prefix of its `billing` tag.
 */
readonly billingGroup: string;
}
/**
 * Stack that provisions one SQS queue and tags it for cost allocation.
 *
 * The created queue is exposed through {@link Stack.queue} so that other stacks
 * can reference it.
 */
export class Stack extends cdk.Stack {
  /** The SQS queue created by this stack. */
  public readonly queue: sqs.Queue;

  /**
   * @param scope - Parent construct.
   * @param id - Construct id of this stack.
   * @param props - Queue settings and billing group, plus regular stack props.
   */
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    const {
      queueName,
      visibilityTimeout,
      retentionPeriod,
      deadLetterQueue,
      billingGroup,
    } = props;

    // Create the queue and publish it on the instance in one step.
    this.queue = new sqs.Queue(this, 'Queue', {
      queueName,
      visibilityTimeout,
      retentionPeriod,
      deadLetterQueue,
    });

    // Cost-allocation tags: a per-resource key and the owning billing group.
    const queueTags = cdk.Tags.of(this.queue);
    queueTags.add('billing', `${billingGroup}-sqs-${queueName}`);
    queueTags.add('billing-group', billingGroup);
  }
}
import * as cdk from 'aws-cdk-lib';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as iot from 'aws-cdk-lib/aws-iot';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import { Construct } from 'constructs';
/**
 * Properties for the stack that forwards messages from an AWS IoT topic into
 * an SQS queue.
 *
 * All fields are `readonly`, matching the other props interfaces in this
 * project; the stack only ever reads them.
 */
interface StackProps extends cdk.StackProps {
  /** The SQS queue that the topic rule sends matched messages to. */
  readonly queue: sqs.Queue;
  /**
   * Name of the IoT topic rule. A lower-cased copy is used as the suffix of
   * the rule's `billing` tag.
   */
  readonly ruleName: string;
  /**
   * Topic (or topic filter) the rule selects from. It is interpolated
   * verbatim into the rule's SQL `FROM` clause.
   */
  readonly topic: string;
  /**
   * Description attached to the topic rule.
   *
   * NOTE(review): this shares its name with the `description` field of
   * `cdk.StackProps`, so the same text is also handed to the base stack when
   * the props are passed to `super()` — confirm that is intended.
   */
  readonly description: string;
  /**
   * The billing group to associate with this stack.
   */
  readonly billingGroup: string;
}
/**
 * Stack that connects an AWS IoT topic to an SQS queue.
 *
 * It creates an IAM role that the IoT service can assume, allows that role to
 * send messages to the given queue, and defines a topic rule that forwards
 * every message selected from `props.topic` to the queue.
 */
export class Stack extends cdk.Stack {
  /**
   * @param scope - Parent construct.
   * @param id - Construct id of this stack.
   * @param props - Target queue, rule name/topic/description and billing group.
   */
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // Role assumed by AWS IoT; it is only allowed to send to the target queue.
    const queueRole = new iam.Role(this, 'iot-sqs-role', {
      assumedBy: new iam.ServicePrincipal('iot.amazonaws.com'),
    });
    queueRole.addToPolicy(
      new iam.PolicyStatement({
        resources: [props.queue.queueArn],
        actions: ['sqs:SendMessage'],
      })
    );

    // Connect an IoT topic to the queue.
    const rule = new iot.CfnTopicRule(this, 'TopicRule', {
      ruleName: props.ruleName,
      topicRulePayload: {
        actions: [
          {
            sqs: {
              queueUrl: props.queue.queueUrl,
              roleArn: queueRole.roleArn,
              useBase64: false,
            },
          },
        ],
        description: props.description,
        sql: `SELECT * FROM "${props.topic}"`,
        awsIotSqlVersion: '2016-03-23',
        ruleDisabled: false,
      },
    });

    // Use the locale-independent `toLowerCase()`: `toLocaleLowerCase()` follows
    // the host's locale (e.g. a Turkish locale maps "I" to a dotless "ı"), which
    // would make the synthesized tag value depend on the machine running synth.
    cdk.Tags.of(rule).add(
      'billing',
      `${props.billingGroup}-iot-rule-${props.ruleName.toLowerCase()}`
    );
    cdk.Tags.of(rule).add('billing-group', props.billingGroup);
  }
}
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import { Construct } from 'constructs';
/**
 * Properties for the stack that deploys a Lambda function consuming an SQS queue.
 *
 * Extends the standard `cdk.StackProps`, which are forwarded to the base stack.
 */
interface StackProps extends cdk.StackProps {
/**
 * Name of the Lambda function. Also used as the suffix of the function's `billing` tag.
 */
readonly functionName: string;
/**
 * The SQS queue that is attached to the function as its event source.
 */
readonly queue: sqs.Queue;
/**
 * Path to the code asset for the function; passed to `lambda.Code.fromAsset`.
 */
readonly assets: string;
/**
 * The handler identifier passed to the Lambda function configuration.
 */
readonly handler: string;
/**
 * Memory size passed to the Lambda function configuration.
 */
readonly memorySize: number;
/**
 * Execution timeout of the function. The stack falls back to 10 seconds when omitted.
 */
readonly timeout?: cdk.Duration;
/**
 * The billing group to associate with this stack.
 */
readonly billingGroup: string;
/**
 * The maximum number of records in each batch that Lambda pulls from your stream or
 * queue and sends to your function.
 *
 * NOTE: Can at most be 10,000 messages.
 */
readonly batchSize: number;
/**
 * The maximum amount of time to spend gathering records before invoking
 * the function.
 *
 * Defaults to 0 seconds (no batching window).
 *
 * NOTE: Can at most be 300 seconds (5 minutes).
 */
readonly maxBatchingWindow?: cdk.Duration;
}
/**
 * Stack that deploys a Node.js 20 Lambda function and subscribes it to an SQS
 * queue, so that the function is invoked with batches of queue messages.
 */
export class Stack extends cdk.Stack {
  /**
   * @param scope - Parent construct.
   * @param id - Construct id of this stack.
   * @param props - Function settings, source queue, batching options and billing group.
   */
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    const { functionName, billingGroup, queue } = props;

    // The function itself; the timeout falls back to 10 seconds when not given.
    const handlerFn = new lambda.Function(this, 'Handler', {
      functionName,
      runtime: lambda.Runtime.NODEJS_20_X,
      handler: props.handler,
      timeout: props.timeout ?? cdk.Duration.seconds(10),
      memorySize: props.memorySize,
      code: lambda.Code.fromAsset(props.assets),
      // TODO: Lower the log retention (or disable it) to reduce costs.
      logRetention: cdk.aws_logs.RetentionDays.ONE_WEEK,
      tracing: lambda.Tracing.DISABLED,
    });

    // Cost-allocation tags: a per-resource key and the owning billing group.
    const fnTags = cdk.Tags.of(handlerFn);
    fnTags.add('billing', `${billingGroup}-lambda-${functionName}`);
    fnTags.add('billing-group', billingGroup);

    // Register the queue as an event source of the function, using the
    // caller-supplied batching configuration.
    handlerFn.addEventSource(
      new lambdaEventSources.SqsEventSource(queue, {
        batchSize: props.batchSize,
        maxBatchingWindow: props.maxBatchingWindow,
      })
    );

    // Allow the Lambda to read from the queue.
    queue.grantConsumeMessages(handlerFn);

    // Disabled example: run 6:00 PM UTC every Monday through Friday.
    // See https://docs.aws.amazon.com/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html
    // const rule = new events.Rule(this, "Rule", {
    //   schedule: events.Schedule.expression("cron(0 18 ? * MON-FRI *)"),
    // });
    // rule.addTarget(new eventsTargets.LambdaFunction(lambdaFn));
  }
}
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as sqs from '@/services/_lib/sqs';
import * as iot from './iot-rule-to-sqs';
import * as lambda from './lambda';
/**
 * Properties for the top-level data-ingestion stack.
 *
 * Currently adds nothing on top of the standard `cdk.StackProps`.
 */
interface StackProps extends cdk.StackProps {}
/**
 * Top-level stack that wires up the IoT data-ingestion pipeline.
 *
 * Two parallel paths are created, each consisting of an SQS queue, an IoT
 * topic rule feeding that queue, and a Lambda function consuming it:
 *
 * - "slow": messages are gathered into large batches (up to 10000 records or
 *   a 300 second batching window) before the function is invoked.
 * - "fast": regular queue behavior with small batches of up to 10 records.
 */
export class Stack extends cdk.Stack {
  /**
   * @param scope - Parent construct.
   * @param id - Construct id of this stack.
   * @param props - Standard stack props; shared with every child stack
   *   (except `stackName`, see below).
   */
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // An explicit `stackName` belongs to this stack only. Forwarding it to the
    // children would give all six child stacks the very same CloudFormation
    // stack name, so it is stripped before the props are shared. Without it,
    // each child gets a unique name generated from its construct path.
    const { stackName: _ownStackName, ...sharedProps } = props;

    // Every resource in this pipeline is billed to the same group.
    const billingGroup = 'data-ingest';

    // Setup our SQS queues.
    const slowQueue = new sqs.Stack(this, 'SlowQueue', {
      ...sharedProps,
      queueName: 'slow-data-ingest',
      visibilityTimeout: cdk.Duration.seconds(300),
      retentionPeriod: cdk.Duration.days(14),
      billingGroup,
    });
    const fastQueue = new sqs.Stack(this, 'FastQueue', {
      ...sharedProps,
      queueName: 'fast-data-ingest',
      visibilityTimeout: cdk.Duration.seconds(60),
      retentionPeriod: cdk.Duration.days(14),
      billingGroup,
    });

    // Setup our IoT Rule Topics, pointing to our SQS queues.
    new iot.Stack(this, 'SlowIoTData', {
      ...sharedProps,
      // Example: $aws/rules/SlowIoTData/slow-data/sensor-1/energy
      ruleName: 'SlowIoTData',
      queue: slowQueue.queue,
      topic: 'slow-data/#',
      description: 'Slow data ingestion topic rule',
      billingGroup,
    });
    new iot.Stack(this, 'FastIoTData', {
      ...sharedProps,
      // Example: $aws/rules/FastIoTData/fast-data/sensor-1/energy
      ruleName: 'FastIoTData',
      queue: fastQueue.queue,
      topic: 'fast-data/#',
      description: 'Fast data ingestion topic rule',
      billingGroup,
    });

    // Setup our Lambda function to process the batched/slow SQS queue.
    new lambda.Stack(this, 'SlowDataLambda', {
      ...sharedProps,
      functionName: 'ms-ingest-slow-data',
      handler: 'slow.handler',
      assets: 'artifacts/ms-ingest',
      queue: slowQueue.queue,
      memorySize: 1024,
      timeout: cdk.Duration.seconds(30),
      batchSize: 10000,
      maxBatchingWindow: cdk.Duration.seconds(300),
      billingGroup,
    });

    // Setup our Lambda function to process the fast SQS queue, i.e. regular queue
    // behavior.
    new lambda.Stack(this, 'FastDataLambda', {
      ...sharedProps,
      functionName: 'ms-ingest-fast-data',
      handler: 'fast.handler',
      assets: 'artifacts/ms-ingest',
      queue: fastQueue.queue,
      memorySize: 512,
      timeout: cdk.Duration.seconds(5),
      batchSize: 10,
      billingGroup,
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment