Last active
May 2, 2020 23:49
-
-
Save Integralist/9cf6f2376aa25520a80e191e8925263f to your computer and use it in GitHub Desktop.
[Golang AWS S3 Examples] #go #golang #aws #s3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sessionToken := "" // not required | |
accessKey := "AWS_ACCESS_KEY_ID" | |
secretKey := "AWS_SECRET_ACCESS_KEY" | |
sess, err := session.NewSession(&aws.Config{ | |
Region: aws.String("us-east-1"), | |
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, sessionToken), | |
}) | |
if err != nil { | |
log.Fatal("unable to create aws session") | |
} | |
svc := s3.New(sess) | |
input := &s3.ListObjectsInput{ | |
Bucket: aws.String("some_bucket_name"), | |
MaxKeys: aws.Int64(2), // only return two results! | |
} | |
result, err := svc.ListObjects(input) | |
if err != nil { | |
if aerr, ok := err.(awserr.Error); ok { | |
switch aerr.Code() { | |
case s3.ErrCodeNoSuchBucket: | |
fmt.Println(s3.ErrCodeNoSuchBucket, aerr.Error()) | |
default: | |
fmt.Println(aerr.Error()) | |
} | |
} else { | |
// Print the error, cast err to awserr.Error to get the Code and | |
// Message from an error. | |
fmt.Println(err.Error()) | |
} | |
return | |
} | |
fmt.Println(result) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Original written by Mark Gannaway... | |
https://gist.github.com/Ganners/86f23c2d121332a8b3968bf05d2f720a | |
Dry Runs: | |
# stage | |
go run main.go --bucket=<your_bucket_name> --profile=planz-stage | |
# prod | |
go run main.go --bucket=<your_bucket_name> --profile=planz-prod | |
Real Deletes: | |
# stage | |
go run main.go --bucket=<your_bucket_name> --profile=planz-stage --dryrun=false | |
# prod | |
go run main.go --bucket=<your_bucket_name> --profile=planz-prod --dryrun=false | |
*/ | |
package main | |
import ( | |
"errors" | |
"flag" | |
"fmt" | |
"log" | |
"math/rand" | |
"net/url" | |
"strings" | |
"sync" | |
"time" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/credentials/stscreds" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/s3" | |
) | |
var ( | |
whitelist = map[string]struct{}{ | |
// Used for /wd/UserWidget (which is used for ads) | |
"bids": struct{}{}, | |
"cs": struct{}{}, | |
"ct": struct{}{}, | |
"network": struct{}{}, | |
"or": struct{}{}, | |
"u": struct{}{}, | |
"uo": struct{}{}, | |
"wid": struct{}{}, | |
// Keep | |
"country": struct{}{}, // Used for varying the country, usually used when fetching feeds | |
"p": struct{}{}, // Used for selecting page | |
"page": struct{}{}, // Used for selecting page | |
"page_quantity": struct{}{}, // Used for specifying page size | |
"page_size": struct{}{}, // Used for specifying page size | |
"render_template": struct{}{}, // Used for next pages in, etc. /nifty?render_template=0&page=2 | |
} | |
invalidSuffix = []string{ | |
".mobile.js", | |
".mobile3.js", | |
} | |
invalidPrefix = []string{ | |
"/", | |
"api/comments", | |
} | |
) | |
const ( | |
deleteWorkers = 100 | |
deleteBatchSize = 10 | |
listWorkers = 1000 | |
) | |
func main() { | |
dryrun := true | |
bucket := "" | |
profile := "" | |
{ | |
flag.BoolVar(&dryrun, "dryrun", true, "is this a dry run? false will execute deletes") | |
flag.StringVar(&bucket, "bucket", "plan-z-stage-us-east-1", "what bucket to use") | |
flag.StringVar(&profile, "profile", "planz", "what bucket to use") | |
flag.Parse() | |
} | |
svc := s3.New(session.Must(session.NewSessionWithOptions(session.Options{ | |
AssumeRoleTokenProvider: stscreds.StdinTokenProvider, | |
Config: aws.Config{Region: aws.String("us-east-1")}, | |
Profile: profile, | |
}))) | |
prefixes := []string{"api/comments/stats", "/api/comments/stats", "?"} | |
// build the prefix list, this is geared specifically towards speeding up | |
// the plan z deletions | |
for c1 := '0'; c1 <= '9'; c1++ { | |
for c2 := '0'; c2 <= '9'; c2++ { | |
for c3 := '0'; c3 <= '9'; c3++ { | |
// it appears we can have those starting with a slash and not | |
prefixes = append(prefixes, "api/comments/"+string(c1)+string(c2)+string(c3)) | |
prefixes = append(prefixes, "/api/comments/"+string(c1)+string(c2)+string(c3)) | |
} | |
} | |
} | |
for c1 := '!'; c1 <= '~'; c1++ { | |
for c2 := '!'; c2 <= '~'; c2++ { | |
for c3 := '!'; c3 <= '~'; c3++ { | |
prefix := string(c1) + string(c2) + string(c3) | |
if prefix == "api" { | |
continue | |
} | |
// it appears we can have those starting with a slash and not | |
prefixes = append(prefixes, prefix) | |
} | |
} | |
} | |
// shuffle keys | |
for i := range prefixes { | |
j := rand.Intn(i + 1) | |
prefixes[i], prefixes[j] = prefixes[j], prefixes[i] | |
} | |
// file list workers | |
outputChan := NewListFilesWorkers(svc, bucket, prefixes, listWorkers) | |
if dryrun { | |
for output := range outputChan { | |
// dryrun just prints | |
fmt.Println(output) | |
} | |
} else { | |
// file delete workers | |
<-NewDeleteFilesWorkers(svc, bucket, outputChan, deleteWorkers) | |
} | |
} | |
// DeleteFilesWorkers handles multiple workers to delete files | |
type DeleteFilesWorkers struct { | |
svc *s3.S3 | |
bucket string | |
keysChan chan string | |
} | |
// NewDeleteFilesWorkers will spawn and start a number of workers to handle | |
// deletion, will output to the returned channel when complete | |
func NewDeleteFilesWorkers(svc *s3.S3, bucket string, keysChan chan string, numWorkers int) chan struct{} { | |
dfw := &DeleteFilesWorkers{ | |
svc: svc, | |
bucket: bucket, | |
keysChan: keysChan, | |
} | |
return dfw.Start(numWorkers) | |
} | |
// del will handle the actual deletion request to S3, retries up to 5 times | |
// with jittered exponential backoff | |
func (dfw *DeleteFilesWorkers) del(objects []*s3.ObjectIdentifier) error { | |
if len(objects) == 0 { | |
return nil | |
} | |
for attempt := 0; attempt < 5; attempt++ { | |
if attempt > 0 { | |
sleepJitter := time.Duration(rand.Intn(30)) | |
sleepSeconds := sleepJitter + time.Duration(attempt*attempt) | |
time.Sleep(sleepSeconds * time.Second) | |
} | |
_, err := dfw.svc.DeleteObjects(&s3.DeleteObjectsInput{ | |
Bucket: aws.String(dfw.bucket), | |
Delete: &s3.Delete{ | |
Objects: objects, | |
}, | |
}) | |
if err == nil { | |
return nil | |
} | |
log.Println("delete error", err) | |
} | |
return errors.New("unable to delete") | |
} | |
// Start will start a number of workers to handle file batch deletion | |
func (dfw *DeleteFilesWorkers) Start(workers int) chan struct{} { | |
doneCh := make(chan struct{}) | |
go func() { | |
wg := sync.WaitGroup{} | |
for i := 0; i < workers; i++ { | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
objects := make([]*s3.ObjectIdentifier, 0, deleteBatchSize) | |
for key := range dfw.keysChan { | |
objects = append(objects, &s3.ObjectIdentifier{ | |
Key: aws.String(key), | |
}) | |
if len(objects) < deleteBatchSize { | |
continue | |
} | |
err := dfw.del(objects) | |
if err == nil { | |
log.Println("successfully deleted", len(objects), "items") | |
objects = make([]*s3.ObjectIdentifier, 0, deleteBatchSize) | |
} | |
} | |
// if chan has been closed, trigger final delete | |
dfw.del(objects) | |
}() | |
} | |
wg.Wait() | |
doneCh <- struct{}{} | |
}() | |
return doneCh | |
} | |
// ListFilesWorkers will start a number of workers to list files matching the | |
// pattern and then sending them to the outputChan | |
type ListFilesWorkers struct { | |
svc *s3.S3 | |
bucket string | |
prefixes []string | |
outputChan chan string | |
} | |
// NewListFilesWorkers will spawn a number of workers to iterate over the | |
// prefixes to divide up the work | |
func NewListFilesWorkers(svc *s3.S3, bucket string, prefixes []string, numWorkers int) chan string { | |
lfw := &ListFilesWorkers{ | |
svc: svc, | |
bucket: bucket, | |
prefixes: prefixes, | |
outputChan: make(chan string), | |
} | |
go lfw.Start(numWorkers) | |
return lfw.outputChan | |
} | |
// listObjects will handle looping and checking the contents of each key | |
func (lfw *ListFilesWorkers) listObjects(p *s3.ListObjectsOutput, last bool) bool { | |
for _, obj := range p.Contents { | |
if obj == nil { | |
continue | |
} | |
// check if it is valid | |
url, err := url.Parse(strings.Replace(*obj.Key, "%3F", "?", -1)) | |
if err != nil { | |
// if there was an error, assume it's fine | |
continue | |
} | |
invalidParams := []string{} | |
query := url.Query() | |
for _, suffix := range invalidSuffix { | |
if strings.HasSuffix(*obj.Key, suffix) { | |
goto delete | |
} | |
} | |
for _, prefix := range invalidPrefix { | |
if strings.HasPrefix(*obj.Key, prefix) { | |
goto delete | |
} | |
} | |
for key := range query { | |
if _, ok := whitelist[key]; !ok { | |
invalidParams = append(invalidParams, key) | |
} | |
} | |
if len(invalidParams) == 0 { | |
continue | |
} | |
delete: | |
lfw.outputChan <- *obj.Key | |
} | |
return true | |
} | |
// Start will commence a the workers, should be called in a goroutine. Will | |
// close up the outputChan when it is finished | |
func (lfw *ListFilesWorkers) Start(workers int) { | |
numPrefixes := len(lfw.prefixes) | |
if numPrefixes < workers { | |
workers = numPrefixes | |
} | |
splitPrefixes := [][]string{} | |
for i, j := 0, 0; i <= numPrefixes; j, i = i, (i + numPrefixes/workers) { | |
if i == 0 { | |
continue | |
} | |
splitPrefixes = append(splitPrefixes, lfw.prefixes[j:i]) | |
} | |
wg := &sync.WaitGroup{} | |
wg.Add(len(splitPrefixes)) | |
for _, chunk := range splitPrefixes { | |
go func(chunk []string) { | |
defer wg.Done() | |
for _, prefix := range chunk { | |
err := lfw.svc.ListObjectsPages( | |
&s3.ListObjectsInput{ | |
Bucket: aws.String(lfw.bucket), | |
Prefix: aws.String(prefix), | |
MaxKeys: aws.Int64(1000), | |
}, | |
lfw.listObjects, | |
) | |
if err != nil { | |
fmt.Println("failed to list objects", err) | |
} | |
} | |
}(chunk) | |
} | |
wg.Wait() | |
close(lfw.outputChan) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How do you catch errors, and how do you know that the delete was successful?