Skip to content

Instantly share code, notes, and snippets.

@Integralist
Last active May 2, 2020 23:49
Show Gist options
  • Save Integralist/9cf6f2376aa25520a80e191e8925263f to your computer and use it in GitHub Desktop.
Save Integralist/9cf6f2376aa25520a80e191e8925263f to your computer and use it in GitHub Desktop.
[Golang AWS S3 Examples] #go #golang #aws #s3
// Example: list (at most two) objects in a bucket using static credentials.
// NOTE(review): accessKey/secretKey below are placeholders; avoid committing
// real secrets to source.
sessionToken := "" // optional; left empty here
accessKey := "AWS_ACCESS_KEY_ID"
secretKey := "AWS_SECRET_ACCESS_KEY"
sess, err := session.NewSession(&aws.Config{
	Region:      aws.String("us-east-1"),
	Credentials: credentials.NewStaticCredentials(accessKey, secretKey, sessionToken),
})
if err != nil {
	// Include the cause; a bare message hides why the session failed.
	log.Fatalf("unable to create aws session: %v", err)
}
svc := s3.New(sess)
input := &s3.ListObjectsInput{
	Bucket:  aws.String("some_bucket_name"),
	MaxKeys: aws.Int64(2), // only return two results!
}
result, err := svc.ListObjects(input)
if err != nil {
	// Errors from the service implement awserr.Error, which exposes a
	// machine-readable Code alongside the message.
	if aerr, ok := err.(awserr.Error); ok {
		switch aerr.Code() {
		case s3.ErrCodeNoSuchBucket:
			fmt.Println(s3.ErrCodeNoSuchBucket, aerr.Error())
		default:
			fmt.Println(aerr.Error())
		}
	} else {
		// Not an awserr.Error, so there is no Code to inspect; print as-is.
		fmt.Println(err.Error())
	}
	return
}
fmt.Println(result)
/*
Originally written by Mark Gannaway...
https://gist.github.com/Ganners/86f23c2d121332a8b3968bf05d2f720a
Dry Runs:
# stage
go run main.go --bucket=<your_bucket_name> --profile=planz-stage
# prod
go run main.go --bucket=<your_bucket_name> --profile=planz-prod
Real Deletes:
# stage
go run main.go --bucket=<your_bucket_name> --profile=planz-stage --dryrun=false
# prod
go run main.go --bucket=<your_bucket_name> --profile=planz-prod --dryrun=false
*/
package main
import (
"errors"
"flag"
"fmt"
"log"
"math/rand"
"net/url"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
var (
	// whitelist is the set of query-string parameter names a key may carry
	// and still be kept; a key with any other parameter is selected for
	// deletion by the listing workers.
	whitelist = map[string]struct{}{
		// Used for /wd/UserWidget (which is used for ads)
		"bids":    {},
		"cs":      {},
		"ct":      {},
		"network": {},
		"or":      {},
		"u":       {},
		"uo":      {},
		"wid":     {},
		// Keep
		"country":         {}, // Used for varying the country, usually used when fetching feeds
		"p":               {}, // Used for selecting page
		"page":            {}, // Used for selecting page
		"page_quantity":   {}, // Used for specifying page size
		"page_size":       {}, // Used for specifying page size
		"render_template": {}, // Used for next pages in, etc. /nifty?render_template=0&page=2
	}

	// invalidSuffix lists key endings that always mark a key for deletion.
	invalidSuffix = []string{".mobile.js", ".mobile3.js"}

	// invalidPrefix lists key beginnings that always mark a key for deletion.
	invalidPrefix = []string{"/", "api/comments"}
)
// Concurrency and batching limits for the list → delete pipeline.
const (
	listWorkers     = 1000 // goroutines issuing ListObjects calls
	deleteWorkers   = 100  // goroutines issuing DeleteObjects calls
	deleteBatchSize = 10   // keys grouped into a single DeleteObjects request
)
// main builds a large set of key prefixes and lists the matching objects in
// -bucket concurrently, using the -profile AWS credentials. By default
// (-dryrun=true) it prints every key the listing workers select. With
// -dryrun=false it deletes those keys in batches instead.
func main() {
	dryrun := flag.Bool("dryrun", true, "is this a dry run? false will execute deletes")
	bucket := flag.String("bucket", "plan-z-stage-us-east-1", "what bucket to use")
	profile := flag.String("profile", "planz", "what AWS profile to use")
	flag.Parse()

	svc := s3.New(session.Must(session.NewSessionWithOptions(session.Options{
		AssumeRoleTokenProvider: stscreds.StdinTokenProvider,
		Config:                  aws.Config{Region: aws.String("us-east-1")},
		Profile:                 *profile,
	})))

	prefixes := buildPrefixes()
	// Shuffle so that neighbouring (presumably similarly sized) prefixes are
	// spread across the listing workers rather than handled together.
	rand.Shuffle(len(prefixes), func(i, j int) {
		prefixes[i], prefixes[j] = prefixes[j], prefixes[i]
	})

	keys := NewListFilesWorkers(svc, *bucket, prefixes, listWorkers)
	if !*dryrun {
		// Block until every delete worker has drained the key channel.
		<-NewDeleteFilesWorkers(svc, *bucket, keys, deleteWorkers)
		return
	}
	for key := range keys {
		// A dry run only reports what would be deleted.
		fmt.Println(key)
	}
}

// buildPrefixes returns the key prefixes handed to the listing workers, so
// the bucket is scanned as many small ListObjects calls instead of one huge
// one. The api/comments namespace is split by every three-digit ID, with
// and without a leading slash. The rest of the key space is covered by
// every three-character prefix over printable ASCII ('!' through '~'),
// except "api".
//
// NOTE(review): the prefixes overlap. "/ap" already covers the
// "/api/comments/..." entries, and "?" covers every "?xx" prefix, so some
// keys are listed (and queued for deletion) more than once. There are also
// gaps. Keys starting with "api" are not listed unless they fall under
// api/comments/stats or api/comments/<3 digits>. Keys whose first three
// bytes are not all printable ASCII are not listed either (this includes
// keys shorter than three bytes), unless they start with "?".
func buildPrefixes() []string {
	const lo, hi = '!', '~'
	const span = hi - lo + 1
	prefixes := make([]string, 0, 3+2*1000+span*span*span)
	prefixes = append(prefixes, "api/comments/stats", "/api/comments/stats", "?")
	for id := 0; id < 1000; id++ {
		digits := fmt.Sprintf("%03d", id)
		// it appears we can have those starting with a slash and not
		prefixes = append(prefixes, "api/comments/"+digits, "/api/comments/"+digits)
	}
	for c1 := byte(lo); c1 <= hi; c1++ {
		for c2 := byte(lo); c2 <= hi; c2++ {
			for c3 := byte(lo); c3 <= hi; c3++ {
				p := string([]byte{c1, c2, c3})
				if p == "api" {
					// Covered (partially) by the api/comments prefixes above.
					continue
				}
				prefixes = append(prefixes, p)
			}
		}
	}
	return prefixes
}
// DeleteFilesWorkers spreads S3 batch deletions across a pool of
// goroutines. The keys to remove are read from keysChan until it is closed.
type DeleteFilesWorkers struct {
	svc      *s3.S3      // client used for DeleteObjects calls
	bucket   string      // bucket the keys belong to
	keysChan chan string // source of keys to delete
}
// NewDeleteFilesWorkers wires svc, bucket and keysChan into a
// DeleteFilesWorkers and launches numWorkers deletion goroutines via Start.
// A receive on the returned channel completes once every worker is done.
func NewDeleteFilesWorkers(svc *s3.S3, bucket string, keysChan chan string, numWorkers int) chan struct{} {
	return (&DeleteFilesWorkers{svc: svc, bucket: bucket, keysChan: keysChan}).Start(numWorkers)
}
// del deletes objects from dfw.bucket with a DeleteObjects request, making
// up to 5 attempts. Before each retry it sleeps attempt² seconds plus a
// random 0-29 second jitter (quadratic, not exponential, backoff).
//
// A request that succeeds can still reject individual keys. Those are
// reported in the response's Errors list, not as err. Each rejected key is
// logged, and the next attempt is narrowed to just those keys. A nil return
// therefore means every object was deleted (or objects was empty). Otherwise
// an error is returned once the attempts are exhausted.
func (dfw *DeleteFilesWorkers) del(objects []*s3.ObjectIdentifier) error {
	if len(objects) == 0 {
		return nil
	}
	for attempt := 0; attempt < 5; attempt++ {
		if attempt > 0 {
			jitter := time.Duration(rand.Intn(30))
			time.Sleep((jitter + time.Duration(attempt*attempt)) * time.Second)
		}
		out, err := dfw.svc.DeleteObjects(&s3.DeleteObjectsInput{
			Bucket: aws.String(dfw.bucket),
			Delete: &s3.Delete{
				Objects: objects,
			},
		})
		if err != nil {
			log.Println("delete error", err)
			continue
		}
		if len(out.Errors) == 0 {
			return nil
		}
		rejected := make(map[string]struct{}, len(out.Errors))
		for _, e := range out.Errors {
			log.Println("delete error", aws.StringValue(e.Key), aws.StringValue(e.Code), aws.StringValue(e.Message))
			rejected[aws.StringValue(e.Key)] = struct{}{}
		}
		// Build a fresh slice so the caller's batch is left untouched.
		retry := make([]*s3.ObjectIdentifier, 0, len(rejected))
		for _, obj := range objects {
			if _, ok := rejected[aws.StringValue(obj.Key)]; ok {
				retry = append(retry, obj)
			}
		}
		// If the reported keys somehow match none of ours, resend the whole
		// batch rather than an empty request.
		if len(retry) > 0 {
			objects = retry
		}
	}
	return errors.New("unable to delete")
}
// Start launches workers goroutines (at least one) that drain dfw.keysChan.
// Each worker groups keys into batches of deleteBatchSize and removes each
// batch with del. A batch that still fails after del's retries is logged,
// key by key, and then dropped so it cannot grow without bound. Once
// keysChan is closed, each worker flushes its partial batch. The returned
// channel is closed after all workers have exited.
func (dfw *DeleteFilesWorkers) Start(workers int) chan struct{} {
	if workers < 1 {
		// With no workers nothing would ever drain keysChan.
		workers = 1
	}

	// flush deletes one batch and reports the outcome.
	flush := func(batch []*s3.ObjectIdentifier) {
		if len(batch) == 0 {
			return
		}
		if err := dfw.del(batch); err != nil {
			log.Println("giving up on", len(batch), "items:", err)
			for _, obj := range batch {
				log.Println("not deleted", aws.StringValue(obj.Key))
			}
			return
		}
		log.Println("successfully deleted", len(batch), "items")
	}

	doneCh := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			batch := make([]*s3.ObjectIdentifier, 0, deleteBatchSize)
			for key := range dfw.keysChan {
				batch = append(batch, &s3.ObjectIdentifier{Key: aws.String(key)})
				if len(batch) < deleteBatchSize {
					continue
				}
				flush(batch)
				// del is synchronous and keeps no reference, so reuse storage.
				batch = batch[:0]
			}
			// keysChan has been closed: flush whatever is left.
			flush(batch)
		}()
	}
	go func() {
		wg.Wait()
		// Closing (rather than sending) never blocks and wakes every waiter.
		close(doneCh)
	}()
	return doneCh
}
// ListFilesWorkers lists every object under each entry of prefixes in
// bucket. Keys that listObjects selects for deletion are sent on outputChan.
type ListFilesWorkers struct {
	svc        *s3.S3      // client used for ListObjects calls
	bucket     string      // bucket to scan
	prefixes   []string    // key prefixes; each one gets its own ListObjects scan
	outputChan chan string // receives keys selected for deletion
}
// NewListFilesWorkers creates a ListFilesWorkers over prefixes and starts it
// with numWorkers workers in a background goroutine. It returns the channel
// on which selected keys arrive. The channel is unbuffered and is closed by
// Start once listing finishes.
func NewListFilesWorkers(svc *s3.S3, bucket string, prefixes []string, numWorkers int) chan string {
	out := make(chan string)
	lister := &ListFilesWorkers{svc: svc, bucket: bucket, prefixes: prefixes, outputChan: out}
	go lister.Start(numWorkers)
	return out
}
// listObjects is the page callback given to ListObjectsPages. For every
// object on the page whose key shouldDelete selects, it sends the key on
// lfw.outputChan, blocking until a consumer receives it. It always returns
// true so pagination continues to the final page; last is unused.
func (lfw *ListFilesWorkers) listObjects(p *s3.ListObjectsOutput, last bool) bool {
	for _, obj := range p.Contents {
		if obj == nil || obj.Key == nil {
			continue
		}
		if shouldDelete(*obj.Key) {
			lfw.outputChan <- *obj.Key
		}
	}
	return true
}

// shouldDelete reports whether key must be removed. A key is removed in
// three cases:
//   - it ends with an entry of invalidSuffix;
//   - it starts with an entry of invalidPrefix;
//   - once read as a URL, it carries any query parameter not in whitelist.
//
// The suffix and prefix rules do not depend on URL parsing, so they are
// checked first and also apply to keys that url.Parse rejects. A key that
// fails to parse and matches neither rule is kept.
func shouldDelete(key string) bool {
	for _, suffix := range invalidSuffix {
		if strings.HasSuffix(key, suffix) {
			return true
		}
	}
	for _, prefix := range invalidPrefix {
		if strings.HasPrefix(key, prefix) {
			return true
		}
	}
	// The query separator may appear percent-encoded as "%3F" in the key.
	// Only the upper-case form is decoded.
	u, err := url.Parse(strings.Replace(key, "%3F", "?", -1))
	if err != nil {
		// if there was an error, assume it's fine
		return false
	}
	for param := range u.Query() {
		if _, ok := whitelist[param]; !ok {
			return true
		}
	}
	return false
}
// Start runs the listing workers and blocks until every prefix has been
// listed. It then closes lfw.outputChan, so callers normally run it in its
// own goroutine. At most len(lfw.prefixes) workers are started, and at
// least one.
//
// Prefixes are handed out one at a time over a channel. This guarantees
// every prefix is listed exactly once, however len(lfw.prefixes) divides by
// workers; the previous fixed-size chunking silently skipped the remainder.
// It also lets fast workers pick up more prefixes. A listing error is
// logged (to stderr, keeping stdout for dry-run output) and does not stop
// the other prefixes.
func (lfw *ListFilesWorkers) Start(workers int) {
	if n := len(lfw.prefixes); workers > n {
		workers = n
	}
	if workers < 1 {
		workers = 1
	}

	prefixCh := make(chan string)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for prefix := range prefixCh {
				err := lfw.svc.ListObjectsPages(
					&s3.ListObjectsInput{
						Bucket:  aws.String(lfw.bucket),
						Prefix:  aws.String(prefix),
						MaxKeys: aws.Int64(1000),
					},
					lfw.listObjects,
				)
				if err != nil {
					log.Println("failed to list objects", prefix, err)
				}
			}
		}()
	}

	for _, prefix := range lfw.prefixes {
		prefixCh <- prefix
	}
	close(prefixCh)
	wg.Wait()
	close(lfw.outputChan)
}
@puppeteer701vungle
Copy link

How do you catch errors, and how do you know that the delete was successful?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment