Skip to content

Instantly share code, notes, and snippets.

@yokawasa
Last active May 30, 2022 14:57
Show Gist options
  • Select an option

  • Save yokawasa/ca53820a3ef624636ad2726270db675c to your computer and use it in GitHub Desktop.

Select an option

Save yokawasa/ca53820a3ef624636ad2726270db675c to your computer and use it in GitHub Desktop.
Go SDK sample for DynamoDB Stream
package main
// ref
// https://github.com/urakozz/go-dynamodb-stream-subscriber
// https://github.com/aws/aws-sdk-go-v2
/*
aws cli for dynamodb stream info
# Get the full ARN for DynamoDB Streams
aws dynamodb describe-table --table-name 'foo-test01' --query 'Table.LatestStreamArn' --output text
# describe dynamodb Stream
# https://docs.aws.amazon.com/cli/latest/reference/dynamodbstreams/describe-stream.html
aws dynamodbstreams describe-stream --stream-arn <stream arn>
*/
import (
"errors"
"fmt"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
)
func getLatestStreamArn(db dynamodbiface.DynamoDBAPI, table *string) (*string, error) {
tableInfo, err := db.DescribeTable(&dynamodb.DescribeTableInput{TableName: table})
if err != nil {
return nil, err
}
if nil == tableInfo.Table.LatestStreamArn {
return nil, errors.New("empty table stream arn")
}
return tableInfo.Table.LatestStreamArn, nil
}
func main() {
if len(os.Args) != 2 {
fmt.Printf("[ERROR] invalid options: %v\n", os.Args[1:])
os.Exit(1)
}
table := os.Args[1]
cfg := aws.NewConfig().WithRegion("ap-northeast-1")
sess := session.New()
stream := dynamodbstreams.New(sess, cfg)
db := dynamodb.New(sess, cfg)
streamArn, err := getLatestStreamArn(db, &table)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
// Reference
// Java sample https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-list-shards.html
// https://docs.amazonaws.cn/en_us/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html
var limit int64 = 100 // the upper limit
var shardNum int64 = 0
var lastShardId *string = nil
input := dynamodbstreams.DescribeStreamInput{
StreamArn: streamArn,
Limit: &limit,
}
for {
if lastShardId != nil {
input = dynamodbstreams.DescribeStreamInput{
StreamArn: streamArn,
Limit: &limit,
ExclusiveStartShardId: lastShardId,
}
}
des, err := stream.DescribeStream(&input)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
shardNum += int64(len(des.StreamDescription.Shards))
lastShardId = des.StreamDescription.LastEvaluatedShardId
// If LastEvaluatedShardId is empty, then the "last page" of results has been
// processed and there is currently no more data to be retrieved.
if lastShardId == nil {
break
}
}
fmt.Printf("DynamoDB Stream Num: %d\n", shardNum)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment