Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jakebrinkmann/9d1da279dfef77b9011365e3303855c0 to your computer and use it in GitHub Desktop.
Save jakebrinkmann/9d1da279dfef77b9011365e3303855c0 to your computer and use it in GitHub Desktop.
Python AWS-CDK Pipes SQS to SFN example
from aws_cdk import (
CfnOutput,
RemovalPolicy,
Stack,
aws_iam as iam,
aws_pipes as pipes,
aws_logs as logs,
aws_sqs as sqs,
aws_stepfunctions as sfn,
)
from constructs import Construct
class EventbridgePipesSqsToStepfunctionsCdkPythonStack(Stack):
    """Stack that forwards SQS messages to a Step Functions state machine.

    Resources created:

    * an SQS queue (the pipe source),
    * a state machine consisting of a single ``Pass`` state (the pipe target),
    * an IAM role assumable by ``pipes.amazonaws.com`` that may read/delete
      from the queue and start executions of the state machine,
    * a CloudWatch log group that receives the pipe's logs,
    * the EventBridge Pipe itself (L1 ``CfnPipe``), and
    * stack outputs for the queue URL, pipe ARN and state machine ARN.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define all resources of the stack.

        Args:
            scope: Parent construct of this stack.
            construct_id: Logical identifier of this stack within ``scope``.
            **kwargs: Forwarded unchanged to ``Stack.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # --- Source and target -------------------------------------------
        source_queue = sqs.Queue(self, "sqs-queue")

        # ``definition=`` is deprecated on StateMachine; ``definition_body``
        # is the supported way to pass a chainable definition.
        target_sfn = sfn.StateMachine(
            self,
            "state-machine",
            definition_body=sfn.DefinitionBody.from_chainable(
                sfn.Pass(self, "start-state")
            ),
        )

        # --- Execution role for the pipe ---------------------------------
        # Permissions the pipe needs to poll the queue and remove messages
        # it has handed over to the target.
        source_policy = iam.PolicyStatement(
            actions=[
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
            ],
            resources=[source_queue.queue_arn],
            effect=iam.Effect.ALLOW,
        )
        # Permission the pipe needs to invoke the target state machine.
        target_policy = iam.PolicyStatement(
            actions=["states:StartExecution"],
            resources=[target_sfn.state_machine_arn],
            effect=iam.Effect.ALLOW,
        )
        pipe_role = iam.Role(
            self,
            "pipe-role",
            assumed_by=iam.ServicePrincipal("pipes.amazonaws.com"),
        )
        pipe_role.add_to_policy(source_policy)
        pipe_role.add_to_policy(target_policy)

        # --- Log destination ---------------------------------------------
        # Short retention and DESTROY removal: the group is deleted together
        # with the stack.
        # NOTE(review): the name uses the "states" segment although the group
        # holds pipe logs; kept as-is to avoid renaming a deployed resource.
        pipe_logs = logs.LogGroup(
            self,
            "CloudwatchLogs",
            log_group_name="/aws/vendedlogs/states/sample-pipes-machine",
            retention=logs.RetentionDays.ONE_DAY,
            removal_policy=RemovalPolicy.DESTROY,
        )

        # --- Pipe ----------------------------------------------------------
        # Sample SQS message, kept for reference when editing the input
        # template below (the template's <$.field> paths refer to these keys):
        #
        #   {
        #     "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
        #     "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
        #     "body": "Test message.",
        #     "attributes": {
        #       "ApproximateReceiveCount": "1",
        #       "SentTimestamp": "1545082649183",
        #       "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        #       "ApproximateFirstReceiveTimestamp": "1545082649185"
        #     },
        #     "messageAttributes": {},
        #     "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
        #     "eventSource": "aws:sqs",
        #     "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
        #     "awsRegion": "us-east-2"
        #   }
        pipe = pipes.CfnPipe(
            self,
            "pipe",
            role_arn=pipe_role.role_arn,
            source=source_queue.queue_arn,
            source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
                sqs_queue_parameters=pipes.CfnPipe.PipeSourceSqsQueueParametersProperty(
                    # One message per invocation of the target.
                    batch_size=1
                )
            ),
            target=target_sfn.state_machine_arn,
            target_parameters=pipes.CfnPipe.PipeTargetParametersProperty(
                step_function_state_machine_parameters=pipes.CfnPipe.PipeTargetStateMachineParametersProperty(
                    invocation_type="FIRE_AND_FORGET"
                ),
                # https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html
                # NOTE: <$.body> is intentionally NOT quoted so that it comes
                # through as un-escaped JSON instead of as a string.
                input_template="""
                {
                    "messageId": "<$.messageId>",
                    "messageAttributes": <$.messageAttributes>,
                    "body": <$.body>,
                    "attributes": <$.attributes>
                }
                """,
            ),
            log_configuration=pipes.CfnPipe.PipeLogConfigurationProperty(
                level="TRACE",
                include_execution_data=["ALL"],
                cloudwatch_logs_log_destination=pipes.CfnPipe.CloudwatchLogsLogDestinationProperty(
                    log_group_arn=pipe_logs.log_group_arn
                ),
            ),
        )
        # The pipe only references the role's ARN, which does not order it
        # after the role's inline policy. Depend on the role construct (and
        # thereby its policy) so the permissions exist before the pipe is
        # created.
        pipe.node.add_dependency(pipe_role)

        # --- Outputs -------------------------------------------------------
        CfnOutput(self, "QUEUE_URL", value=source_queue.queue_url)
        CfnOutput(self, "PIPE_ARN", value=pipe.attr_arn)
        CfnOutput(self, "STATE_MACHINE_ARN", value=target_sfn.state_machine_arn)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment