@dutchiechris
Last active May 13, 2025 18:28
GCS multipart uploads using the golang AWS S3 SDK
/*
# Create a Hyperdisk Extreme (HdX) disk with 5+ GiB/s of throughput, or use Local SSD, then create a ~50 GiB test file
sudo mkfs.ext4 /dev/disk/by-id/google-adhoc1
sudo mount /dev/disk/by-id/google-adhoc1 /mnt/tmpfs
dd if=/dev/zero of=/mnt/tmpfs/file.dat bs=1M count=50000
# Set keys with access to your bucket
export AWS_ACCESS_KEY_ID="<YOUR GCS HMAC ACCESS KEY>"
export AWS_SECRET_ACCESS_KEY="<YOUR GCS HMAC SECRET KEY>"
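# If you don't have HMAC keys yet, one way to create them (assuming a service
# account with access to the bucket; the email below is a placeholder) is:
#   gcloud storage hmac create my-sa@my-project.iam.gserviceaccount.com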
# Install Go (https://go.dev/dl/), then set up a module and build this program
mkdir golang-code
cd golang-code
go mod init s3-gcs-mpu
# Copy this code into the main.go file
vi main.go
go mod tidy
go build
./s3-gcs-mpu
2025/05/13 18:20:34 Go runtime GOMAXPROCS explicitly set to: 88 (was 88). Available CPUs: 88
2025/05/13 18:20:34 Attempting to open file: /mnt/tmpfs/file.dat
2025/05/13 18:20:34 File /mnt/tmpfs/file.dat opened successfully, size: 48.83 GiB (50000.00 MiB)
2025/05/13 18:20:34 Calculated number of parts: 5000 for file size 50000.00 MiB and part size 10.00 MiB
2025/05/13 18:20:34 S3 Uploader configured with PartSize: 10485760 bytes (10.00 MiB), Concurrency: 4000
2025/05/13 18:20:34 Starting multipart upload of /mnt/tmpfs/file.dat (48.83 GiB) to gs://20231221-boto/file.dat using endpoint https://storage.googleapis.com
2025/05/13 18:20:47 Successfully uploaded /mnt/tmpfs/file.dat to GCS S3 location: https://storage.googleapis.com/20231221-boto/file.dat
2025/05/13 18:20:47 ETag of uploaded object: "28c41b88641e5f31ebcfc4e0faa3bfe6-5000"
2025/05/13 18:20:47 Upload completed in 12.843585725s
2025/05/13 18:20:47 Transfer speed: 3892.99 MiB/s (31.14 Gbps)
*/
package main
import (
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
"runtime" // For GOMAXPROCS and NumCPU
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// RecalculateV4Signature is a custom http.RoundTripper.
// It re-signs requests for GCS S3 compatibility and logs any part uploads that do not return HTTP 200.
type RecalculateV4Signature struct {
OriginalTransport http.RoundTripper
Signer *v4.Signer
Region string
ServiceName string
}
// RoundTrip implements the http.RoundTripper interface.
func (r *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response, error) {
var bodyAsSeeker io.ReadSeeker
originalURL := req.URL.String()
if req.Body != nil {
rs, ok := req.Body.(io.ReadSeeker)
if !ok {
log.Printf("CRITICAL: Request body for URL %s is not an io.ReadSeeker. Cannot proceed with custom signing.", originalURL)
return nil, fmt.Errorf("custom signer: request body for %s is not an io.ReadSeeker", originalURL)
}
bodyAsSeeker = rs
if _, err := bodyAsSeeker.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("custom signer: failed to seek body to start before signing for %s: %w", originalURL, err)
}
}
req.Header.Del("Authorization")
req.Header.Del("X-Amz-Security-Token")
_, err := r.Signer.Sign(req, bodyAsSeeker, r.ServiceName, r.Region, time.Now().UTC())
if err != nil {
return nil, fmt.Errorf("custom signer: failed to sign request for %s: %w", originalURL, err)
}
if bodyAsSeeker != nil {
if _, err := bodyAsSeeker.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("custom signer: failed to seek body to start after signing for %s: %w", originalURL, err)
}
}
resp, err := r.OriginalTransport.RoundTrip(req)
if err == nil && resp != nil && req.Method == "PUT" {
parsedURL, parseErr := url.Parse(originalURL)
if parseErr == nil {
queryParams := parsedURL.Query()
partNumber := queryParams.Get("partNumber")
uploadID := queryParams.Get("uploadId")
if partNumber != "" && uploadID != "" {
if resp.StatusCode == http.StatusOK {
// Reduce chattiness of debug log for very high concurrency
// log.Printf("DEBUG: Successfully uploaded PartNumber: %s for UploadID: %s (Status: %s, Size: %d bytes)",
// partNumber, uploadID, resp.Status, req.ContentLength)
} else {
log.Printf("DEBUG: PartNumber: %s for UploadID: %s - HTTP Status: %s (Size: %d bytes)",
partNumber, uploadID, resp.Status, req.ContentLength)
}
}
}
}
return resp, err
}
func main() {
// --- Explicitly set GOMAXPROCS to use all available CPUs ---
prevGOMAXPROCS := runtime.GOMAXPROCS(runtime.NumCPU())
log.Printf("Go runtime GOMAXPROCS explicitly set to: %d (was %d). Available CPUs: %d", runtime.GOMAXPROCS(0), prevGOMAXPROCS, runtime.NumCPU())
// --- Performance Tuning Parameters ---
// For a 50GiB file on a 100 vCPU machine, aiming for max throughput.
const uploaderConcurrency = 4000 // Keep concurrency very high. Adjust based on CPU and GCS limits.
// A 32MiB part size caused a performance drop in testing, so use smaller 10MiB parts.
// ~50,000 MiB / 10 MiB per part = 5,000 parts, well under the 10,000-part limit.
const uploaderPartSize = 10 * 1024 * 1024 // 10MiB parts.
// --- Configuration ---
gcsS3Endpoint := "https://storage.googleapis.com"
awsRegion := "us-east1"
bucketName := "20231221-boto"
objectKey := "file.dat" // Generic name, actual file is 50GiB
filePath := "/mnt/tmpfs/file.dat" // Path to the 50GiB file
// --- AWS SDK Session and Configuration ---
awsCfg := aws.NewConfig().
WithRegion(awsRegion).
WithEndpoint(gcsS3Endpoint).
WithS3ForcePathStyle(true)
customHTTPTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: uploaderConcurrency + 100, // Generous idle connections for high concurrency
MaxIdleConnsPerHost: uploaderConcurrency + 50, // Generous idle connections per host
MaxConnsPerHost: uploaderConcurrency + 50, // Explicitly set MaxConnsPerHost
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
initialSess, err := session.NewSession(awsCfg)
if err != nil {
log.Fatalf("Failed to create initial AWS session: %v", err)
}
signer := v4.NewSigner(initialSess.Config.Credentials)
customSigningTransport := &RecalculateV4Signature{
OriginalTransport: customHTTPTransport,
Signer: signer,
Region: aws.StringValue(initialSess.Config.Region),
ServiceName: "s3",
}
awsCfg.HTTPClient = &http.Client{Transport: customSigningTransport}
finalSess, err := session.NewSession(awsCfg)
if err != nil {
log.Fatalf("Failed to create final AWS session with custom HTTP client: %v", err)
}
s3Client := s3.New(finalSess)
// --- File Upload Logic ---
log.Printf("Attempting to open file: %s", filePath)
file, err := os.Open(filePath)
if err != nil {
log.Fatalf("Failed to open file %q: %v", filePath, err)
}
defer file.Close()
fileInfo, err := file.Stat()
if err != nil {
log.Fatalf("Failed to get file info for %q: %v", filePath, err)
}
fileSize := fileInfo.Size()
log.Printf("File %s opened successfully, size: %.2f GiB (%.2f MiB)",
filePath, float64(fileSize)/(1024*1024*1024), float64(fileSize)/(1024*1024))
// Calculate number of parts to ensure we don't exceed limits
numParts := fileSize / uploaderPartSize
if fileSize%uploaderPartSize != 0 {
numParts++
}
log.Printf("Calculated number of parts: %d for file size %.2f MiB and part size %.2f MiB",
numParts, float64(fileSize)/(1024*1024), float64(uploaderPartSize)/(1024*1024))
if numParts > 10000 {
log.Fatalf("Error: Number of parts (%d) exceeds S3 limit of 10,000. Increase part size or use a smaller file.", numParts)
}
if numParts == 0 && fileSize > 0 { // Should not happen with correct logic but good to check
log.Fatalf("Error: Calculated 0 parts for a non-empty file. Check part size and file size logic.")
}
uploader := s3manager.NewUploaderWithClient(s3Client, func(u *s3manager.Uploader) {
u.PartSize = uploaderPartSize
u.Concurrency = uploaderConcurrency
// u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(int(uploaderPartSize)) // Example of custom buffer provider, might be worth testing
log.Printf("S3 Uploader configured with PartSize: %d bytes (%.2f MiB), Concurrency: %d",
u.PartSize, float64(u.PartSize)/(1024*1024), u.Concurrency)
})
log.Printf("Starting multipart upload of %s (%.2f GiB) to gs://%s/%s using endpoint %s",
filePath, float64(fileSize)/(1024*1024*1024), bucketName, objectKey, gcsS3Endpoint)
uploadInput := &s3manager.UploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
Body: file,
}
startTime := time.Now()
result, err := uploader.Upload(uploadInput)
endTime := time.Now()
if err != nil {
log.Printf("Failed to upload file to GCS S3. Error type: %T, Message: %v", err, err)
if awsErr, ok := err.(awserr.Error); ok {
log.Printf("AWS Error Code: %s, AWS Error Message: %s, Original Error: %v", awsErr.Code(), awsErr.Message(), awsErr.OrigErr())
} else if multiErr, ok := err.(s3manager.MultiUploadFailure); ok {
log.Printf("S3 MultiUploadFailure: %s (UploadID: %s)", multiErr.Error(), multiErr.UploadID())
}
log.Fatalf("Full error details for GCS S3 upload failure: %#v", err)
}
duration := endTime.Sub(startTime)
fileSizeMiB := float64(fileSize) / (1024 * 1024)
speedMiBps := 0.0
if duration.Seconds() > 0 {
speedMiBps = fileSizeMiB / duration.Seconds()
}
log.Printf("Successfully uploaded %s to GCS S3 location: %s", filePath, result.Location)
if result.ETag != nil {
log.Printf("ETag of uploaded object: %s", aws.StringValue(result.ETag))
}
log.Printf("Upload completed in %v", duration)
log.Printf("Transfer speed: %.2f MiB/s (%.2f Gbps)", speedMiBps, speedMiBps*8/1000)
}
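/*
Optional follow-up (a minimal, untested sketch): verify the uploaded object by
asking GCS for its size and comparing it to the local file. This reuses the
s3Client, bucketName, objectKey, and fileSize variables from main() above and
only calls HeadObject from the already-imported s3 package; it would need to be
appended to the end of main() to run.

	head, err := s3Client.HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(objectKey),
	})
	if err != nil {
		log.Fatalf("Failed to HEAD uploaded object: %v", err)
	}
	log.Printf("Remote object size: %d bytes (local file size: %d bytes)",
		aws.Int64Value(head.ContentLength), fileSize)
*/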