Last active
May 30, 2022 14:57
-
-
Save yokawasa/ca53820a3ef624636ad2726270db675c to your computer and use it in GitHub Desktop.
Go SDK sample for DynamoDB Stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| // ref | |
| // https://github.com/urakozz/go-dynamodb-stream-subscriber | |
| // https://github.com/aws/aws-sdk-go-v2 | |
| /* | |
| aws cli for dynamodb stream info | |
| # Get the full ARN for DynamoDB Streams | |
| aws dynamodb describe-table --table-name 'foo-test01' --query 'Table.LatestStreamArn' --output text | |
| # describe dynamodb Stream | |
| # https://docs.aws.amazon.com/cli/latest/reference/dynamodbstreams/describe-stream.html | |
| aws dynamodbstreams describe-stream --stream-arn <stream arn> | |
| */ | |
| import ( | |
| "errors" | |
| "fmt" | |
| "os" | |
| "github.com/aws/aws-sdk-go/aws" | |
| "github.com/aws/aws-sdk-go/aws/session" | |
| "github.com/aws/aws-sdk-go/service/dynamodb" | |
| "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" | |
| "github.com/aws/aws-sdk-go/service/dynamodbstreams" | |
| ) | |
| func getLatestStreamArn(db dynamodbiface.DynamoDBAPI, table *string) (*string, error) { | |
| tableInfo, err := db.DescribeTable(&dynamodb.DescribeTableInput{TableName: table}) | |
| if err != nil { | |
| return nil, err | |
| } | |
| if nil == tableInfo.Table.LatestStreamArn { | |
| return nil, errors.New("empty table stream arn") | |
| } | |
| return tableInfo.Table.LatestStreamArn, nil | |
| } | |
| func main() { | |
| if len(os.Args) != 2 { | |
| fmt.Printf("[ERROR] invalid options: %v\n", os.Args[1:]) | |
| os.Exit(1) | |
| } | |
| table := os.Args[1] | |
| cfg := aws.NewConfig().WithRegion("ap-northeast-1") | |
| sess := session.New() | |
| stream := dynamodbstreams.New(sess, cfg) | |
| db := dynamodb.New(sess, cfg) | |
| streamArn, err := getLatestStreamArn(db, &table) | |
| if err != nil { | |
| fmt.Printf("error: %v\n", err) | |
| os.Exit(1) | |
| } | |
| // Reference | |
| // Java sample https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-list-shards.html | |
| // https://docs.amazonaws.cn/en_us/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html | |
| var limit int64 = 100 // the upper limit | |
| var shardNum int64 = 0 | |
| var lastShardId *string = nil | |
| input := dynamodbstreams.DescribeStreamInput{ | |
| StreamArn: streamArn, | |
| Limit: &limit, | |
| } | |
| for { | |
| if lastShardId != nil { | |
| input = dynamodbstreams.DescribeStreamInput{ | |
| StreamArn: streamArn, | |
| Limit: &limit, | |
| ExclusiveStartShardId: lastShardId, | |
| } | |
| } | |
| des, err := stream.DescribeStream(&input) | |
| if err != nil { | |
| fmt.Printf("error: %v\n", err) | |
| os.Exit(1) | |
| } | |
| shardNum += int64(len(des.StreamDescription.Shards)) | |
| lastShardId = des.StreamDescription.LastEvaluatedShardId | |
| // If LastEvaluatedShardId is empty, then the "last page" of results has been | |
| // processed and there is currently no more data to be retrieved. | |
| if lastShardId == nil { | |
| break | |
| } | |
| } | |
| fmt.Printf("DynamoDB Stream Num: %d\n", shardNum) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment