Last active
August 24, 2021 16:04
-
-
Save pcolazurdo/50b9f5bbc5384957bfba7bd4f6177e38 to your computer and use it in GitHub Desktop.
AWS_SDK_BUGREPORT1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This sample code triggers the problem if run against a large directory structure (+50K objects) | |
// I added some instrumentation to check how many connections were being reused versus created new
// Please change the table name in line 283 and path in line 286 | |
package main | |
import ( | |
"context" | |
"crypto/md5" | |
"strings" | |
"io" | |
"io/fs" | |
"os" | |
"path/filepath" | |
"sync" | |
"syscall" | |
"errors" | |
"fmt" | |
"time" | |
"net" | |
"net/http" | |
"net/http/httptrace" | |
"golang.org/x/net/http2" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/request" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/dynamodb" | |
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" | |
"log" | |
) | |
// RequestTrace hooks an httptrace-based connection trace onto each send
// attempt of an AWS SDK request (see TraceRequest). It carries no state
// of its own; per-attempt state lives in the HTTPTrace it creates.
type RequestTrace struct {
}
// HTTPTrace is a context.Context (embedded) that records whether the HTTP
// connection picked for a request was reused from the transport's pool.
type HTTPTrace struct {
	context.Context
	Reused bool // set by gotConn from httptrace.GotConnInfo.Reused
}
// gotConn is the httptrace.ClientTrace GotConn hook. It prints the remote
// address and the reuse flag (the instrumentation this repro is about) and
// stores the flag on the trace.
// NOTE(review): t.Reused is written without synchronization; if the SDK
// fires attempts concurrently this could race — confirm single-threaded use.
func (t *HTTPTrace) gotConn(info httptrace.GotConnInfo) {
	fmt.Printf("Connection reused for? %+v %v\n", info.Conn.RemoteAddr(), info.Reused) //, info.Reused)
	t.Reused = info.Reused
}
func NewHTTPTrace(ctx context.Context) *HTTPTrace { | |
t := &HTTPTrace{} | |
trace := &httptrace.ClientTrace{ | |
GotConn: t.gotConn, | |
} | |
t.Context = httptrace.WithClientTrace(ctx, trace) | |
return t | |
} | |
// TraceRequest is a request.Option: it installs a Send handler that wraps
// every attempt's context in a fresh HTTPTrace so connection reuse can be
// observed per attempt.
func (t *RequestTrace) TraceRequest(r *request.Request) {
	// Ensure that the http trace added to the request always uses the original
	// context instead of each following attempt's context to prevent conflict
	// with previous http traces used.
	origContext := r.Context()
	// Send
	r.Handlers.Send.PushFront(func(rr *request.Request) {
		aHTTPTrace := NewHTTPTrace(origContext)
		rr.SetContext(aHTTPTrace)
	})
}
// String implements fmt.Stringer. The latency/attempt reporting is stubbed
// out (commented below), so it currently always returns the empty string.
func (t *RequestTrace) String() string {
	var w strings.Builder
	// l := t.Latency()
	// writeDurField(&w, "Latency", l.Latency)
	// writeDurField(&w, "Validate", l.Validate)
	// writeDurField(&w, "Build", l.Build)
	// writeField(&w, "Attempts", "%d", len(t.Attempts))
	// for i, a := range t.Attempts {
	// fmt.Fprintf(&w, "\n\tAttempt: %d, %s", i, a)
	// }
	return w.String()
}
// workerCounter is a mutex-guarded counter used by workerCalc to cap the
// number of in-flight hashing goroutines (see maxCalc).
type workerCounter struct {
	sync.RWMutex
	counter int // current number of in-flight hash goroutines
}
// RunInventoryVars groups the (mostly future) configuration for an
// inventory run; only PathName is currently defined and it is not yet used.
type RunInventoryVars struct {
	PathName string
	// name string
	// envName string
	// imageTag string
	// resourceTags map[string]string
}
// workerJob is one unit of work for treeWalker: a directory to list.
type workerJob struct {
	Root string // absolute or relative directory path to scan
}
// workerResult is emitted by treeWalker for every non-empty regular file
// found; workerCalc consumes these and hashes the file.
type workerResult struct {
	Filename string // absolute path of the file to hash
}
// HTTPClientSettings bundles the transport/dialer tunables used by
// NewHTTPClientWithSettings to build a custom *http.Client.
type HTTPClientSettings struct {
	Connect          time.Duration // dialer connect timeout
	ConnKeepAlive    time.Duration // TCP keep-alive interval
	ExpectContinue   time.Duration // Expect: 100-continue wait
	IdleConn         time.Duration // idle connection lifetime in the pool
	MaxAllIdleConns  int           // pool-wide idle connection cap
	MaxHostIdleConns int           // per-host idle connection cap
	ResponseHeader   time.Duration // response-header read timeout
	TLSHandshake     time.Duration // TLS handshake timeout
}
// Database wraps an AWS session and DynamoDB client bound to one table.
type Database struct {
	// sync.RWMutex
	sess  *session.Session
	svc   *dynamodb.DynamoDB
	table string // target table name; empty disables writes (dry run)
	// ctx is threaded into PutItemWithContext calls.
	// NOTE(review): storing a context in a struct is discouraged
	// (contexts should flow through call parameters) — confirm intent.
	ctx context.Context
}
// ddbEntry is the DynamoDB item written per file: partition key is the
// filename, sort key is the run timestamp.
type ddbEntry struct {
	FileName string `dynamodbav:"filename"`
	Pk       string `dynamodbav:"pk"`
	Sk       string `dynamodbav:"sk"`
}
// Output is a query-result shape (count plus items); not used in this file.
type Output struct {
	Count int64
	Items []ddbEntry
}
// transport is an http.RoundTripper that keeps track of the in-flight | |
// request and implements hooks to report HTTP tracing events. | |
// type transport struct { | |
// current *http.Request | |
// } | |
// // GotConn prints whether the connection has been used previously | |
// // for the current request. | |
// func (t *transport) GotConn(info httptrace.GotConnInfo) { | |
// // fmt.Printf("Connection reused for %#v? %#v\n", t.current.URL, info.Reused) | |
// } | |
func NewHTTPClientWithSettings(ctxParent context.Context, httpSettings HTTPClientSettings) (*http.Client, context.Context, error) { | |
var client http.Client | |
// t := &transport{} | |
tr := &http.Transport{ | |
ResponseHeaderTimeout: httpSettings.ResponseHeader, | |
Proxy: http.ProxyFromEnvironment, | |
DialContext: (&net.Dialer{ | |
KeepAlive: httpSettings.ConnKeepAlive, | |
DualStack: true, | |
Timeout: httpSettings.Connect, | |
}).DialContext, | |
MaxIdleConns: httpSettings.MaxAllIdleConns, | |
IdleConnTimeout: httpSettings.IdleConn, | |
TLSHandshakeTimeout: httpSettings.TLSHandshake, | |
MaxIdleConnsPerHost: httpSettings.MaxHostIdleConns, | |
ExpectContinueTimeout: httpSettings.ExpectContinue, | |
} | |
// trace := &httptrace.ClientTrace{ | |
// GotConn: t.GotConn, | |
// } | |
// ctx := httptrace.WithClientTrace(ctxParent, trace) | |
// So client makes HTTP/2 requests | |
err := http2.ConfigureTransport(tr) | |
if err != nil { | |
fmt.Printf("Error Configuring HTTP/2") | |
return &client, nil, err | |
} | |
return &http.Client{ | |
Transport: tr, | |
}, ctxParent, nil | |
} | |
// NewDatabaseConnection builds a Database backed by a DynamoDB client that
// uses a custom HTTP/2-capable http.Client with tight timeouts and small
// idle-connection caps. It panics if the HTTP client cannot be built.
func NewDatabaseConnection(table string) *Database {
	// Initialize a session that the SDK will use to load
	// credentials from the shared credentials file ~/.aws/credentials
	// and region from the shared configuration file ~/.aws/config.
	ctxParent := context.Background()
	httpClient, ctx, err := NewHTTPClientWithSettings(ctxParent, HTTPClientSettings{
		Connect:         5 * time.Second,
		ExpectContinue:  1 * time.Second,
		IdleConn:        90 * time.Second,
		ConnKeepAlive:   30 * time.Second,
		MaxAllIdleConns: 10, // To avoid issues with EMFILE errors when too many Idle connections are kept in MacOS
		MaxHostIdleConns: 2,
		ResponseHeader:  5 * time.Second,
		TLSHandshake:    5 * time.Second,
	})
	if err != nil {
		fmt.Println("Got an error creating custom HTTP client:")
		fmt.Println(err)
		panic("Got an error creating custom HTTP client:")
	}
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		Config: aws.Config{
			HTTPClient: httpClient,
		},
		SharedConfigState: session.SharedConfigEnable,
	}))
	// Create DynamoDB client
	svc := dynamodb.New(sess)
	// svc.Handlers.Build.PushFront(func(r *request.Request) {
	// obs.Counter("NewRequest", 1)
	// })
	return &Database{
		sess:  sess,
		svc:   svc,
		table: table,
		ctx:   ctx,
	}
}
func (d *Database) saveItem(item interface{}) error { | |
trace := &RequestTrace{} | |
if d.table != "" { | |
av, err := dynamodbattribute.MarshalMap(item) | |
if err != nil { | |
log.Fatalf("Got error marshalling new item: %s", err) | |
} | |
tableName := d.table | |
input := &dynamodb.PutItemInput{ | |
Item: av, | |
TableName: aws.String(tableName), | |
} | |
// d.Lock() | |
// defer d.Unlock() | |
fmt.Printf("\nSaveFile\n") | |
_, err = d.svc.PutItemWithContext(d.ctx, input, trace.TraceRequest) | |
if err != nil { | |
log.Fatalf("Got error calling PutItem: %s", err) | |
} | |
return err | |
} else { | |
return nil | |
} | |
} | |
//(map[int]*ddbEntry, error) | |
// main kicks off a single inventory run over the hard-coded path/table
// configured in RunInventory.
func main() {
	RunInventory()
}
// maxCalc caps concurrent hashing goroutines in workerCalc.
const maxCalc = 50 // TODO: Make it configurable

// RunInventory walks a directory tree with treeWalker, hashes every
// non-empty regular file via workerCalc, and writes one DynamoDB item per
// file. It blocks until the hashing side signals calcDone.
func RunInventory() {
	var startTime = time.Now()
	var db = NewDatabaseConnection("table-name")
	// var path = "/Users/pabcol/WorkDocs/AMZN/"
	var path = "../.."
	// cwd, err := os.Getwd()
	dir := path
	workerCount := 1
	// jobs is buffered so the seed job below can be queued immediately.
	jobs := make(chan workerJob, workerCount)
	results := make(chan workerResult)
	readDone := make(chan bool)
	calcDone := make(chan bool)
	wg := &sync.WaitGroup{}
	// start N workers
	for i := 0; i < workerCount; i++ {
		go treeWalker(jobs, results, wg)
	}
	// One initial job
	wg.Add(1)
	go func() {
		jobs <- workerJob{
			Root: dir,
		}
	}()
	for i := 0; i < workerCount; i++ {
		go workerCalc(results, calcDone, startTime.String(), db)
	}
	// When all jobs finished, shutdown the system.
	go func() {
		wg.Wait()
		readDone <- true
	}()
readloop:
	for {
		select {
		case <-readDone:
			// Finished traversing path
			fmt.Printf("readDone: %v\n", time.Now().Format("2006-01-02T15:04:05Z07:00"))
			// NOTE(review): the 30s sleep is a grace period letting in-flight
			// hash work drain before the channels close; a WaitGroup on the
			// hashing side would be deterministic — confirm intent.
			time.Sleep(30 * time.Second)
			close(jobs)
			close(results)
		case <-calcDone:
			// All existing files have been processed by calc_md5
			fmt.Printf("calcDone: %v\n", time.Now().Format("2006-01-02T15:04:05Z07:00"))
			break readloop
		}
	}
}
// treeWalker consumes directory jobs and, for each directory entry found:
// non-empty regular files are sent to results for hashing; subdirectories
// are re-queued as new jobs (wg.Add(1) before the async send). Exactly one
// wg.Done() is executed per consumed job, on every exit path.
func treeWalker(jobs chan workerJob, results chan<- workerResult, wg *sync.WaitGroup) {
	// While there are new jobs
	for j := range jobs {
		dir, err := os.Open(j.Root)
		if err != nil {
			handleFileOpenLimits(err)
			dir.Close() // dir is nil on error; (*os.File)(nil).Close() just returns ErrInvalid
			wg.Done()
			continue
		}
		fInfo, err := dir.Readdir(-1) // Return all files in directory
		dir.Close()
		if err != nil {
			handleFileOpenLimits(err)
			wg.Done()
			// if os.IsPermission(err) {
			// // Skip if there's no permission
			// continue
			// }
			// For now, skip if there is an error
			continue
		}
		for _, file := range fInfo {
			fmt.Printf("\ntreeWalker\n")
			// dir.Name() is still valid after Close; join with the entry name.
			fpath, _ := filepath.Abs(filepath.Join(dir.Name(), file.Name()))
			if file.Mode().IsRegular() {
				// is file
				fs := uint64(file.Size())
				if fs == 0 {
					// Skip zero sized
					// wg.Done() // Not needed because if it is a file there is not a new job
					continue
				}
				r := workerResult{
					Filename: fpath,
				}
				results <- r
			} else if file.IsDir() {
				// Send directory to be processed by the worker
				nj := workerJob{
					Root: fpath,
				}
				// One more job, adds to wg
				wg.Add(1)
				// Do not block when sending jobs.
				// NOTE(review): one goroutine per subdirectory avoids deadlock
				// on the bounded jobs channel, but fan-out is unbounded.
				go func() {
					jobs <- nj
				}()
			}
		}
		// Done one job, let wg know.
		wg.Done()
	}
}
func openFile(filename string) string { | |
fmt.Printf("\nProcessed\n") | |
file, err := os.Open(filename) | |
if err != nil { | |
handleFileOpenLimits(err) | |
file.Close() | |
return "" | |
// panic(err) | |
} | |
// defer file.Close() | |
hash := md5.New() | |
_, err = io.Copy(hash, file) | |
if err != nil { | |
handleFileOpenLimits(err) | |
file.Close() | |
return "" | |
// panic(err) | |
} | |
file.Close() | |
return filename | |
} | |
// workerCalc drains results, hashing each file in its own goroutine while
// capping in-flight hashes at maxCalc via a mutex-guarded counter
// (busy-wait with a 100ms sleep when saturated). Successful hashes are
// recorded in DynamoDB under the run timestamp. done is signalled once
// results is closed AND every spawned hash goroutine has finished.
func workerCalc(results chan workerResult, done chan bool, timestamp string, database *Database) {
	wgCalc := &sync.WaitGroup{}
	i := workerCounter{
		counter: 0,
	}
	// Sentinel Add(1), released after the range loop below, keeps the
	// waiter from firing before the channel is closed.
	wgCalc.Add(1)
	go func() {
		wgCalc.Wait()
		done <- true
	}()
	for j := range results {
		wgCalc.Add(1)
		i.Lock()
		i.counter++
		i.Unlock()
		// Throttle: spin (with sleep) while more than maxCalc hashes run.
		// The closure takes the read lock, reads, and releases via defer.
		for func() int {
			defer i.RUnlock()
			i.RLock()
			return i.counter
		}() > maxCalc {
			time.Sleep(100 * time.Millisecond)
		}
		go func(filename string) {
			res := openFile(filename)
			if res != "" {
				database.PutContentHash(res, timestamp)
			}
			i.Lock()
			i.counter--
			i.Unlock()
			wgCalc.Done()
		}(j.Filename)
	}
	wgCalc.Done()
}
// This will detect if the io errors are related with too many open files | |
// and suggest the user to reduce the max limit of goroutines | |
// Or to increase the corresponding ulimits | |
func handleFileOpenLimits(err error) { | |
var perr *fs.PathError | |
if errors.As(err, &perr) { | |
if perr.Unwrap() == syscall.EMFILE { | |
// log.Printf("You are running %d goroutines & have opened more files than what is allowed in your ulimits, please check", runtime.NumGoroutine()) | |
fmt.Printf("\nEMFILE Error\n") | |
} else { | |
log.Printf("%#v", perr) | |
} | |
} | |
} | |
func (d *Database) PutContentHash(filename string, timestamp string) error { | |
var x = &ddbEntry{ | |
FileName: filename, | |
Pk: filename, | |
Sk: timestamp, | |
} | |
err := d.saveItem(x) | |
if err != nil { | |
log.Fatalf("Got error calling PutContentHash: %s", err) | |
} | |
return err | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"crypto/md5" | |
"io" | |
"io/fs" | |
"os" | |
"path/filepath" | |
"runtime" | |
"sync" | |
"syscall" | |
"errors" | |
"fmt" | |
"time" | |
"net" | |
"net/http" | |
"golang.org/x/net/http2" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/dynamodb" | |
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" | |
"log" | |
) | |
// workerCounter is a mutex-guarded counter used by workerCalc to cap the
// number of in-flight hashing goroutines (see maxCalc).
type workerCounter struct {
	sync.RWMutex
	counter int // current number of in-flight hash goroutines
}
// RunInventoryVars groups (mostly future) configuration for an inventory
// run; only PathName is defined and it is not yet used.
type RunInventoryVars struct {
	PathName string
	// name string
	// envName string
	// imageTag string
	// resourceTags map[string]string
}
// workerJob is one unit of work for treeWalker: a directory to list.
type workerJob struct {
	Root string // directory path to scan
}
// workerResult is emitted by treeWalker for every non-empty regular file;
// workerCalc consumes these and hashes the file.
type workerResult struct {
	Filename string // absolute path of the file to hash
}
// HTTPClientSettings bundles the transport/dialer tunables used by
// NewHTTPClientWithSettings to build a custom *http.Client.
type HTTPClientSettings struct {
	Connect          time.Duration // dialer connect timeout
	ConnKeepAlive    time.Duration // TCP keep-alive interval
	ExpectContinue   time.Duration // Expect: 100-continue wait
	IdleConn         time.Duration // idle connection lifetime in the pool
	MaxAllIdleConns  int           // pool-wide idle connection cap
	MaxHostIdleConns int           // per-host idle connection cap
	ResponseHeader   time.Duration // response-header read timeout
	TLSHandshake     time.Duration // TLS handshake timeout
}
// Database wraps an AWS session and DynamoDB client bound to one table.
type Database struct {
	// sync.RWMutex
	sess  *session.Session
	svc   *dynamodb.DynamoDB
	table string // target table name; empty disables writes (dry run)
}
// ddbEntry is the DynamoDB item written per file: partition key is the
// filename, sort key is the run timestamp.
type ddbEntry struct {
	FileName string `dynamodbav:"filename"`
	Pk       string `dynamodbav:"pk"`
	Sk       string `dynamodbav:"sk"`
}
// Output is a query-result shape (count plus items); not used in this file.
type Output struct {
	Count int64
	Items []ddbEntry
}
func NewHTTPClientWithSettings(httpSettings HTTPClientSettings) (*http.Client, error) { | |
var client http.Client | |
tr := &http.Transport{ | |
ResponseHeaderTimeout: httpSettings.ResponseHeader, | |
Proxy: http.ProxyFromEnvironment, | |
DialContext: (&net.Dialer{ | |
KeepAlive: httpSettings.ConnKeepAlive, | |
DualStack: true, | |
Timeout: httpSettings.Connect, | |
}).DialContext, | |
MaxIdleConns: httpSettings.MaxAllIdleConns, | |
IdleConnTimeout: httpSettings.IdleConn, | |
TLSHandshakeTimeout: httpSettings.TLSHandshake, | |
MaxIdleConnsPerHost: httpSettings.MaxHostIdleConns, | |
ExpectContinueTimeout: httpSettings.ExpectContinue, | |
} | |
// So client makes HTTP/2 requests | |
err := http2.ConfigureTransport(tr) | |
if err != nil { | |
return &client, err | |
} | |
return &http.Client{ | |
Transport: tr, | |
}, nil | |
} | |
// NewDatabaseConnection builds a Database backed by a DynamoDB client that
// uses a custom HTTP/2-capable http.Client with tight timeouts and small
// idle-connection caps. It panics if the HTTP client cannot be built.
func NewDatabaseConnection(table string) *Database {
	// Initialize a session that the SDK will use to load
	// credentials from the shared credentials file ~/.aws/credentials
	// and region from the shared configuration file ~/.aws/config.
	httpClient, err := NewHTTPClientWithSettings(HTTPClientSettings{
		Connect:         5 * time.Second,
		ExpectContinue:  1 * time.Second,
		IdleConn:        90 * time.Second,
		ConnKeepAlive:   30 * time.Second,
		MaxAllIdleConns: 10, // To avoid issues with EMFILE errors when too many Idle connections are kept in MacOS
		MaxHostIdleConns: 2,
		ResponseHeader:  5 * time.Second,
		TLSHandshake:    5 * time.Second,
	})
	if err != nil {
		fmt.Println("Got an error creating custom HTTP client:")
		fmt.Println(err)
		panic("Got an error creating custom HTTP client:")
	}
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		Config: aws.Config{
			HTTPClient: httpClient,
		},
		SharedConfigState: session.SharedConfigEnable,
	}))
	// Create DynamoDB client
	svc := dynamodb.New(sess)
	// svc.Handlers.Build.PushFront(func(r *request.Request) {
	// obs.Counter("NewRequest", 1)
	// })
	return &Database{
		sess:  sess,
		svc:   svc,
		table: table,
	}
}
func (d *Database) saveItem(item interface{}) error { | |
if d.table != "" { | |
av, err := dynamodbattribute.MarshalMap(item) | |
if err != nil { | |
log.Fatalf("Got error marshalling new item: %s", err) | |
} | |
tableName := d.table | |
input := &dynamodb.PutItemInput{ | |
Item: av, | |
TableName: aws.String(tableName), | |
} | |
// d.Lock() | |
// defer d.Unlock() | |
_, err = d.svc.PutItem(input) | |
if err != nil { | |
log.Fatalf("Got error calling PutItem: %s", err) | |
} | |
return err | |
} else { | |
return nil | |
} | |
} | |
//(map[int]*ddbEntry, error)
// main kicks off a single inventory run over the hard-coded path/table
// configured in RunInventory.
func main() {
	RunInventory()
}
// maxCalc caps concurrent hashing goroutines in workerCalc.
const maxCalc = 50 // TODO: Make it configurable

// RunInventory walks a directory tree with treeWalker, hashes every
// non-empty regular file via workerCalc, and writes one DynamoDB item per
// file. It blocks until the hashing side signals calcDone.
func RunInventory() {
	var startTime = time.Now()
	var db = NewDatabaseConnection("your_ddb_database")
	var path = "/"
	// cwd, err := os.Getwd()
	dir := path
	workerCount := 1
	// jobs is buffered so the seed job below can be queued immediately.
	jobs := make(chan workerJob, workerCount)
	results := make(chan workerResult)
	readDone := make(chan bool)
	calcDone := make(chan bool)
	wg := &sync.WaitGroup{}
	// start N workers
	for i := 0; i < workerCount; i++ {
		go treeWalker(jobs, results, wg)
	}
	// One initial job
	wg.Add(1)
	go func() {
		jobs <- workerJob{
			Root: dir,
		}
	}()
	for i := 0; i < workerCount; i++ {
		go workerCalc(results, calcDone, startTime.String(), db)
	}
	// When all jobs finished, shutdown the system.
	go func() {
		wg.Wait()
		readDone <- true
	}()
readloop:
	for {
		select {
		// case res := <-results:
		// // ctx.Obs.Warning(`result=%#v`, res.Filename)
		// _ = res
		// ctx.Obs.Warning("%v - result=%#v", md5Value, res.Filename)
		case <-readDone:
			// Finished traversing path.
			// NOTE(review): closing results here can race with hash
			// goroutines still being spawned — confirm shutdown ordering.
			close(jobs)
			close(results)
		case <-calcDone:
			// All existing files have been processed by calc_md5
			break readloop
		}
	}
}
// treeWalker consumes directory jobs and, for each directory entry found:
// non-empty regular files are sent to results for hashing; subdirectories
// are re-queued as new jobs (wg.Add(1) before the async send). Exactly one
// wg.Done() is executed per consumed job, on every exit path.
func treeWalker(jobs chan workerJob, results chan<- workerResult, wg *sync.WaitGroup) {
	// While there are new jobs
	for j := range jobs {
		dir, err := os.Open(j.Root)
		if err != nil {
			handleFileOpenLimits(err)
			dir.Close() // dir is nil on error; (*os.File)(nil).Close() just returns ErrInvalid
			wg.Done()
			continue
		}
		fInfo, err := dir.Readdir(-1) // Return all files in directory
		dir.Close()
		if err != nil {
			handleFileOpenLimits(err)
			wg.Done()
			// if os.IsPermission(err) {
			// // Skip if there's no permission
			// continue
			// }
			// For now, skip if there is an error
			continue
		}
		for _, file := range fInfo {
			// dir.Name() is still valid after Close; join with the entry name.
			fpath, _ := filepath.Abs(filepath.Join(dir.Name(), file.Name()))
			if file.Mode().IsRegular() {
				// is file
				fs := uint64(file.Size())
				if fs == 0 {
					// Skip zero sized
					// wg.Done() // Not needed because if it is a file there is not a new job
					continue
				}
				r := workerResult{
					Filename: fpath,
				}
				results <- r
			} else if file.IsDir() {
				// Send directory to be processed by the worker
				nj := workerJob{
					Root: fpath,
				}
				// One more job, adds to wg
				wg.Add(1)
				// Do not block when sending jobs.
				// NOTE(review): one goroutine per subdirectory avoids deadlock
				// on the bounded jobs channel, but fan-out is unbounded.
				go func() {
					jobs <- nj
				}()
			}
		}
		// Done one job, let wg know.
		wg.Done()
	}
}
func openFile(filename string) string { | |
file, err := os.Open(filename) | |
if err != nil { | |
handleFileOpenLimits(err) | |
file.Close() | |
return "" | |
// panic(err) | |
} | |
// defer file.Close() | |
hash := md5.New() | |
_, err = io.Copy(hash, file) | |
if err != nil { | |
handleFileOpenLimits(err) | |
file.Close() | |
return "" | |
// panic(err) | |
} | |
file.Close() | |
return filename | |
} | |
// workerCalc drains results, hashing each file in its own goroutine while
// capping in-flight hashes at maxCalc via a mutex-guarded counter
// (busy-wait with a 100ms sleep when saturated). Successful hashes are
// recorded in DynamoDB under the run timestamp. done is signalled once
// results is closed AND every spawned hash goroutine has finished.
func workerCalc(results chan workerResult, done chan bool, timestamp string, database *Database) {
	wgCalc := &sync.WaitGroup{}
	i := workerCounter{
		counter: 0,
	}
	// Sentinel Add(1), released after the range loop below, keeps the
	// waiter from firing before the channel is closed.
	wgCalc.Add(1)
	go func() {
		wgCalc.Wait()
		done <- true
	}()
	for j := range results {
		wgCalc.Add(1)
		i.Lock()
		i.counter++
		i.Unlock()
		// Throttle: spin (with sleep) while more than maxCalc hashes run.
		// The closure takes the read lock, reads, and releases via defer.
		for func() int {
			defer i.RUnlock()
			i.RLock()
			return i.counter
		}() > maxCalc {
			time.Sleep(100 * time.Millisecond)
		}
		go func(filename string) {
			res := openFile(filename)
			if res != "" {
				database.PutContentHash(res, timestamp)
			}
			i.Lock()
			i.counter--
			i.Unlock()
			wgCalc.Done()
		}(j.Filename)
	}
	wgCalc.Done()
}
// This will detect if the io errors are related with too many open files | |
// and suggest the user to reduce the max limit of goroutines | |
// Or to increase the corresponding ulimits | |
func handleFileOpenLimits(err error) { | |
var perr *fs.PathError | |
if errors.As(err, &perr) { | |
if perr.Unwrap() == syscall.EMFILE { | |
log.Printf("You are running %d goroutines & have opened more files than what is allowed in your ulimits, please check", runtime.NumGoroutine()) | |
} else { | |
log.Printf("%#v", perr) | |
} | |
} | |
} | |
func (d *Database) PutContentHash(filename string, timestamp string) error { | |
var x = &ddbEntry{ | |
FileName: filename, | |
Pk: filename, | |
Sk: timestamp, | |
} | |
err := d.saveItem(x) | |
if err != nil { | |
log.Fatalf("Got error calling PutContentHash: %s", err) | |
} | |
return err | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment