Skip to content

Instantly share code, notes, and snippets.

@ochaton
Created March 26, 2026 23:27
Show Gist options
  • Select an option

  • Save ochaton/ad98f4c9f021f55583491ba11e4a80dc to your computer and use it in GitHub Desktop.

Select an option

Save ochaton/ad98f4c9f021f55583491ba11e4a80dc to your computer and use it in GitHub Desktop.
AWS S3 Remove non-current versions from bucket under prefix
package main
import (
"context"
"flag"
"fmt"
"os"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"golang.org/x/time/rate"
)
// main removes (or, by default, only counts) the non-current object versions
// stored under a key prefix of an S3 bucket.
//
// Flags:
//
//	-bucket      bucket name (required)
//	-prefix      key prefix to scan (required)
//	-run-delete  actually call DeleteObjects; without it the tool is a dry run
//
// The work is split across three goroutines connected by channels:
//
//   - the lister pages through ListObjectVersions and sends every version whose
//     IsLatest flag is false on bus;
//   - the deleter groups those versions into batches of up to maxBatch entries
//     and issues one DeleteObjects call per batch, reporting the outcome on stats;
//   - the reporter accumulates stats and prints totals every 10 seconds, plus a
//     final line once stats is closed.
func main() {
	// maxBatch is the number of object versions sent in one DeleteObjects call.
	const maxBatch = 1000

	var bktName string
	var bktPrefix string
	var runDelete bool
	flag.StringVar(&bktName, "bucket", "", "bucket name")
	flag.StringVar(&bktPrefix, "prefix", "", "bucket prefix")
	flag.BoolVar(&runDelete, "run-delete", false, "actually delete objects")
	flag.Parse()

	if bktName == "" {
		os.Stderr.WriteString("specify bkt name with -bucket flag\n")
		os.Exit(1)
	}
	if bktPrefix == "" {
		os.Stderr.WriteString("specify bkt prefix with -prefix flag\n")
		os.Exit(1)
	}

	ctx := context.Background()

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		panic(err)
	}
	s3Client := s3.NewFromConfig(cfg)
	if s3Client == nil {
		panic("failed to create S3 client")
	}

	// Fail fast if the bucket is missing or not accessible before starting the pipeline.
	bkt, err := s3Client.HeadBucket(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(bktName),
	})
	if err != nil {
		if e, ok := err.(*smithy.OperationError); ok {
			fmt.Printf("API error: %s - %s\n", e.OperationName, e.Err.Error())
		} else {
			fmt.Printf("unexpected error: %v\n", err)
		}
		return
	}
	// BucketRegion is a *string: dereference it so the region is printed, not a pointer address.
	fmt.Printf("bucket %s exists: %s\n", bktName, aws.ToString(bkt.BucketRegion))

	// message describes one non-current object version queued for deletion.
	type message struct {
		key  string // object key
		ver  string // version id
		size int64  // object size in bytes (0 when the listing did not report it)
	}

	// bus carries versions from the lister to the deleter. The buffer lets listing
	// run ahead of deletion; once it is full the lister blocks (backpressure).
	bus := make(chan message, 10000)
	wg := &sync.WaitGroup{}

	// Lister: pages through all versions under the prefix.
	wg.Go(func() {
		// Closing bus tells the deleter that listing is finished (or was aborted).
		defer close(bus)

		var keyMarker, versionMarker *string

		rl := rate.NewLimiter(rate.Limit(1000), 1000) // 1000 req/s with burst of 1000
		for {
			if err := rl.Wait(ctx); err != nil {
				fmt.Printf("error waiting for rate limiter: %v\n", err)
				break
			}

			// Nil markers on the first iteration request the first page.
			input := &s3.ListObjectVersionsInput{
				Bucket:          aws.String(bktName),
				Prefix:          aws.String(bktPrefix),
				KeyMarker:       keyMarker,
				VersionIdMarker: versionMarker,
			}

			output, err := s3Client.ListObjectVersions(ctx, input)
			//FIXME: transient errors, retry
			if err != nil {
				fmt.Printf("error listing object versions: %v\n", err)
				break
			}

			for _, ver := range output.Versions {
				if ver.IsLatest == nil {
					// warn, but skip: without the flag we cannot tell whether it is safe to delete
					fmt.Printf("version %s of key %s has nil IsLatest field, skipping\n", aws.ToString(ver.VersionId), aws.ToString(ver.Key))
					continue
				}
				if *ver.IsLatest {
					continue
				}
				if ver.Key == nil || ver.VersionId == nil {
					// Both fields are needed to address the version in DeleteObjects.
					fmt.Printf("version %s of key %s is missing key or version id, skipping\n", aws.ToString(ver.VersionId), aws.ToString(ver.Key))
					continue
				}
				// Blocks when bus is full, which throttles listing to the deletion speed.
				bus <- message{
					key:  *ver.Key,
					ver:  *ver.VersionId,
					size: aws.ToInt64(ver.Size),
				}
			}

			if output.IsTruncated != nil && !*output.IsTruncated {
				// no more pages
				break
			}
			if output.NextKeyMarker == nil {
				// Without a marker the next request would list the first page again.
				fmt.Printf("no next key marker returned, stopping listing\n")
				break
			}

			// set cursor for next page
			keyMarker = output.NextKeyMarker
			versionMarker = output.NextVersionIdMarker
			fmt.Printf("getting next page after %s\n", *keyMarker)
		}
	})

	// stat is the outcome of one processed batch.
	type stat struct {
		listedCount  int   // versions in the batch
		removedCount int   // versions reported as deleted
		removedSize  int64 // total size in bytes of the deleted versions
	}
	stats := make(chan stat, 1000)

	// Deleter: batches versions from bus and deletes them.
	wg.Go(func() {
		// Closing stats lets the reporter print its final line and exit.
		defer close(stats)

		objs := make([]types.ObjectIdentifier, 0, maxBatch)

		// deleteBatch deletes the versions currently held in objs; sz is their
		// total size in bytes. In dry-run mode nothing is deleted. A batch that
		// fails, fully or partially, is logged and not counted as removed.
		deleteBatch := func(sz uint64) {
			if !runDelete {
				fmt.Println("dry run mode, not actually deleting objects")
				stats <- stat{
					listedCount:  len(objs),
					removedCount: 0,
					removedSize:  0,
				}
				return
			}
			res, err := s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
				Bucket: aws.String(bktName),
				Delete: &types.Delete{
					Objects: objs,
					Quiet:   aws.Bool(true),
				},
			})
			if err != nil {
				fmt.Printf("error deleting objects: %v\n", err)
				return
			}
			if len(res.Errors) > 0 {
				fmt.Printf("errors deleting objects: %d of %d failed\n", len(res.Errors), len(objs))
				// The error fields are pointers; print their values rather than addresses.
				for _, e := range res.Errors {
					fmt.Printf("error deleting key %s version %s: %s - %s\n",
						aws.ToString(e.Key), aws.ToString(e.VersionId),
						aws.ToString(e.Code), aws.ToString(e.Message))
				}
				return
			}
			fmt.Printf("rm: %d objects, size: %.2f Gb\n", len(objs), float64(sz)/1024/1024/1024)
			stats <- stat{
				listedCount:  len(objs),
				removedCount: len(objs),
				removedSize:  int64(sz),
			}
		}

		sz := uint64(0)
		for msg := range bus {
			// Always queue the message first, then flush once the batch is full,
			// so that no version is dropped at a batch boundary.
			objs = append(objs, types.ObjectIdentifier{
				Key:       aws.String(msg.key),
				VersionId: aws.String(msg.ver),
			})
			sz += uint64(msg.size)
			if len(objs) < maxBatch {
				continue
			}
			deleteBatch(sz)
			sz = 0
			// clear batch, keeping the backing array for reuse
			objs = objs[:0]
		}
		// Flush the final, partially filled batch.
		if len(objs) > 0 {
			deleteBatch(sz)
		}
	})

	// Reporter: aggregates batch results and prints progress.
	wg.Go(func() {
		// Print statistics every 10 seconds.
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()

		totalRemoved := 0
		totalRemovedSize := uint64(0)

		// State of the previous report, used to compute rates for the interval.
		prevReported := 0
		prevReportedSize := uint64(0)
		prevReportedTime := time.Now()

		// printStat prints the running totals and the rates since the previous report.
		printStat := func() {
			elapsed := time.Since(prevReportedTime).Seconds()
			fmt.Printf("total removed: %d/%d (rps: %.2f/s, size: %d bytes, bps : %.2f Gb/s)\n",
				totalRemoved, totalRemoved-prevReported,
				float64(totalRemoved-prevReported)/elapsed,
				totalRemovedSize,
				float64(totalRemovedSize-prevReportedSize)/elapsed/1024/1024/1024,
			)
			prevReported = totalRemoved
			prevReportedSize = totalRemovedSize
			prevReportedTime = time.Now()
		}

		for {
			select {
			case v, ok := <-stats:
				if !ok {
					// Print final stats
					printStat()
					return
				}
				totalRemoved += v.removedCount
				totalRemovedSize += uint64(v.removedSize)
			case <-ticker.C:
				printStat()
			}
		}
	})

	wg.Wait()
	fmt.Println("all done")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment