GCS multipart uploads using the golang AWS S3 SDK
/*
# Make an HdX (Hyperdisk Extreme) disk with 5 GiB/s+ performance, or use a local SSD, and create a test file
sudo mkfs.ext4 /dev/disk/by-id/google-adhoc1
sudo mkdir -p /mnt/tmpfs
sudo mount /dev/disk/by-id/google-adhoc1 /mnt/tmpfs
dd if=/dev/zero of=/mnt/tmpfs/file.dat bs=1M count=50000

# Set HMAC keys with access to your bucket
export AWS_ACCESS_KEY_ID="<YOUR GCS HMAC ACCESS KEY>"
export AWS_SECRET_ACCESS_KEY="<YOUR GCS HMAC SECRET KEY>"
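# If you do not have HMAC keys yet, they can be created for a service account with
# something like the following (hedged example; check your gcloud version for exact usage):
#   gcloud storage hmac create <service-account-email>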
# Install Go, then build and run this program
mkdir golang-code
cd golang-code
go mod init s3-gcs-mpu
# Copy this code into main.go
vi main.go
go mod tidy
go build
./s3-gcs-mpu
2025/05/13 18:20:34 Go runtime GOMAXPROCS explicitly set to: 88 (was 88). Available CPUs: 88
2025/05/13 18:20:34 Attempting to open file: /mnt/tmpfs/file.dat
2025/05/13 18:20:34 File /mnt/tmpfs/file.dat opened successfully, size: 48.83 GiB (50000.00 MiB)
2025/05/13 18:20:34 Calculated number of parts: 5000 for file size 50000.00 MiB and part size 10.00 MiB
2025/05/13 18:20:34 S3 Uploader configured with PartSize: 10485760 bytes (10.00 MiB), Concurrency: 4000
2025/05/13 18:20:34 Starting multipart upload of /mnt/tmpfs/file.dat (48.83 GiB) to gs://20231221-boto/file.dat using endpoint https://storage.googleapis.com
2025/05/13 18:20:47 Successfully uploaded /mnt/tmpfs/file.dat to GCS S3 location: https://storage.googleapis.com/20231221-boto/file.dat
2025/05/13 18:20:47 ETag of uploaded object: "28c41b88641e5f31ebcfc4e0faa3bfe6-5000"
2025/05/13 18:20:47 Upload completed in 12.843585725s
2025/05/13 18:20:47 Transfer speed: 3892.99 MiB/s (31.14 Gbps)
*/
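// Package main uploads a large local file to a Google Cloud Storage bucket
// through the S3-compatible XML API, using the AWS SDK for Go (v1) s3manager
// multipart uploader together with a custom transport that re-signs each request.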
package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"runtime" // For GOMAXPROCS and NumCPU
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// RecalculateV4Signature is a custom http.RoundTripper.
// It re-signs requests for GCS S3 compatibility and logs part completions.
type RecalculateV4Signature struct {
	OriginalTransport http.RoundTripper
	Signer            *v4.Signer
	Region            string
	ServiceName       string
}

// RoundTrip implements the http.RoundTripper interface.
func (r *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response, error) {
	var bodyAsSeeker io.ReadSeeker
	originalURL := req.URL.String()

	if req.Body != nil {
		rs, ok := req.Body.(io.ReadSeeker)
		if !ok {
			log.Printf("CRITICAL: Request body for URL %s is not an io.ReadSeeker. Cannot proceed with custom signing.", originalURL)
			return nil, fmt.Errorf("custom signer: request body for %s is not an io.ReadSeeker", originalURL)
		}
		bodyAsSeeker = rs
		if _, err := bodyAsSeeker.Seek(0, io.SeekStart); err != nil {
			return nil, fmt.Errorf("custom signer: failed to seek body to start before signing for %s: %w", originalURL, err)
		}
	}

	req.Header.Del("Authorization")
	req.Header.Del("X-Amz-Security-Token")

	_, err := r.Signer.Sign(req, bodyAsSeeker, r.ServiceName, r.Region, time.Now().UTC())
	if err != nil {
		return nil, fmt.Errorf("custom signer: failed to sign request for %s: %w", originalURL, err)
	}

	if bodyAsSeeker != nil {
		if _, err := bodyAsSeeker.Seek(0, io.SeekStart); err != nil {
			return nil, fmt.Errorf("custom signer: failed to seek body to start after signing for %s: %w", originalURL, err)
		}
	}

	resp, err := r.OriginalTransport.RoundTrip(req)
	if err == nil && resp != nil && req.Method == "PUT" {
		parsedURL, parseErr := url.Parse(originalURL)
		if parseErr == nil {
			queryParams := parsedURL.Query()
			partNumber := queryParams.Get("partNumber")
			uploadID := queryParams.Get("uploadId")
			if partNumber != "" && uploadID != "" {
				if resp.StatusCode == http.StatusOK {
					// Reduce chattiness of debug log for very high concurrency
					// log.Printf("DEBUG: Successfully uploaded PartNumber: %s for UploadID: %s (Status: %s, Size: %d bytes)",
					// 	partNumber, uploadID, resp.Status, req.ContentLength)
				} else {
					log.Printf("DEBUG: PartNumber: %s for UploadID: %s - HTTP Status: %s (Size: %d bytes)",
						partNumber, uploadID, resp.Status, req.ContentLength)
				}
			}
		}
	}
	return resp, err
}
func main() {
	// --- Explicitly set GOMAXPROCS to use all available CPUs ---
	prevGOMAXPROCS := runtime.GOMAXPROCS(runtime.NumCPU())
	log.Printf("Go runtime GOMAXPROCS explicitly set to: %d (was %d). Available CPUs: %d", runtime.GOMAXPROCS(0), prevGOMAXPROCS, runtime.NumCPU())

	// --- Performance Tuning Parameters ---
	// For a ~50 GiB file on a large many-core machine, aiming for maximum throughput.
	const uploaderConcurrency = 4000 // Keep concurrency very high. Adjust based on CPU and GCS limits.

	// Reverting to a smaller part size, as 32 MiB parts caused a performance drop.
	// 50,000 MiB / 10 MiB per part = 5,000 parts, well under the 10,000-part limit.
	const uploaderPartSize = 10 * 1024 * 1024 // 10 MiB parts.
	// --- Configuration ---
	gcsS3Endpoint := "https://storage.googleapis.com"
	awsRegion := "us-east1"
	bucketName := "20231221-boto"
	objectKey := "file.dat"           // Object name in the bucket
	filePath := "/mnt/tmpfs/file.dat" // Path to the 50,000 MiB test file created above

	// --- AWS SDK Session and Configuration ---
	awsCfg := aws.NewConfig().
		WithRegion(awsRegion).
		WithEndpoint(gcsS3Endpoint).
		WithS3ForcePathStyle(true)
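	// Credentials are picked up from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
	// environment variables (the HMAC keys exported above) via the SDK's default
	// credential chain; no explicit credentials are set on the config.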
	customHTTPTransport := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		ForceAttemptHTTP2:     true,
		MaxIdleConns:          uploaderConcurrency + 100, // Generous idle connections for high concurrency
		MaxIdleConnsPerHost:   uploaderConcurrency + 50,  // Generous idle connections per host
		MaxConnsPerHost:       uploaderConcurrency + 50,  // Explicitly set MaxConnsPerHost
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}
	initialSess, err := session.NewSession(awsCfg)
	if err != nil {
		log.Fatalf("Failed to create initial AWS session: %v", err)
	}

	signer := v4.NewSigner(initialSess.Config.Credentials)
	customSigningTransport := &RecalculateV4Signature{
		OriginalTransport: customHTTPTransport,
		Signer:            signer,
		Region:            aws.StringValue(initialSess.Config.Region),
		ServiceName:       "s3",
	}
	awsCfg.HTTPClient = &http.Client{Transport: customSigningTransport}

	finalSess, err := session.NewSession(awsCfg)
	if err != nil {
		log.Fatalf("Failed to create final AWS session with custom HTTP client: %v", err)
	}
	s3Client := s3.New(finalSess)
	// --- File Upload Logic ---
	log.Printf("Attempting to open file: %s", filePath)
	file, err := os.Open(filePath)
	if err != nil {
		log.Fatalf("Failed to open file %q: %v", filePath, err)
	}
	defer file.Close()

	fileInfo, err := file.Stat()
	if err != nil {
		log.Fatalf("Failed to get file info for %q: %v", filePath, err)
	}
	fileSize := fileInfo.Size()
	log.Printf("File %s opened successfully, size: %.2f GiB (%.2f MiB)",
		filePath, float64(fileSize)/(1024*1024*1024), float64(fileSize)/(1024*1024))

	// Calculate number of parts to ensure we don't exceed limits
	numParts := fileSize / uploaderPartSize
	if fileSize%uploaderPartSize != 0 {
		numParts++
	}
	log.Printf("Calculated number of parts: %d for file size %.2f MiB and part size %.2f MiB",
		numParts, float64(fileSize)/(1024*1024), float64(uploaderPartSize)/(1024*1024))
	if numParts > 10000 {
		log.Fatalf("Error: Number of parts (%d) exceeds S3 limit of 10,000. Increase part size or use a smaller file.", numParts)
	}
	if numParts == 0 && fileSize > 0 { // Should not happen with correct logic but good to check
		log.Fatalf("Error: Calculated 0 parts for a non-empty file. Check part size and file size logic.")
	}

	uploader := s3manager.NewUploaderWithClient(s3Client, func(u *s3manager.Uploader) {
		u.PartSize = uploaderPartSize
		u.Concurrency = uploaderConcurrency
		// u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(int(uploaderPartSize)) // Example of custom buffer provider, might be worth testing
		log.Printf("S3 Uploader configured with PartSize: %d bytes (%.2f MiB), Concurrency: %d",
			u.PartSize, float64(u.PartSize)/(1024*1024), u.Concurrency)
	})
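	// The s3manager uploader splits the file into PartSize chunks, uploads up to
	// Concurrency parts in parallel, and completes the multipart upload once all
	// parts have been accepted.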
log.Printf("Starting multipart upload of %s (%.2f GiB) to gs://%s/%s using endpoint %s", | |
filePath, float64(fileSize)/(1024*1024*1024), bucketName, objectKey, gcsS3Endpoint) | |
uploadInput := &s3manager.UploadInput{ | |
Bucket: aws.String(bucketName), | |
Key: aws.String(objectKey), | |
Body: file, | |
} | |
startTime := time.Now() | |
result, err := uploader.Upload(uploadInput) | |
endTime := time.Now() | |
if err != nil { | |
log.Printf("Failed to upload file to GCS S3. Error type: %T, Message: %v", err, err) | |
if awsErr, ok := err.(awserr.Error); ok { | |
log.Printf("AWS Error Code: %s, AWS Error Message: %s, Original Error: %v", awsErr.Code(), awsErr.Message(), awsErr.OrigErr()) | |
} else if multiErr, ok := err.(s3manager.MultiUploadFailure); ok { | |
log.Printf("S3 MultiUploadFailure: %s (UploadID: %s)", multiErr.Error(), multiErr.UploadID()) | |
} | |
log.Fatalf("Full error details for GCS S3 upload failure: %#v", err) | |
} | |
duration := endTime.Sub(startTime) | |
fileSizeMiB := float64(fileSize) / (1024 * 1024) | |
speedMiBps := 0.0 | |
if duration.Seconds() > 0 { | |
speedMiBps = fileSizeMiB / duration.Seconds() | |
} | |
log.Printf("Successfully uploaded %s to GCS S3 location: %s", filePath, result.Location) | |
if result.ETag != nil { | |
log.Printf("ETag of uploaded object: %s", aws.StringValue(result.ETag)) | |
} | |
log.Printf("Upload completed in %v", duration) | |
log.Printf("Transfer speed: %.2f MiB/s (%.2f Gbps)", speedMiBps, speedMiBps*8/1000) | |
} |
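/*
# A quick way to verify the uploaded object from the command line
# (hedged example; adjust the bucket and object names to your setup):
gsutil stat gs://20231221-boto/file.dat
*/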