Created
June 19, 2017 01:16
-
-
Save homingli/39bf9c0271feae8f2d45d87ace8268b2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"fmt" | |
"strings" | |
"log" | |
"os" | |
"time" | |
"encoding/json" | |
"strconv" | |
"net/url" | |
"io" | |
"bytes" | |
"compress/gzip" | |
"encoding/csv" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/s3" | |
"github.com/aws/aws-sdk-go/service/s3/s3manager" | |
) | |
// File is one entry of the "files" array in the manifest JSON, as decoded by
// getfiles_from_manifest.
type File struct {
	// Key is the object key of the listed data file; main passes it to
	// process_csv as the object to download.
	Key string
	// Size is the size recorded for the data file (presumably in bytes —
	// confirm against the manifest format).
	Size int
	// MD5checksum is the checksum recorded for the data file.
	MD5checksum string
}
// Tracker carries the running counters shared across process_csv calls.
type Tracker struct {
	// indexcounter is the number embedded in the name of the next index file
	// to be uploaded.
	indexcounter int
	// sizeTotal is the summed size of the records accepted since the last
	// index upload.
	sizeTotal int
	// objcount is the number of CSV records read so far.
	objcount int
	// incobjcount is the number of records that passed the include filter.
	incobjcount int
}
// FilterParams groups record-filter criteria. Nothing in the visible code
// reads or writes it yet.
// NOTE(review): presumably meant to parameterize include_or_not — confirm.
type FilterParams struct {
	prefix       string
	storageClass string
	before       time.Time
	after        time.Time
}
func getfiles_from_manifest(awsSession *session.Session, bucket string, key string) (string, []File) { | |
mbuff := s3dl(awsSession, bucket,key) | |
var objmap map[string]*json.RawMessage | |
err := json.Unmarshal(mbuff.Bytes(), &objmap) | |
if err != nil { log.Fatal(err) } | |
var files []File | |
err = json.Unmarshal(*objmap["files"], &files) | |
var headers string | |
err = json.Unmarshal(*objmap["fileSchema"], &headers) | |
//fmt.Println(strings.Split(headers,",")) | |
//for _,f := range files { fmt.Fprintf(os.Stderr,"% v\n", f) } | |
return headers, files | |
} | |
func process_csv(awsSession *session.Session,bucket string,key string,t *Tracker, buff *bytes.Buffer) { | |
gzbuff := s3dl(awsSession, bucket,key) | |
zr,err := gzip.NewReader(bytes.NewReader(gzbuff.Bytes())) | |
if err != nil { log.Fatal(err) } | |
defer zr.Close() | |
csvr := csv.NewReader(zr) | |
//fmt.Printf("Name: %s\nComment: %s\nModTime: %s\n\n", zr.Name, zr.Comment, zr.ModTime.UTC()) | |
for { | |
line, err := csvr.Read() | |
if err == io.EOF { break } | |
t.objcount+=1 | |
// apply filter | |
if include_or_not(line) { | |
t.incobjcount += 1 | |
buff.WriteString(strings.Join([]string{line[0],line[1],line[2]},",")) //bucket,key,size | |
buff.WriteRune('\n') | |
objsize,err := strconv.Atoi(line[2]) // size | |
if err!=nil { log.Fatal(err) } | |
t.sizeTotal+=objsize | |
if t.sizeTotal > 4*1024*1024*1024 { | |
fmt.Println("threshold crossed. current size "+strconv.Itoa(t.sizeTotal)) | |
_ = s3ul(awsSession, "hml-oregon-bucket", "s3grouper-output/manifest."+strconv.Itoa(t.indexcounter)+".index", aws.NewWriteAtBuffer(buff.Bytes())) | |
t.indexcounter+=1 | |
buff.Reset() | |
t.sizeTotal=0 | |
} | |
} | |
} | |
return | |
} | |
func s3dl(awsSession *session.Session, bucket string, key string) *aws.WriteAtBuffer { | |
fmt.Fprintf(os.Stderr, "attempting to download s3://%v/%v\n", bucket, key) | |
start := time.Now() | |
buff := &aws.WriteAtBuffer{} | |
s3dl := s3manager.NewDownloader(awsSession) | |
n, err := s3dl.Download(buff, &s3.GetObjectInput{ | |
Bucket: aws.String(bucket), | |
Key: aws.String(key), | |
}) | |
if err!=nil { log.Fatal(err) } | |
fmt.Fprintf(os.Stderr, "downloaded %v bytes in %v\n", n, time.Now().Sub(start)) | |
return buff | |
} | |
func s3ul(awsSession *session.Session, bucket string, key string, data *aws.WriteAtBuffer) *s3manager.UploadOutput { | |
fmt.Fprintf(os.Stderr, "attempting to upload s3://%v/%v\n", bucket, key) | |
start := time.Now() | |
s3ul := s3manager.NewUploader(awsSession) | |
result, err := s3ul.Upload(&s3manager.UploadInput{ | |
Bucket: aws.String(bucket), | |
Key: aws.String(key), | |
Body: bytes.NewReader(data.Bytes()), | |
}) | |
if err!=nil { log.Fatal(err) } | |
fmt.Fprintf(os.Stderr, "wrote to s3://%v/%v in %v\n", bucket,key,time.Now().Sub(start)) | |
return result | |
} | |
// include_or_not reports whether a CSV record should be written to the index.
//
// A record is included when its sixth column equals "STANDARD" (presumably
// the storage-class column — confirm against the CSV schema; "GLACIER" was a
// previously tried alternative). Records with fewer than six columns are
// excluded rather than causing an index-out-of-range panic.
func include_or_not(record []string) bool {
	const filterColumn = 5
	if len(record) <= filterColumn {
		return false
	}
	return record[filterColumn] == "STANDARD"
}
// s3uri splits an s3://bucket/path/to/manifest.json URI into its bucket and
// path parts.
//
// It returns the URI host as the bucket and the URI path, which keeps its
// leading "/", as the key. The process is terminated through log.Fatal when
// the URI does not end in "manifest.json", cannot be parsed, does not use the
// "s3" scheme, or names no bucket.
func s3uri(uri *string) (string, string) {
	const malformed = "ERROR: malformed s3uri. Please use s3://path/to/manifest.json format."

	if !strings.HasSuffix(*uri, "manifest.json") {
		log.Fatal(malformed)
	}
	u, err := url.Parse(*uri)
	if err != nil {
		log.Fatal(err)
	}
	// Check the parsed scheme and host: a plain "s3" prefix test would also
	// accept inputs such as "s3x://..." or "s3bucket/manifest.json", which
	// produce an empty or wrong bucket.
	if u.Scheme != "s3" || u.Host == "" {
		log.Fatal(malformed)
	}
	return u.Host, u.Path
}
func main() { | |
// cli arguments | |
var outbucket,manifest string | |
var debug bool | |
var source map[string]string | |
source = make(map[string]string) | |
flag.StringVar(&manifest, "m", "", "s3 path to inventory manifest") | |
flag.StringVar(&outbucket, "b", "", "index files output bucket") | |
flag.BoolVar(&debug, "debug", false, "show aws sdk debug output") | |
flag.Parse() | |
if len(manifest) == 0 { | |
flag.Usage() | |
log.Fatal("ERROR: need manfiest defined.") | |
} else { | |
source["bucket"],source["key"] = s3uri(&manifest) | |
} | |
if len(outbucket) == 0 { | |
outbucket="hml-oregon-bucket" | |
} | |
cfg := aws.NewConfig() | |
if debug { | |
log := log.New(os.Stderr, "", log.LstdFlags) | |
cfg = cfg.WithLogger( | |
aws.LoggerFunc(func(args ...interface{}) { log.Println(args) }), | |
).WithLogLevel(aws.LogDebugWithSigning) | |
} | |
awsSession := session.Must(session.NewSessionWithOptions(session.Options{ | |
SharedConfigState: session.SharedConfigEnable, | |
})) | |
// process manifest | |
_, files := getfiles_from_manifest(awsSession, source["bucket"], source["key"]) | |
// process csv | |
var t Tracker | |
var indexbuffer bytes.Buffer | |
t.indexcounter = 1 | |
for _,f := range files { | |
process_csv(awsSession,source["bucket"],f.Key,&t,&indexbuffer) | |
} | |
//upload last batch | |
if !bytes.Equal(indexbuffer.Bytes(),nil) { // isEmpty? | |
fmt.Fprintln(os.Stderr,"last batch, current size "+strconv.Itoa(t.sizeTotal)) | |
_ = s3ul(awsSession, outbucket, "s3grouper-output/manifest."+strconv.Itoa(t.indexcounter)+".index", aws.NewWriteAtBuffer(indexbuffer.Bytes())) | |
} | |
fmt.Fprintf(os.Stderr,"%v objects processed. %v objects included.",t.objcount,t.incobjcount) | |
/* | |
if _, err := io.Copy(os.Stdout, zr); err != nil { | |
log.Fatal(err) | |
} | |
*/ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment