Created
June 15, 2024 03:43
-
-
Save t04glovern/b41057b1577d495027db83e3d6837bee to your computer and use it in GitHub Desktop.
A CloudFormation template to create IAM role, policy, Step Functions, and CloudWatch event rule and target for table ingestion maintenance vacuum with concurrency control
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CloudFormation template header: format version and the human-readable stack description.
AWSTemplateFormatVersion: '2010-09-09'
Description: A CloudFormation template to create IAM role, policy, Step Functions, and CloudWatch event rule and target for table ingestion maintenance vacuum with concurrency control
# Stack inputs. These are substituted into the IAM policy resources and into the
# Step Functions definition (query string, workgroup, Athena output location).
Parameters:
  WorkgroupName:
    Type: String
    Default: primary
    Description: The name of the Athena Workgroup
  DatabaseName:
    Type: String
    Default: default
    Description: The name of the database
  TableName:
    Type: String
    Description: The name of the Iceberg table
  BucketName:
    Type: String
    Description: The name of the S3 bucket used for your Iceberg table
  BucketPrefix:
    Type: String
    # An empty prefix is a supported case (see the IsBucketPrefixSpecified
    # condition), so give it an explicit empty default, matching
    # AthenaOutputBucketPrefix, instead of forcing callers to pass "".
    Default: ""
    Description: The prefix for the S3 bucket used for your Iceberg table
  AthenaOutputBucketName:
    Type: String
    Description: The name of the S3 bucket used for Athena query outputs
  AthenaOutputBucketPrefix:
    Type: String
    Default: ""
    Description: The prefix for the S3 bucket used for Athena query outputs
  VacuumFrequency:
    Type: String
    # Used verbatim as the ScheduleExpression of the schedule rule.
    Default: rate(1 hour)
    Description: The frequency for the Iceberg vacuum
# Each condition is true when the corresponding prefix parameter is non-empty.
# The IAM policy uses them to choose between a prefix-scoped object ARN
# (bucket/prefix/*) and a whole-bucket object ARN (bucket/*).
Conditions:
  IsBucketPrefixSpecified: !Not [!Equals [!Ref BucketPrefix, ""]]
  IsAthenaOutputBucketPrefixSpecified: !Not [!Equals [!Ref AthenaOutputBucketPrefix, ""]]
Resources:
  # Role shared by the state machine (states.amazonaws.com) and by the schedule
  # rule that starts it (events.amazonaws.com). Both trust statements are
  # restricted by source ARN to resources in this account and region.
  StepFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: 'states.amazonaws.com'
            Action: 'sts:AssumeRole'
            Condition:
              ArnLike:
                aws:SourceArn: !Sub 'arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:*'
              StringEquals:
                aws:SourceAccount: !Ref AWS::AccountId
          - Effect: Allow
            Principal:
              Service: 'events.amazonaws.com'
            Action: 'sts:AssumeRole'
            Condition:
              ArnLike:
                # Must match the Name of MaintenanceVacuumEventRule below.
                aws:SourceArn: !Sub 'arn:aws:events:${AWS::Region}:${AWS::AccountId}:rule/MaintenanceVacuumEventRule'

  # Permissions attached to StepFunctionRole: X-Ray tracing, starting the state
  # machine (used by the schedule rule), Athena query execution, S3 access to
  # the table data and Athena output locations, Glue catalog access for the
  # table, and DynamoDB access to the lock table.
  StepFunctionRolePolicy:
    Type: AWS::IAM::Policy
    Properties:
      Roles: [!Ref StepFunctionRole]
      PolicyName: StepFunctionRolePolicy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - xray:PutTraceSegments
              - xray:PutTelemetryRecords
              - xray:GetSamplingRules
              - xray:GetSamplingTargets
            Resource: '*'
          - Effect: Allow
            Action: states:StartExecution
            Resource:
              - !Ref MaintenanceVacuumStateMachine
          - Effect: Allow
            Action:
              - athena:startQueryExecution
              - athena:stopQueryExecution
              - athena:getQueryExecution
              - athena:getDataCatalog
            Resource:
              - !Sub 'arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${WorkgroupName}'
              - !Sub 'arn:aws:athena:${AWS::Region}:${AWS::AccountId}:datacatalog/*'
          # Table data bucket: scope object access to the prefix when one is given.
          - !If
            - IsBucketPrefixSpecified
            - Effect: Allow
              Action:
                - s3:GetObject
                - s3:PutObject
                - s3:DeleteObject
                - s3:ListBucket
              Resource:
                - !Sub 'arn:aws:s3:::${BucketName}'
                - !Sub 'arn:aws:s3:::${BucketName}/${BucketPrefix}/*'
            - Effect: Allow
              Action:
                - s3:GetObject
                - s3:PutObject
                - s3:DeleteObject
                - s3:ListBucket
              Resource:
                - !Sub 'arn:aws:s3:::${BucketName}'
                - !Sub 'arn:aws:s3:::${BucketName}/*'
          # Athena output bucket: same prefix-or-whole-bucket selection.
          - !If
            - IsAthenaOutputBucketPrefixSpecified
            - Effect: Allow
              Action:
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:ListMultipartUploadParts
                - s3:AbortMultipartUpload
                - s3:CreateBucket
                - s3:PutObject
              Resource:
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}'
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}/${AthenaOutputBucketPrefix}/*'
            - Effect: Allow
              Action:
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:ListMultipartUploadParts
                - s3:AbortMultipartUpload
                - s3:CreateBucket
                - s3:PutObject
              Resource:
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}'
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}/*'
          - Effect: Allow
            Action:
              - glue:GetDatabase*
              - glue:UpdateTable
              - glue:GetTable
              - glue:GetTables
              - glue:BatchDeleteTable
              - glue:GetPartition*
              - glue:BatchGetPartition
            Resource:
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DatabaseName}'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DatabaseName}/${TableName}'
          - Effect: Allow
            Action:
              - dynamodb:PutItem
              - dynamodb:UpdateItem
              - dynamodb:GetItem
            Resource:
              # Reference the table resource rather than rebuilding its ARN
              # from a hard-coded name, so the policy always tracks LockTable.
              - !GetAtt LockTable.Arn

  # Single-item table holding the semaphore row (LockName = "Semaphore") that
  # the state machine uses to allow only one vacuum execution at a time.
  LockTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: MaintenanceVacuumLockTable
      AttributeDefinitions:
        - AttributeName: LockName
          AttributeType: S
      KeySchema:
        - AttributeName: LockName
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

  # State machine flow:
  #   Initialize Lock  -> create the semaphore row if it does not exist yet
  #   Acquire Lock     -> conditionally increment the counter and record this
  #                       execution id as an attribute; fail with LockConflict
  #                       if the condition is not met
  #   StartState       -> run the Athena VACUUM query synchronously
  #   on task failure  -> if the Cause looks like an Athena QueryExecution
  #                       document, parse it and re-run the query when the
  #                       reason starts with ICEBERG_VACUUM_MORE_RUNS_NEEDED;
  #                       otherwise release the lock and fail
  #   Release Lock / FailReleaseLock -> decrement the counter and remove this
  #                       execution's attribute
  #
  # ${LockTable} resolves to the lock table's name via !Sub, which also makes
  # CloudFormation create the table before the state machine.
  # "$." and "$$." paths are left untouched by !Sub (only "${...}" is substituted).
  MaintenanceVacuumStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn: !GetAtt StepFunctionRole.Arn
      TracingConfiguration:
        Enabled: true
      DefinitionString: !Sub |
        {
          "StartAt": "Initialize Lock",
          "States": {
            "Initialize Lock": {
              "Type": "Task",
              "Resource": "arn:aws:states:::dynamodb:putItem",
              "Parameters": {
                "TableName": "${LockTable}",
                "Item": {
                  "LockName": {
                    "S": "Semaphore"
                  },
                  "currentlockcount": {
                    "N": "0"
                  }
                },
                "ConditionExpression": "attribute_not_exists(LockName)"
              },
              "Catch": [
                {
                  "ErrorEquals": ["DynamoDB.ConditionalCheckFailedException"],
                  "Next": "Acquire Lock"
                }
              ],
              "Next": "Acquire Lock"
            },
            "Acquire Lock": {
              "Type": "Task",
              "Resource": "arn:aws:states:::dynamodb:updateItem",
              "Parameters": {
                "TableName": "${LockTable}",
                "Key": {
                  "LockName": {
                    "S": "Semaphore"
                  }
                },
                "ExpressionAttributeNames": {
                  "#currentlockcount": "currentlockcount",
                  "#lockownerid.$": "$$.Execution.Id"
                },
                "ExpressionAttributeValues": {
                  ":increase": {
                    "N": "1"
                  },
                  ":limit": {
                    "N": "1"
                  },
                  ":lockacquiredtime": {
                    "S.$": "$$.State.EnteredTime"
                  }
                },
                "UpdateExpression": "SET #currentlockcount = #currentlockcount + :increase, #lockownerid = :lockacquiredtime",
                "ConditionExpression": "attribute_not_exists(#lockownerid) AND #currentlockcount < :limit",
                "ReturnValues": "UPDATED_NEW"
              },
              "Retry": [
                {
                  "ErrorEquals": ["DynamoDB.AmazonDynamoDBException"],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 0,
                  "BackoffRate": 2
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": ["DynamoDB.ConditionalCheckFailedException"],
                  "Next": "LockConflictError"
                }
              ],
              "Next": "StartState"
            },
            "LockConflictError": {
              "Type": "Fail",
              "Cause": "Lock acquisition failed",
              "Error": "LockConflict"
            },
            "StartState": {
              "Type": "Task",
              "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
              "Parameters": {
                "QueryString": "VACUUM ${DatabaseName}.${TableName}",
                "WorkGroup": "${WorkgroupName}",
                "ResultConfiguration": {
                  "OutputLocation": "s3://${AthenaOutputBucketName}/${AthenaOutputBucketPrefix}"
                }
              },
              "Catch": [
                {
                  "ErrorEquals": ["States.TaskFailed"],
                  "Next": "Check Error Is Query Failure"
                },
                {
                  "ErrorEquals": ["States.ALL"],
                  "Next": "FailReleaseLock"
                }
              ],
              "Next": "Release Lock"
            },
            "Check Error Is Query Failure": {
              "Type": "Choice",
              "Choices": [
                {
                  "And": [
                    {
                      "Variable": "$.Cause",
                      "IsPresent": true
                    },
                    {
                      "Variable": "$.Cause",
                      "StringMatches": "*\"QueryExecution\"*"
                    }
                  ],
                  "Next": "Parse Error JSON"
                }
              ],
              "Default": "FailReleaseLock"
            },
            "Parse Error JSON": {
              "Type": "Pass",
              "Parameters": {
                "Cause.$": "States.StringToJson($.Cause)"
              },
              "Next": "Check If Error Needs More Vacuums"
            },
            "Check If Error Needs More Vacuums": {
              "Type": "Choice",
              "Choices": [
                {
                  "And": [
                    {
                      "Variable": "$.Cause.QueryExecution.Status.StateChangeReason",
                      "IsPresent": true
                    },
                    {
                      "Variable": "$.Cause.QueryExecution.Status.StateChangeReason",
                      "StringMatches": "ICEBERG_VACUUM_MORE_RUNS_NEEDED*"
                    }
                  ],
                  "Next": "StartState"
                }
              ],
              "Default": "FailReleaseLock"
            },
            "Release Lock": {
              "Type": "Task",
              "Resource": "arn:aws:states:::dynamodb:updateItem",
              "Parameters": {
                "TableName": "${LockTable}",
                "Key": {
                  "LockName": {
                    "S": "Semaphore"
                  }
                },
                "ExpressionAttributeNames": {
                  "#currentlockcount": "currentlockcount",
                  "#lockownerid.$": "$$.Execution.Id"
                },
                "ExpressionAttributeValues": {
                  ":decrease": {
                    "N": "1"
                  }
                },
                "UpdateExpression": "SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid",
                "ConditionExpression": "attribute_exists(#lockownerid)",
                "ReturnValues": "UPDATED_NEW"
              },
              "End": true
            },
            "FailReleaseLock": {
              "Type": "Task",
              "Resource": "arn:aws:states:::dynamodb:updateItem",
              "Parameters": {
                "TableName": "${LockTable}",
                "Key": {
                  "LockName": {
                    "S": "Semaphore"
                  }
                },
                "ExpressionAttributeNames": {
                  "#currentlockcount": "currentlockcount",
                  "#lockownerid.$": "$$.Execution.Id"
                },
                "ExpressionAttributeValues": {
                  ":decrease": {
                    "N": "1"
                  }
                },
                "UpdateExpression": "SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid",
                "ConditionExpression": "attribute_exists(#lockownerid)",
                "ReturnValues": "UPDATED_NEW"
              },
              "Next": "Fail"
            },
            "Fail": {
              "Type": "Fail",
              "Cause": "Query failed."
            }
          }
        }

  # Schedule that starts the state machine at the VacuumFrequency interval,
  # assuming StepFunctionRole to call states:StartExecution.
  MaintenanceVacuumEventRule:
    Type: AWS::Events::Rule
    # Do not enable the schedule until the role's permissions are attached;
    # otherwise an early invocation could be denied.
    DependsOn: StepFunctionRolePolicy
    Properties:
      Name: MaintenanceVacuumEventRule
      ScheduleExpression: !Ref VacuumFrequency
      State: 'ENABLED'
      Targets:
        - Arn: !Ref MaintenanceVacuumStateMachine
          RoleArn: !GetAtt StepFunctionRole.Arn
          Id: maintenance-vacuum
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment