Last active
March 2, 2016 16:40
-
-
Save djui/05bc8720a4b3cfa1059b to your computer and use it in GitHub Desktop.
Takes a list of S3 URLs and applies a function to each file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"fmt" | |
"io/ioutil" | |
"net/url" | |
"os" | |
"strings" | |
"sync" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/s3" | |
"github.com/aws/aws-sdk-go/service/s3/s3manager" | |
) | |
// concurrency is the capacity of the semaphore in main, i.e. the maximum
// number of S3 objects processed at the same time.
const (
	concurrency = 64
)
// ofInterest reports whether the data read from f (from its current offset
// to EOF) contains the substring "foo". The whole remainder of f is loaded
// into memory. If reading fails, the error is written to stderr and the file
// is treated as not interesting.
func ofInterest(f *os.File) bool {
	contents, readErr := ioutil.ReadAll(f)
	if readErr == nil {
		return strings.Index(string(contents), "foo") != -1
	}
	fmt.Fprintln(os.Stderr, "Failed to read file:", readErr)
	return false
}
func main() { | |
if len(os.Args) != 2 { | |
fmt.Fprintln(os.Stderr, "Usage: foreach FILELIST") | |
} | |
filelist := os.Args[1] | |
r, err := os.Open(filelist) | |
if err != nil { | |
fmt.Fprintln(os.Stderr, err) | |
} | |
s := session.New(&aws.Config{Region: aws.String("eu-west-1")}) | |
downloader := s3manager.NewDownloader(s) | |
// Concurrency | |
sem := make(chan bool, concurrency) | |
wg := sync.WaitGroup{} | |
scanner := bufio.NewScanner(r) | |
for scanner.Scan() { | |
fmt.Fprint(os.Stderr, ".") | |
line := scanner.Text() | |
// Concurrency | |
wg.Add(1) | |
go func(line string) { | |
// Concurrency | |
sem <- true | |
defer func() { | |
<-sem | |
wg.Done() | |
}() | |
t, err := ioutil.TempFile("", "foreach_") | |
if err != nil { | |
fmt.Fprintln(os.Stderr, "Failed to create tempfile", err) | |
return | |
} | |
defer func() { n := t.Name(); t.Close(); os.Remove(n) }() | |
bucket, key, err := bucketKeyFromURL(line) | |
i := &s3.GetObjectInput{Bucket: aws.String(bucket), Key: aws.String(key)} | |
// fmt.Fprintln(os.Stderr, "Downloading", bucket, key, "to", t.Name()) | |
_, err = downloader.Download(t, i) | |
if err != nil { | |
if strings.HasPrefix(err.Error(), "NoSuchKey") { | |
fmt.Fprint(os.Stderr, ":") | |
} else if strings.HasPrefix(err.Error(), "InvalidParameter") { | |
fmt.Fprint(os.Stderr, ";") | |
} else { | |
fmt.Fprintln(os.Stderr, "Failed to download file", err) | |
} | |
return | |
} | |
_, err = t.Seek(0, 0) | |
if err != nil { | |
fmt.Fprintln(os.Stderr, "Failed to rewind tempfile:", err) | |
return | |
} | |
if ofInterest(t) { | |
fmt.Println(line) | |
} | |
}(line) | |
} | |
wg.Wait() | |
if err := scanner.Err(); err != nil { | |
fmt.Fprintln(os.Stderr, err) | |
} | |
} | |
// bucketKeyFromURL splits an S3 URL of the form s3://bucket/path/to/key into
// its bucket ("bucket") and object key ("path/to/key").
//
// The key comes from the URL's decoded path, so percent-escapes are resolved,
// and the single "/" separating host and path is dropped, matching the
// s3://bucket/key convention used by the AWS CLI. A query string or fragment
// ("?..." / "#...") is not part of the key. The scheme is not checked.
//
// It returns an error if urlString cannot be parsed, or if it names no bucket
// or no key.
func bucketKeyFromURL(urlString string) (string, string, error) {
	u, err := url.Parse(urlString)
	if err != nil {
		return "", "", err
	}
	bucket := u.Host
	key := strings.TrimPrefix(u.Path, "/")
	if bucket == "" {
		return "", "", fmt.Errorf("no bucket in S3 URL %q", urlString)
	}
	if key == "" {
		return "", "", fmt.Errorf("no object key in S3 URL %q", urlString)
	}
	return bucket, key, nil
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment