Created
September 2, 2024 02:09
-
-
Save crazygit/1bc06b15e0fc9530c2a9e792a2d2f106 to your computer and use it in GitHub Desktop.
Implementing S3 File Transfer with Go: Download and Upload at Once
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"github.com/aws/aws-sdk-go-v2/aws" | |
"github.com/aws/aws-sdk-go-v2/config" | |
"github.com/aws/aws-sdk-go-v2/feature/s3/manager" | |
"github.com/aws/aws-sdk-go-v2/service/s3" | |
"github.com/dustin/go-humanize" | |
"io" | |
"log" | |
"sync" | |
"sync/atomic" | |
) | |
type OrderedWriter struct { | |
buffer sync.Map | |
writer *io.PipeWriter | |
written uint64 | |
expected int64 | |
size *uint64 | |
wg sync.WaitGroup | |
mu sync.Mutex | |
} | |
func (o *OrderedWriter) WriteAt(p []byte, offset int64) (n int, err error) { | |
o.wg.Add(1) | |
writeBlockSize := len(p) | |
if o.size != nil { | |
atomic.AddUint64(&o.written, uint64(writeBlockSize)) | |
percentageDownloaded := float32(o.written*100) / float32(*o.size) | |
log.Printf("File size: %s, downloaded: %s, percentage: %.2f%%\n", humanize.IBytes(*o.size), humanize.IBytes(o.written), percentageDownloaded) | |
} | |
buf := make([]byte, writeBlockSize) | |
copy(buf, p) | |
o.buffer.Store(offset, buf) | |
o.flush() | |
return writeBlockSize, nil | |
} | |
func (o *OrderedWriter) flush() { | |
o.mu.Lock() | |
defer func() { | |
o.wg.Done() | |
o.mu.Unlock() | |
}() | |
for { | |
if data, ok := o.buffer.Load(o.expected); ok { | |
buf := data.([]byte) | |
flushBlockSize := len(buf) | |
_, err := o.writer.Write(buf) | |
if err != nil { | |
panic("Error writing to buf: " + err.Error()) | |
} | |
o.buffer.Delete(o.expected) | |
atomic.AddInt64(&o.expected, int64(flushBlockSize)) | |
} else { | |
break | |
} | |
} | |
} | |
func download(s3Client *s3.Client, orderedWriter *OrderedWriter, sourceBucket string, sourceKey string) { | |
log.Println("Begin downloading...") | |
downloader := manager.NewDownloader(s3Client, func(downloader *manager.Downloader) { | |
// Define a strategy that will buffer 50 MiB in memory | |
downloader.BufferProvider = manager.NewPooledBufferedWriterReadFromProvider(50 * 1024 * 1024) | |
downloader.PartSize = 100 * 1024 * 1024 // 100MB | |
downloader.Concurrency = 5 | |
downloader.PartBodyMaxRetries = 5 | |
}) | |
_, err := downloader.Download(context.TODO(), orderedWriter, &s3.GetObjectInput{ | |
Bucket: aws.String(sourceBucket), | |
Key: aws.String(sourceKey), | |
}) | |
if err != nil { | |
log.Fatalf("download failed due to: %v\n", err) | |
} | |
log.Println("Download Success!") | |
orderedWriter.wg.Wait() | |
defer func() { | |
if err := orderedWriter.writer.Close(); err != nil { | |
panic(fmt.Sprintf("close writer error: %v", err)) | |
} | |
}() | |
} | |
func upload(s3Client *s3.Client, wg *sync.WaitGroup, destinationBucket string, destinationKey string, reader *io.PipeReader) { | |
defer wg.Done() | |
uploader := manager.NewUploader(s3Client, func(u *manager.Uploader) { | |
// Define a strategy that will buffer 50 MiB in memory | |
u.BufferProvider = manager.NewBufferedReadSeekerWriteToPool(50 * 1024 * 1024) | |
u.PartSize = 100 * 1024 * 1024 // 100 MiB | |
u.Concurrency = 5 | |
}) | |
log.Println("Begin uploading...") | |
_, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ | |
Bucket: aws.String(destinationBucket), | |
Key: aws.String(destinationKey), | |
Body: reader, | |
}) | |
if err != nil { | |
log.Fatalf("upload failed due to: %v\n", err) | |
} | |
log.Println("Finish uploading...") | |
} | |
func getFileSize(s3Client s3.Client, sourceBucket string, sourceKey string) *uint64 { | |
headObject, err := s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{ | |
Bucket: aws.String(sourceBucket), | |
Key: aws.String(sourceKey), | |
}) | |
if err != nil { | |
log.Fatalf("failed to get source object metadata, err: %v", err) | |
} | |
if headObject.ContentLength != nil { | |
return aws.Uint64(uint64(*headObject.ContentLength)) | |
} | |
return nil | |
} | |
func getS3Client(profile string) *s3.Client { | |
// Using the SDK's default configuration, More about Configuring the AWS SDK | |
// https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/ | |
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithSharedConfigProfile(profile)) | |
if err != nil { | |
log.Fatalf("failed to load aws config of profile: %s, err: %v", profile, err) | |
} | |
return s3.NewFromConfig(cfg, func(options *s3.Options) { | |
options.RetryMaxAttempts = 5 | |
}) | |
} | |
func main() { | |
// REPLACE AS YOUR REQUIRED CONFIGURATION BEGIN | |
sourceBucket := "source-bucket" | |
destinationBucket := "destination-bucket" | |
sourceKey := "source-key" | |
destinationKey := "destination-key" | |
sourceAccountProfile := "source-profile" | |
destinationAccountProfile := "destination-profile" | |
// REPLACE AS YOUR REQUIRED CONFIGURATION END | |
sourceS3Client := getS3Client(sourceAccountProfile) | |
destinationClient := getS3Client(destinationAccountProfile) | |
reader, writer := io.Pipe() | |
orderedWriter := &OrderedWriter{ | |
writer: writer, | |
expected: 0, | |
size: getFileSize(*sourceS3Client, sourceBucket, sourceKey), | |
} | |
var wg sync.WaitGroup | |
wg.Add(1) | |
go download(sourceS3Client, orderedWriter, sourceBucket, sourceKey) | |
go upload(destinationClient, &wg, destinationBucket, destinationKey, reader) | |
wg.Wait() | |
log.Println("Download && Upload Success!") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment