Created
March 26, 2026 23:27
-
-
Save ochaton/ad98f4c9f021f55583491ba11e4a80dc to your computer and use it in GitHub Desktop.
AWS S3 Remove non-current versions from bucket under prefix
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "context" | |
| "flag" | |
| "fmt" | |
| "os" | |
| "sync" | |
| "time" | |
| "github.com/aws/aws-sdk-go-v2/aws" | |
| "github.com/aws/aws-sdk-go-v2/config" | |
| "github.com/aws/aws-sdk-go-v2/service/s3" | |
| "github.com/aws/aws-sdk-go-v2/service/s3/types" | |
| "github.com/aws/smithy-go" | |
| "golang.org/x/time/rate" | |
| ) | |
| func main() { | |
| var bktName string | |
| var bktPrefix string | |
| var runDelete bool | |
| flag.StringVar(&bktName, "bucket", "", "bucket name") | |
| flag.StringVar(&bktPrefix, "prefix", "", "bucket prefix") | |
| flag.BoolVar(&runDelete, "run-delete", false, "actually delete objects") | |
| flag.Parse() | |
| if bktName == "" { | |
| os.Stderr.WriteString("specify bkt name with -bucket flag\n") | |
| os.Exit(1) | |
| } | |
| if bktPrefix == "" { | |
| os.Stderr.WriteString("specify bkt prefix with -prefix flag\n") | |
| os.Exit(1) | |
| } | |
| cfg, err := config.LoadDefaultConfig(context.Background()) | |
| if err != nil { | |
| panic(err) | |
| } | |
| s3Client := s3.NewFromConfig(cfg) | |
| if s3Client == nil { | |
| panic("failed to create S3 client") | |
| } | |
| bkt, err := s3Client.HeadBucket(context.Background(), &s3.HeadBucketInput{ | |
| Bucket: aws.String(bktName), | |
| }) | |
| if err != nil { | |
| if e, ok := err.(*smithy.OperationError); ok { | |
| fmt.Printf("API error: %s - %s\n", e.OperationName, e.Err.Error()) | |
| } else { | |
| fmt.Printf("unexpected error: %v\n", err) | |
| } | |
| return | |
| } | |
| fmt.Printf("bucket %s exists: %v\n", bktName, bkt.BucketRegion) | |
| type message struct { | |
| key string // must be defined | |
| ver string // must be defined | |
| size int64 // | |
| } | |
| // we have a bucket, let's list it | |
| bus := make(chan message, 10000) | |
| wg := &sync.WaitGroup{} | |
| wg.Go(func() { | |
| // we run lister | |
| defer close(bus) | |
| cursor := struct { | |
| key *string | |
| ver *string | |
| }{ | |
| key: nil, | |
| ver: nil, | |
| } | |
| rl := rate.NewLimiter(rate.Limit(1000), 1000) // 1000 req/s with burst of 1000 | |
| for { | |
| if !rl.Allow() { | |
| rl.WaitN(context.Background(), 1) | |
| } | |
| input := &s3.ListObjectVersionsInput{ | |
| Bucket: aws.String(bktName), | |
| Prefix: aws.String(bktPrefix), | |
| } | |
| if cursor.key != nil { | |
| input.KeyMarker = cursor.key | |
| } | |
| if cursor.ver != nil { | |
| input.VersionIdMarker = cursor.ver | |
| } | |
| output, err := s3Client.ListObjectVersions(context.Background(), input) | |
| //FIXME: transient errors, retry | |
| if err != nil { | |
| fmt.Printf("error listing object versions: %v\n", err) | |
| break | |
| } | |
| for _, ver := range output.Versions { | |
| // check that noncurrent | |
| if ver.IsLatest == nil { | |
| // warn, but skip | |
| fmt.Printf("version %s of key %s has nil IsLatest field, skipping\n", aws.ToString(ver.VersionId), aws.ToString(ver.Key)) | |
| continue | |
| } | |
| if !*ver.IsLatest { | |
| // not latest, add to bus | |
| // get version id | |
| // backpressure | |
| bus <- message{ | |
| key: *ver.Key, | |
| ver: *ver.VersionId, | |
| size: *ver.Size, | |
| } | |
| } | |
| } | |
| if output.IsTruncated != nil && !*output.IsTruncated { | |
| // no more pages | |
| break | |
| } | |
| // set cursor for next page | |
| cursor.key = output.NextKeyMarker | |
| cursor.ver = output.NextVersionIdMarker | |
| fmt.Printf("getting next page after %s\n", *cursor.key) | |
| } | |
| }) | |
| type stat struct { | |
| listedCount int | |
| removedCount int | |
| removedSize int64 | |
| } | |
| stats := make(chan stat, 1000) | |
| wg.Go(func() { | |
| defer close(stats) | |
| objs := make([]types.ObjectIdentifier, 0, 1000) | |
| deleteBatch := func(sz uint64) { | |
| if !runDelete { | |
| fmt.Println("dry run mode, not actually deleting objects") | |
| stats <- stat{ | |
| listedCount: len(objs), | |
| removedCount: 0, | |
| removedSize: 0, | |
| } | |
| return | |
| } | |
| res, err := s3Client.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ | |
| Bucket: aws.String(bktName), | |
| Delete: &types.Delete{ | |
| Objects: objs, | |
| Quiet: aws.Bool(true), | |
| }, | |
| }) | |
| if err != nil { | |
| fmt.Printf("error deleting objects: %v\n", err) | |
| return | |
| } | |
| if len(res.Errors) > 0 { | |
| fmt.Printf("errors deleting objects: %v\n", res.Errors) | |
| return | |
| } | |
| fmt.Printf("rm: %d objects, size: %.2f Gb\n", len(objs), float64(sz)/1024/1024/1024) | |
| stats <- stat{ | |
| listedCount: len(objs), | |
| removedCount: len(objs), | |
| removedSize: int64(sz), | |
| } | |
| } | |
| sz := uint64(0) | |
| for msg := range bus { | |
| // fmt.Printf("noncurrent version: key=%s version=%s size=%d\n", msg.key, msg.ver, msg.size) | |
| sz += uint64(msg.size) | |
| if len(objs) < 1000 { | |
| objs = append(objs, types.ObjectIdentifier{ | |
| Key: aws.String(msg.key), | |
| VersionId: aws.String(msg.ver), | |
| }) | |
| continue | |
| } | |
| deleteBatch(sz) | |
| sz = 0 | |
| // clear batch | |
| objs = objs[:0] | |
| } | |
| if len(objs) > 0 { | |
| deleteBatch(sz) | |
| } | |
| }) | |
| wg.Go(func() { | |
| // Collect statistics with DeleteObjects per second: | |
| // And print them every 10 seconds | |
| ticker := time.NewTicker(10 * time.Second) | |
| defer ticker.Stop() | |
| totalRemoved := 0 | |
| totalRemovedSize := uint64(0) | |
| prevReported := 0 | |
| prevReportedSize := uint64(0) | |
| prevReportedTime := time.Now() | |
| printStat := func() { | |
| elapsed := time.Since(prevReportedTime).Seconds() | |
| fmt.Printf("total removed: %d/%d (rps: %.2f/s, size: %d bytes, bps : %.2f Gb/s)\n", | |
| totalRemoved, totalRemoved-prevReported, | |
| float64(totalRemoved-prevReported)/elapsed, | |
| totalRemovedSize, | |
| float64(totalRemovedSize-prevReportedSize)/elapsed/1024/1024/1024, | |
| ) | |
| prevReported = totalRemoved | |
| prevReportedSize = totalRemovedSize | |
| prevReportedTime = time.Now() | |
| } | |
| for { | |
| select { | |
| case v, ok := <-stats: | |
| if !ok { | |
| // Print final stats | |
| printStat() | |
| return | |
| } | |
| totalRemoved += v.removedCount | |
| totalRemovedSize += uint64(v.removedSize) | |
| case <-ticker.C: | |
| printStat() | |
| } | |
| } | |
| }) | |
| wg.Wait() | |
| fmt.Println("all done") | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment