Skip to content

Instantly share code, notes, and snippets.

@t04glovern
Created June 15, 2024 03:43
Show Gist options
  • Save t04glovern/b41057b1577d495027db83e3d6837bee to your computer and use it in GitHub Desktop.
Save t04glovern/b41057b1577d495027db83e3d6837bee to your computer and use it in GitHub Desktop.
A CloudFormation template that creates the IAM permissions, a Step Functions state machine, and a CloudWatch Events (EventBridge) rule and target to run scheduled Athena VACUUM maintenance on an Iceberg table, with concurrency control so runs never overlap.
# Template format version and the human-readable stack description.
AWSTemplateFormatVersion: "2010-09-09"
Description: >-
  A CloudFormation template to create IAM role, policy, Step Functions,
  and CloudWatch event rule and target for table ingestion maintenance
  vacuum with concurrency control
# Stack inputs. Parameters without a Default are required.
Parameters:
  WorkgroupName:
    Type: String
    Default: primary
    Description: The name of the Athena Workgroup
  DatabaseName:
    Type: String
    Default: default
    Description: The name of the database
  TableName:
    Type: String
    Description: The name of the Iceberg table
  BucketName:
    Type: String
    Description: The name of the S3 bucket used for your Iceberg table
  # Optional, defaulting to '' like AthenaOutputBucketPrefix, so the
  # "no prefix" case tested by the IsBucketPrefixSpecified condition is
  # reachable. CloudFormation requires a non-empty value for a parameter
  # that has no Default.
  # The pattern rejects leading/trailing slashes: the value is interpolated
  # as "<bucket>/<prefix>/*", so a trailing slash would yield "prefix//*".
  BucketPrefix:
    Type: String
    Default: ''
    AllowedPattern: '^$|^[^/](.*[^/])?$'
    ConstraintDescription: must be empty or a key prefix with no leading or trailing slash
    Description: The prefix for the S3 bucket used for your Iceberg table (no leading or trailing slash; leave empty to use the whole bucket)
  AthenaOutputBucketName:
    Type: String
    Description: The name of the S3 bucket used for Athena query outputs
  # Same slash rule as BucketPrefix, for the same "<bucket>/<prefix>/*" reason.
  AthenaOutputBucketPrefix:
    Type: String
    Default: ''
    AllowedPattern: '^$|^[^/](.*[^/])?$'
    ConstraintDescription: must be empty or a key prefix with no leading or trailing slash
    Description: The prefix for the S3 bucket used for Athena query outputs (no leading or trailing slash; leave empty to use the whole bucket)
  # Used verbatim as the EventBridge rule's ScheduleExpression.
  VacuumFrequency:
    Type: String
    Default: 'rate(1 hour)'
    Description: The frequency for the Iceberg vacuum, as a schedule expression such as rate(1 hour) or cron(0 3 * * ? *)
Conditions:
  # True when BucketPrefix is a non-empty string.
  IsBucketPrefixSpecified: !Not
    - !Equals
      - !Ref BucketPrefix
      - ''
  # True when AthenaOutputBucketPrefix is a non-empty string.
  IsAthenaOutputBucketPrefixSpecified: !Not
    - !Equals
      - !Ref AthenaOutputBucketPrefix
      - ''
Resources:
  # Execution role for the state machine. Only Step Functions may assume it;
  # EventBridge uses the separate EventRuleRole below, so the scheduler never
  # holds the Athena/S3/Glue/DynamoDB permissions granted to this role.
  StepFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: 'states.amazonaws.com'
            Action: 'sts:AssumeRole'
            Condition:
              ArnLike:
                aws:SourceArn: !Sub 'arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:*'
              StringEquals:
                aws:SourceAccount: !Ref AWS::AccountId

  # Permissions the state machine uses to run Athena VACUUM on the Iceberg
  # table and to read/write its DynamoDB lock item.
  StepFunctionRolePolicy:
    Type: AWS::IAM::Policy
    Properties:
      Roles: [!Ref StepFunctionRole]
      PolicyName: StepFunctionRolePolicy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # X-Ray tracing is enabled on the state machine (TracingConfiguration).
          - Effect: Allow
            Action:
              - xray:PutTraceSegments
              - xray:PutTelemetryRecords
              - xray:GetSamplingRules
              - xray:GetSamplingTargets
            Resource: '*'
          - Effect: Allow
            Action:
              - athena:startQueryExecution
              - athena:stopQueryExecution
              - athena:getQueryExecution
              - athena:getDataCatalog
            Resource:
              - !Sub 'arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${WorkgroupName}'
              - !Sub 'arn:aws:athena:${AWS::Region}:${AWS::AccountId}:datacatalog/*'
          # Iceberg table storage: scoped to BucketPrefix when one is given,
          # otherwise the whole bucket.
          - !If
            - IsBucketPrefixSpecified
            - Effect: Allow
              Action:
                - s3:GetObject
                - s3:PutObject
                - s3:DeleteObject
                - s3:ListBucket
              Resource:
                - !Sub 'arn:aws:s3:::${BucketName}'
                - !Sub 'arn:aws:s3:::${BucketName}/${BucketPrefix}/*'
            - Effect: Allow
              Action:
                - s3:GetObject
                - s3:PutObject
                - s3:DeleteObject
                - s3:ListBucket
              Resource:
                - !Sub 'arn:aws:s3:::${BucketName}'
                - !Sub 'arn:aws:s3:::${BucketName}/*'
          # Athena query-result location: scoped to AthenaOutputBucketPrefix when
          # one is given, otherwise the whole bucket.
          - !If
            - IsAthenaOutputBucketPrefixSpecified
            - Effect: Allow
              Action:
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:ListMultipartUploadParts
                - s3:AbortMultipartUpload
                - s3:CreateBucket
                - s3:PutObject
              Resource:
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}'
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}/${AthenaOutputBucketPrefix}/*'
            - Effect: Allow
              Action:
                - s3:GetBucketLocation
                - s3:GetObject
                - s3:ListBucket
                - s3:ListBucketMultipartUploads
                - s3:ListMultipartUploadParts
                - s3:AbortMultipartUpload
                - s3:CreateBucket
                - s3:PutObject
              Resource:
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}'
                - !Sub 'arn:aws:s3:::${AthenaOutputBucketName}/*'
          - Effect: Allow
            Action:
              - glue:GetDatabase*
              - glue:UpdateTable
              - glue:GetTable
              - glue:GetTables
              - glue:BatchDeleteTable
              - glue:GetPartition*
              - glue:BatchGetPartition
            Resource:
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DatabaseName}'
              - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DatabaseName}/${TableName}'
          # Lock item operations performed by the state machine's DynamoDB tasks.
          # GetAtt (rather than a hand-built ARN) keeps this in sync with LockTable.
          - Effect: Allow
            Action:
              - dynamodb:PutItem
              - dynamodb:UpdateItem
              - dynamodb:GetItem
            Resource:
              - !GetAtt LockTable.Arn

  # Single-item semaphore table. The item LockName=Semaphore carries
  # currentlockcount plus one attribute named after the owning execution's ID.
  LockTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: MaintenanceVacuumLockTable
      AttributeDefinitions:
        - AttributeName: LockName
          AttributeType: S
      KeySchema:
        - AttributeName: LockName
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

  # Flow: Initialize Lock -> Acquire Lock -> StartState (Athena VACUUM, re-run
  # while Athena reports ICEBERG_VACUUM_MORE_RUNS_NEEDED) -> Release Lock.
  # Once the lock is acquired, every error from the Athena task is caught and
  # routed through FailReleaseLock. A failed run therefore no longer leaves the
  # lock held, which would make every later run fail with LockConflict.
  # NOTE: an execution stopped via StopExecution while holding the lock runs no
  # cleanup state. In that case reset the LockName=Semaphore item manually
  # (currentlockcount = 0, remove the execution-ID attribute).
  # The table name is injected via ${LockTable} (Ref -> table name).
  MaintenanceVacuumStateMachine:
    Type: AWS::StepFunctions::StateMachine
    # Make sure the execution role has its permissions before any run starts.
    DependsOn: StepFunctionRolePolicy
    Properties:
      RoleArn: !GetAtt StepFunctionRole.Arn
      TracingConfiguration:
        Enabled: true
      DefinitionString: !Sub |
        {
          "Comment": "Runs Athena VACUUM on ${DatabaseName}.${TableName} while holding a single-slot DynamoDB lock so runs never overlap.",
          "StartAt": "Initialize Lock",
          "States": {
            "Initialize Lock": {
              "Type": "Task",
              "Comment": "Create the lock item on first use; if it already exists the conditional put fails and execution continues to Acquire Lock.",
              "Resource": "arn:aws:states:::dynamodb:putItem",
              "Parameters": {
                "TableName": "${LockTable}",
                "Item": {
                  "LockName": {
                    "S": "Semaphore"
                  },
                  "currentlockcount": {
                    "N": "0"
                  }
                },
                "ConditionExpression": "attribute_not_exists(LockName)"
              },
              "Catch": [
                {
                  "ErrorEquals": ["DynamoDB.ConditionalCheckFailedException"],
                  "Next": "Acquire Lock"
                }
              ],
              "Next": "Acquire Lock"
            },
            "Acquire Lock": {
              "Type": "Task",
              "Comment": "Take the single slot: increment the counter and record this execution as owner, only if the counter is below the limit and this execution is not already an owner.",
              "Resource": "arn:aws:states:::dynamodb:updateItem",
              "Parameters": {
                "TableName": "${LockTable}",
                "Key": {
                  "LockName": {
                    "S": "Semaphore"
                  }
                },
                "ExpressionAttributeNames": {
                  "#currentlockcount": "currentlockcount",
                  "#lockownerid.$": "$$.Execution.Id"
                },
                "ExpressionAttributeValues": {
                  ":increase": {
                    "N": "1"
                  },
                  ":limit": {
                    "N": "1"
                  },
                  ":lockacquiredtime": {
                    "S.$": "$$.State.EnteredTime"
                  }
                },
                "UpdateExpression": "SET #currentlockcount = #currentlockcount + :increase, #lockownerid = :lockacquiredtime",
                "ConditionExpression": "attribute_not_exists(#lockownerid) AND #currentlockcount < :limit",
                "ReturnValues": "UPDATED_NEW"
              },
              "Retry": [
                {
                  "ErrorEquals": ["DynamoDB.AmazonDynamoDBException"],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 0,
                  "BackoffRate": 2
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": ["DynamoDB.ConditionalCheckFailedException"],
                  "Next": "LockConflictError"
                }
              ],
              "Next": "StartState"
            },
            "LockConflictError": {
              "Type": "Fail",
              "Cause": "Lock acquisition failed",
              "Error": "LockConflict"
            },
            "StartState": {
              "Type": "Task",
              "Comment": "The lock is held from here on, so every error is caught (States.ALL, which unlike States.TaskFailed also covers States.Timeout) and ends in a lock release.",
              "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
              "Parameters": {
                "QueryString": "VACUUM ${DatabaseName}.${TableName}",
                "WorkGroup": "${WorkgroupName}",
                "ResultConfiguration": {
                  "OutputLocation": "s3://${AthenaOutputBucketName}/${AthenaOutputBucketPrefix}"
                }
              },
              "Catch": [
                {
                  "ErrorEquals": ["States.ALL"],
                  "Next": "Check Error Has Cause"
                }
              ],
              "Next": "Release Lock"
            },
            "Check Error Has Cause": {
              "Type": "Choice",
              "Comment": "Guard against a missing Cause: a Choice comparison on a path that does not exist is a runtime error, which would end the execution with the lock still held.",
              "Choices": [
                {
                  "Variable": "$.Cause",
                  "IsPresent": true,
                  "Next": "Check If Error Needs More Vacuums"
                }
              ],
              "Default": "FailReleaseLock"
            },
            "Check If Error Needs More Vacuums": {
              "Type": "Choice",
              "Comment": "Re-run VACUUM while Athena reports ICEBERG_VACUUM_MORE_RUNS_NEEDED. The raw Cause string is matched instead of parsing it with States.StringToJson, because a Cause that is not valid JSON would make the parse fail while the lock is held.",
              "Choices": [
                {
                  "Variable": "$.Cause",
                  "StringMatches": "*ICEBERG_VACUUM_MORE_RUNS_NEEDED*",
                  "Next": "StartState"
                }
              ],
              "Default": "FailReleaseLock"
            },
            "Release Lock": {
              "Type": "Task",
              "Resource": "arn:aws:states:::dynamodb:updateItem",
              "Parameters": {
                "TableName": "${LockTable}",
                "Key": {
                  "LockName": {
                    "S": "Semaphore"
                  }
                },
                "ExpressionAttributeNames": {
                  "#currentlockcount": "currentlockcount",
                  "#lockownerid.$": "$$.Execution.Id"
                },
                "ExpressionAttributeValues": {
                  ":decrease": {
                    "N": "1"
                  }
                },
                "UpdateExpression": "SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid",
                "ConditionExpression": "attribute_exists(#lockownerid)",
                "ReturnValues": "UPDATED_NEW"
              },
              "End": true
            },
            "FailReleaseLock": {
              "Type": "Task",
              "Comment": "Release the lock on the failure path, then fail the execution.",
              "Resource": "arn:aws:states:::dynamodb:updateItem",
              "Parameters": {
                "TableName": "${LockTable}",
                "Key": {
                  "LockName": {
                    "S": "Semaphore"
                  }
                },
                "ExpressionAttributeNames": {
                  "#currentlockcount": "currentlockcount",
                  "#lockownerid.$": "$$.Execution.Id"
                },
                "ExpressionAttributeValues": {
                  ":decrease": {
                    "N": "1"
                  }
                },
                "UpdateExpression": "SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid",
                "ConditionExpression": "attribute_exists(#lockownerid)",
                "ReturnValues": "UPDATED_NEW"
              },
              "Next": "Fail"
            },
            "Fail": {
              "Type": "Fail",
              "Cause": "Query failed."
            }
          }
        }

  # Role EventBridge assumes to start the state machine on schedule. It can
  # only call states:StartExecution on this one state machine. The SourceArn
  # is a literal because a GetAtt on the rule would create a dependency cycle
  # (rule -> role -> rule).
  EventRuleRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: 'events.amazonaws.com'
            Action: 'sts:AssumeRole'
            Condition:
              ArnLike:
                aws:SourceArn: !Sub 'arn:aws:events:${AWS::Region}:${AWS::AccountId}:rule/MaintenanceVacuumEventRule'
      Policies:
        - PolicyName: StartMaintenanceVacuumStateMachine
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: states:StartExecution
                Resource: !Ref MaintenanceVacuumStateMachine

  # Schedules the vacuum. Overlapping runs are rejected by the lock
  # (LockConflict) rather than queued.
  MaintenanceVacuumEventRule:
    Type: AWS::Events::Rule
    Properties:
      Name: MaintenanceVacuumEventRule
      ScheduleExpression: !Ref VacuumFrequency
      State: 'ENABLED'
      Targets:
        - Arn: !Ref MaintenanceVacuumStateMachine
          RoleArn: !GetAtt EventRuleRole.Arn
          Id: maintenance-vacuum
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment