Skip to content

Instantly share code, notes, and snippets.

@crazygit
Created September 2, 2024 02:09
Show Gist options
  • Save crazygit/1bc06b15e0fc9530c2a9e792a2d2f106 to your computer and use it in GitHub Desktop.
Implementing S3 File Transfer with Go: Download and Upload at Once
package main
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/dustin/go-humanize"
"io"
"log"
"sync"
"sync/atomic"
)
// OrderedWriter bridges the S3 download manager's concurrent io.WriterAt
// interface to a sequential *io.PipeWriter: parts may arrive out of order,
// so each chunk is parked in buffer keyed by its byte offset until the
// contiguous prefix starting at expected can be flushed to the pipe.
type OrderedWriter struct {
	buffer   sync.Map       // chunk offset (int64) -> []byte payload awaiting in-order flush
	writer   *io.PipeWriter // downstream pipe; receives bytes strictly in offset order
	written  uint64         // total bytes received so far (updated atomically); used for progress logging
	expected int64          // next offset to flush; only advanced inside flush() while mu is held
	size     *uint64        // total object size when known; nil when HeadObject returned no ContentLength
	wg       sync.WaitGroup // one Add per WriteAt call, matching Done in flush(); Wait() means all chunks processed
	mu       sync.Mutex     // serializes flush() so pipe writes stay in order
}
// WriteAt receives a (possibly out-of-order) chunk from a downloader
// goroutine, records it under its byte offset, and flushes any contiguous
// prefix to the pipe. It never returns an error: chunks are buffered in
// memory until they can be written in order.
//
// The wg.Add(1) here is balanced by the wg.Done() inside flush(), so
// wg.Wait() only returns once every received chunk has been processed.
func (o *OrderedWriter) WriteAt(p []byte, offset int64) (n int, err error) {
	o.wg.Add(1)
	writeBlockSize := len(p)
	if o.size != nil {
		// Use the value returned by AddUint64 for logging: re-reading
		// o.written afterwards would race with concurrent WriteAt calls
		// on other downloader goroutines.
		written := atomic.AddUint64(&o.written, uint64(writeBlockSize))
		percentageDownloaded := float32(written*100) / float32(*o.size)
		log.Printf("File size: %s, downloaded: %s, percentage: %.2f%%\n", humanize.IBytes(*o.size), humanize.IBytes(written), percentageDownloaded)
	}
	// Copy p before storing: the downloader may reuse the buffer after
	// WriteAt returns.
	buf := make([]byte, writeBlockSize)
	copy(buf, p)
	o.buffer.Store(offset, buf)
	o.flush()
	return writeBlockSize, nil
}
// flush drains every chunk that is contiguous with the next expected
// offset into the pipe writer, in order. It must be paired with a prior
// wg.Add(1) (done by WriteAt); the matching wg.Done fires on exit.
// Panics if the pipe write fails (e.g. the reader side was closed early).
func (o *OrderedWriter) flush() {
	o.mu.Lock()
	defer func() {
		o.wg.Done()
		o.mu.Unlock()
	}()
	for {
		data, ok := o.buffer.Load(o.expected)
		if !ok {
			// Gap at the expected offset: a later WriteAt will resume.
			return
		}
		chunk := data.([]byte)
		if _, err := o.writer.Write(chunk); err != nil {
			panic("Error writing to buf: " + err.Error())
		}
		o.buffer.Delete(o.expected)
		atomic.AddInt64(&o.expected, int64(len(chunk)))
	}
}
// download streams sourceKey from sourceBucket through orderedWriter.
// It blocks until every downloaded chunk has been flushed to the pipe,
// then closes the pipe writer so the uploading side sees EOF.
// On download failure the process exits via log.Fatalf.
func download(s3Client *s3.Client, orderedWriter *OrderedWriter, sourceBucket string, sourceKey string) {
	log.Println("Begin downloading...")
	downloader := manager.NewDownloader(s3Client, func(downloader *manager.Downloader) {
		// Define a strategy that will buffer 50 MiB in memory
		downloader.BufferProvider = manager.NewPooledBufferedWriterReadFromProvider(50 * 1024 * 1024)
		downloader.PartSize = 100 * 1024 * 1024 // 100MB
		downloader.Concurrency = 5
		downloader.PartBodyMaxRetries = 5
	})
	_, err := downloader.Download(context.TODO(), orderedWriter, &s3.GetObjectInput{
		Bucket: aws.String(sourceBucket),
		Key:    aws.String(sourceKey),
	})
	if err != nil {
		log.Fatalf("download failed due to: %v\n", err)
	}
	log.Println("Download Success!")
	// Wait until every buffered chunk has been flushed to the pipe;
	// closing earlier would truncate the upload.
	orderedWriter.wg.Wait()
	// Close directly. The original wrapped this in a defer registered only
	// after the wait, which ran at the same point but obscured the intent.
	if err := orderedWriter.writer.Close(); err != nil {
		panic(fmt.Sprintf("close writer error: %v", err))
	}
}
// upload copies everything readable from the pipe into the destination
// object, returning once the download side closes the writer. Signals
// completion through wg; exits the process via log.Fatalf on failure.
func upload(s3Client *s3.Client, wg *sync.WaitGroup, destinationBucket string, destinationKey string, reader *io.PipeReader) {
	defer wg.Done()
	uploader := manager.NewUploader(s3Client, func(u *manager.Uploader) {
		// Define a strategy that will buffer 50 MiB in memory
		u.BufferProvider = manager.NewBufferedReadSeekerWriteToPool(50 * 1024 * 1024)
		u.PartSize = 100 * 1024 * 1024 // 100 MiB
		u.Concurrency = 5
	})
	log.Println("Begin uploading...")
	input := &s3.PutObjectInput{
		Bucket: aws.String(destinationBucket),
		Key:    aws.String(destinationKey),
		Body:   reader,
	}
	if _, err := uploader.Upload(context.TODO(), input); err != nil {
		log.Fatalf("upload failed due to: %v\n", err)
	}
	log.Println("Finish uploading...")
}
// getFileSize returns the source object's size in bytes via HeadObject,
// or nil when the response carries no ContentLength. Exits the process
// via log.Fatalf if the metadata request fails.
//
// NOTE(review): s3.Client is passed by value here to match the existing
// caller; passing *s3.Client would be more conventional — confirm before
// changing the signature.
func getFileSize(s3Client s3.Client, sourceBucket string, sourceKey string) *uint64 {
	head, err := s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{
		Bucket: aws.String(sourceBucket),
		Key:    aws.String(sourceKey),
	})
	if err != nil {
		log.Fatalf("failed to get source object metadata, err: %v", err)
	}
	if head.ContentLength == nil {
		return nil
	}
	return aws.Uint64(uint64(*head.ContentLength))
}
// getS3Client builds an S3 client for the named shared-config profile,
// configured with up to 5 retry attempts per request. Exits the process
// via log.Fatalf if the profile's configuration cannot be loaded.
func getS3Client(profile string) *s3.Client {
	// Using the SDK's default configuration, More about Configuring the AWS SDK
	// https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithSharedConfigProfile(profile))
	if err != nil {
		log.Fatalf("failed to load aws config of profile: %s, err: %v", profile, err)
	}
	return s3.NewFromConfig(cfg, func(opts *s3.Options) {
		opts.RetryMaxAttempts = 5
	})
}
// main wires a download goroutine to an upload goroutine through an
// io.Pipe, copying one S3 object between two accounts/buckets without
// buffering the whole file on disk.
func main() {
	// REPLACE AS YOUR REQUIRED CONFIGURATION BEGIN
	sourceBucket := "source-bucket"
	destinationBucket := "destination-bucket"
	sourceKey := "source-key"
	destinationKey := "destination-key"
	sourceAccountProfile := "source-profile"
	destinationAccountProfile := "destination-profile"
	// REPLACE AS YOUR REQUIRED CONFIGURATION END

	sourceS3Client := getS3Client(sourceAccountProfile)
	destinationClient := getS3Client(destinationAccountProfile)

	// The pipe couples the two transfers: the downloader writes bytes in
	// offset order, the uploader consumes until the writer side is closed.
	reader, writer := io.Pipe()
	orderedWriter := &OrderedWriter{
		writer:   writer,
		expected: 0,
		size:     getFileSize(*sourceS3Client, sourceBucket, sourceKey),
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go download(sourceS3Client, orderedWriter, sourceBucket, sourceKey)
	go upload(destinationClient, &wg, destinationBucket, destinationKey, reader)
	// Waiting on the uploader alone is sufficient: it can only finish
	// after the downloader has closed the pipe writer.
	wg.Wait()
	log.Println("Download && Upload Success!")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment