Created
June 12, 2024 14:39
-
-
Save jakebrinkmann/9d1da279dfef77b9011365e3303855c0 to your computer and use it in GitHub Desktop.
Python AWS-CDK Pipes SQS to SFN example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aws_cdk import ( | |
CfnOutput, | |
RemovalPolicy, | |
Stack, | |
aws_iam as iam, | |
aws_pipes as pipes, | |
aws_logs as logs, | |
aws_sqs as sqs, | |
aws_stepfunctions as sfn, | |
) | |
from constructs import Construct | |
class EventbridgePipesSqsToStepfunctionsCdkPythonStack(Stack):
    """Example stack: an EventBridge Pipe from an SQS queue to a state machine.

    Resources created:

    * an SQS queue (the pipe source),
    * a Step Functions state machine consisting of a single ``Pass`` state
      (the pipe target),
    * an IAM role assumable by ``pipes.amazonaws.com`` that may read from the
      queue, start executions of the state machine and write to the log group,
    * a CloudWatch log group receiving the pipe's logs,
    * the pipe itself (``pipes.CfnPipe``).

    The queue URL, pipe ARN and state machine ARN are exported as stack
    outputs.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define all resources of the stack.

        Args:
            scope: Parent construct of this stack.
            construct_id: Logical identifier of the stack within ``scope``.
            **kwargs: Forwarded unchanged to ``Stack.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        source_queue = sqs.Queue(self, "sqs-queue")

        # `definition=` is deprecated on StateMachine; `definition_body` is
        # the supported way to pass the same chainable.
        target_sfn = sfn.StateMachine(
            self,
            "state-machine",
            definition_body=sfn.DefinitionBody.from_chainable(
                sfn.Pass(self, "start-state")
            ),
        )

        # Permissions the pipe needs to poll the source queue.
        source_policy = iam.PolicyStatement(
            actions=[
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
            ],
            resources=[source_queue.queue_arn],
            effect=iam.Effect.ALLOW,
        )

        # Permission the pipe needs to invoke the target state machine.
        target_policy = iam.PolicyStatement(
            actions=["states:StartExecution"],
            resources=[target_sfn.state_machine_arn],
            effect=iam.Effect.ALLOW,
        )

        pipe_role = iam.Role(
            self,
            "pipe-role",
            assumed_by=iam.ServicePrincipal("pipes.amazonaws.com"),
        )

        # NOTE(review): the name uses the `states` vended-logs prefix although
        # the group receives pipe logs; kept unchanged -- confirm it is
        # intended.
        pipe_logs = logs.LogGroup(
            self,
            "CloudwatchLogs",
            log_group_name="/aws/vendedlogs/states/sample-pipes-machine",
            retention=logs.RetentionDays.ONE_DAY,
            removal_policy=RemovalPolicy.DESTROY,
        )

        pipe_role.add_to_policy(source_policy)
        pipe_role.add_to_policy(target_policy)
        # The pipe writes its logs using the execution role, so the role must
        # be allowed to create log streams and put log events in the group.
        pipe_logs.grant_write(pipe_role)

        # Sample SQS message as seen by the pipe (for reference when editing
        # the input template below):
        #
        # {
        #   "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
        #   "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
        #   "body": "Test message.",
        #   "attributes": {
        #     "ApproximateReceiveCount": "1",
        #     "SentTimestamp": "1545082649183",
        #     "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        #     "ApproximateFirstReceiveTimestamp": "1545082649185"
        #   },
        #   "messageAttributes": {},
        #   "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
        #   "eventSource": "aws:sqs",
        #   "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
        #   "awsRegion": "us-east-2"
        # }

        pipe = pipes.CfnPipe(
            self,
            "pipe",
            role_arn=pipe_role.role_arn,
            source=source_queue.queue_arn,
            source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
                sqs_queue_parameters=pipes.CfnPipe.PipeSourceSqsQueueParametersProperty(
                    batch_size=1
                )
            ),
            target=target_sfn.state_machine_arn,
            target_parameters=pipes.CfnPipe.PipeTargetParametersProperty(
                step_function_state_machine_parameters=pipes.CfnPipe.PipeTargetStateMachineParametersProperty(
                    invocation_type="FIRE_AND_FORGET"
                ),
                # https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html
                # NOTE: by not quoting <$.body> it will come through as
                # un-escaped JSON, instead of as a string
                input_template="""
                {
                    "messageId": "<$.messageId>",
                    "messageAttributes": <$.messageAttributes>,
                    "body": <$.body>,
                    "attributes": <$.attributes>
                }
                """,
            ),
            log_configuration=pipes.CfnPipe.PipeLogConfigurationProperty(
                level="TRACE",
                include_execution_data=["ALL"],
                cloudwatch_logs_log_destination=pipes.CfnPipe.CloudwatchLogsLogDestinationProperty(
                    log_group_arn=pipe_logs.log_group_arn
                ),
            ),
        )
        # The pipe only references the role ARN, while the role's statements
        # are synthesized as a separate policy resource. Depend on the whole
        # role construct so the permissions exist before the pipe is created.
        pipe.node.add_dependency(pipe_role)

        # Output
        CfnOutput(self, "QUEUE_URL", value=source_queue.queue_url)
        CfnOutput(self, "PIPE_ARN", value=pipe.attr_arn)
        CfnOutput(self, "STATE_MACHINE_ARN", value=target_sfn.state_machine_arn)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment