Back up multiple S3 buckets into zip files for archiving (mostly AI generated)
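For example, assuming the file is saved as main.go (the bucket and region values below are placeholders):

go run main.go \
  -source-buckets my-bucket-a,my-bucket-b \
  -source-regions eu-west-1,us-east-1 \
  -dest-bucket my-archive-bucket \
  -dest-region us-west-2

Each source bucket ends up as s3/<bucket-name>.zip in the destination bucket.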
package main

import (
	"archive/zip"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
	"sync"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// processSourceBucket handles zipping and uploading for a single source bucket.
func processSourceBucket(
	ctx context.Context,
	sourceBucketName string,
	sourceRegion string,
	destBucketName string,
	destS3Uploader *manager.Uploader, // Shared uploader for the destination
	wg *sync.WaitGroup,
) {
	defer wg.Done()
	log.Printf("INFO [%s]: Starting processing for source bucket in region %s", sourceBucketName, sourceRegion)

	// --- Initialize an S3 client for this specific source bucket's region ---
	sourceCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(sourceRegion))
	if err != nil {
		log.Printf("ERROR [%s]: Failed to load AWS config for source region %s: %v", sourceBucketName, sourceRegion, err)
		return
	}
	sourceS3Client := s3.NewFromConfig(sourceCfg)

	// --- Determine the destination object key for this bucket ---
	destinationObjectKey := "s3/" + sourceBucketName + ".zip"
	log.Printf("INFO [%s]: Destination object key will be: %s in bucket %s", sourceBucketName, destinationObjectKey, destBucketName)

	// --- Create a pipe for streaming this bucket's zip data ---
	pipeReader, pipeWriter := io.Pipe()
	zipArchive := zip.NewWriter(pipeWriter)

	var zipProcessingWg sync.WaitGroup
	zipProcessingWg.Add(1)
	// Goroutine for zipping this specific source bucket's content.
	go func() {
		defer zipProcessingWg.Done()
		// Critical ordering: close zipArchive first (it flushes the zip
		// central directory), then pipeWriter. pipeWriter.Close() signals
		// EOF to the uploader; pipeWriter.CloseWithError() signals an error.
		defer func() {
			if r := recover(); r != nil {
				log.Printf("PANIC [%s Zipping]: Recovered from panic: %v", sourceBucketName, r)
				_ = pipeWriter.CloseWithError(fmt.Errorf("panic during zipping: %v", r))
				return
			}
			if err := zipArchive.Close(); err != nil {
				log.Printf("ERROR [%s Zipping]: Failed to close zip archive writer: %v", sourceBucketName, err)
				_ = pipeWriter.CloseWithError(fmt.Errorf("zip archive close error: %w", err))
			} else if err := pipeWriter.Close(); err != nil {
				// If this fails, the uploader might hang or error out; the uploader's error will be caught.
				log.Printf("ERROR [%s Zipping]: Failed to close pipe writer after zip close: %v", sourceBucketName, err)
			}
		}()
		paginator := s3.NewListObjectsV2Paginator(sourceS3Client, &s3.ListObjectsV2Input{
			Bucket: aws.String(sourceBucketName),
		})

		log.Printf("INFO [%s Zipping]: Starting to list and process objects", sourceBucketName)
		objectCount := 0
		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				log.Printf("ERROR [%s Zipping]: Failed to get page from source bucket: %v", sourceBucketName, err)
				_ = pipeWriter.CloseWithError(fmt.Errorf("failed to list objects: %w", err))
				return
			}
			for _, s3Object := range page.Contents {
				objectKey := aws.ToString(s3Object.Key)
				// Skip zero-byte keys ending in "/" (console-created "folder" placeholders).
				if strings.HasSuffix(objectKey, "/") && aws.ToInt64(s3Object.Size) == 0 {
					log.Printf("INFO [%s Zipping]: Skipping directory-like object: %s", sourceBucketName, objectKey)
					continue
				}
				log.Printf("INFO [%s Zipping]: Processing object: %s (Size: %d bytes)", sourceBucketName, objectKey, aws.ToInt64(s3Object.Size))

				zipEntryWriter, err := zipArchive.Create(objectKey)
				if err != nil {
					log.Printf("ERROR [%s Zipping]: Failed to create zip entry for '%s': %v", sourceBucketName, objectKey, err)
					_ = pipeWriter.CloseWithError(fmt.Errorf("failed to create zip entry for '%s': %w", objectKey, err))
					return
				}

				getObjectOutput, err := sourceS3Client.GetObject(ctx, &s3.GetObjectInput{
					Bucket: aws.String(sourceBucketName),
					Key:    s3Object.Key,
				})
				if err != nil {
					// On error the SDK returns a nil output, so there is no body to close here.
					log.Printf("ERROR [%s Zipping]: Failed to download object '%s': %v", sourceBucketName, objectKey, err)
					_ = pipeWriter.CloseWithError(fmt.Errorf("failed to download '%s': %w", objectKey, err))
					return
				}

				bytesCopied, err := io.Copy(zipEntryWriter, getObjectOutput.Body)
				getObjectOutput.Body.Close()
				if err != nil {
					log.Printf("ERROR [%s Zipping]: Failed to copy content of object '%s' to zip: %v", sourceBucketName, objectKey, err)
					_ = pipeWriter.CloseWithError(fmt.Errorf("failed to copy '%s' to zip: %w", objectKey, err))
					return
				}
				log.Printf("INFO [%s Zipping]: Successfully added '%s' to zip stream (%d bytes written).", sourceBucketName, objectKey, bytesCopied)
				objectCount++
			}
		}
		if objectCount == 0 {
			log.Printf("WARN [%s Zipping]: No objects found or processed. The resulting zip file might be empty.", sourceBucketName)
		} else {
			log.Printf("INFO [%s Zipping]: Finished processing %d objects. Finalizing zip stream.", sourceBucketName, objectCount)
		}
	}()
	// --- Upload the zip to the destination bucket (streaming) ---
	log.Printf("INFO [%s]: Starting upload of '%s' to bucket '%s'", sourceBucketName, destinationObjectKey, destBucketName)
	uploadInput := &s3.PutObjectInput{
		Bucket:       aws.String(destBucketName),
		Key:          aws.String(destinationObjectKey),
		Body:         pipeReader, // Read from the pipe that the zipping goroutine writes to
		StorageClass: types.StorageClassGlacier, // Archived objects must be restored before they can be downloaded again
		ACL:          types.ObjectCannedACLBucketOwnerFullControl,
	}
	_, uploadErr := destS3Uploader.Upload(ctx, uploadInput)

	// If the upload failed mid-stream, the zipping goroutine may still be
	// blocked writing into the pipe; close the reader so it unblocks and
	// zipProcessingWg.Wait() cannot deadlock.
	if uploadErr != nil {
		_ = pipeReader.CloseWithError(uploadErr)
	}

	// Wait for the zipping goroutine to complete its deferred cleanup,
	// especially closing pipeWriter, which might be the source of uploadErr.
	zipProcessingWg.Wait()

	if uploadErr != nil {
		log.Printf("ERROR [%s]: Failed to upload zip archive '%s': %v", sourceBucketName, destinationObjectKey, uploadErr)
		return
	}
	log.Printf("SUCCESS [%s]: Successfully uploaded '%s' to bucket '%s'", sourceBucketName, destinationObjectKey, destBucketName)
}
func main() {
	// --- CLI parameters ---
	sourceBucketsStr := flag.String("source-buckets", "", "Comma-separated list of S3 bucket names to read files from (required)")
	sourceRegionsStr := flag.String("source-regions", "", "Comma-separated list of AWS regions for the source S3 buckets, in corresponding order (required)")
	destBucketNameFlag := flag.String("dest-bucket", "", "S3 bucket name to write the zip files to (required)")
	destRegionFlag := flag.String("dest-region", "", "AWS region for the destination S3 bucket (required, e.g., us-west-2)")
	flag.Parse()

	if *sourceBucketsStr == "" || *sourceRegionsStr == "" || *destBucketNameFlag == "" || *destRegionFlag == "" {
		fmt.Println("Error: Missing required flags.")
		flag.PrintDefaults()
		os.Exit(1)
	}
	sourceBucketNames := strings.Split(*sourceBucketsStr, ",")
	sourceRegionNames := strings.Split(*sourceRegionsStr, ",")
	// strings.Split on a non-empty string always yields at least one element,
	// so only the length mismatch needs checking here.
	if len(sourceBucketNames) != len(sourceRegionNames) {
		fmt.Println("Error: The number of source buckets must match the number of source regions.")
		flag.PrintDefaults()
		os.Exit(1)
	}
log.Printf("INFO: Destination Bucket: %s (Region: %s)", *destBucketNameFlag, *destRegionFlag) | |
log.Printf("INFO: AWS SDK will use default credential provider chain.") | |
ctx := context.Background() | |
// --- Initialize S3 Uploader for the Destination Bucket (once) --- | |
destCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(*destRegionFlag)) | |
if err != nil { | |
log.Fatalf("FATAL: Failed to load AWS config for destination (region: %s): %v", *destRegionFlag, err) | |
} | |
destS3Uploader := manager.NewUploader(s3.NewFromConfig(destCfg, func(o *s3.Options) { | |
// Disable the annoying "WARN Response has no supported checksum. Not validating response payload" | |
o.DisableLogOutputChecksumValidationSkipped = true | |
})) | |
	// --- Process each source bucket in parallel ---
	var wg sync.WaitGroup
	for i, sourceBucketName := range sourceBucketNames {
		sourceBucketName = strings.TrimSpace(sourceBucketName)
		sourceRegionName := strings.TrimSpace(sourceRegionNames[i])
		if sourceBucketName == "" || sourceRegionName == "" {
			log.Printf("WARN: Empty bucket name or region found at index %d, skipping.", i)
			continue
		}
		wg.Add(1)
		go processSourceBucket(
			ctx,
			sourceBucketName,
			sourceRegionName,
			*destBucketNameFlag,
			destS3Uploader,
			&wg,
		)
	}

	log.Printf("INFO: Waiting for all %d source bucket processing routines to complete...", len(sourceBucketNames))
	wg.Wait()
	log.Println("INFO: All source bucket processing finished.")
}
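The core pattern above — a goroutine writing a zip archive into one end of an io.Pipe while the S3 uploader drains the other end — can be exercised without AWS at all. A minimal standalone sketch (the entry name and payload are made up for illustration):

package main

import (
	"archive/zip"
	"fmt"
	"io"
	"log"
	"strings"
)

func main() {
	pr, pw := io.Pipe()

	go func() {
		zw := zip.NewWriter(pw)
		// Defers run LIFO: zw.Close() flushes the zip central directory
		// first, then pw.Close() signals EOF to the reader side.
		defer pw.Close()
		defer zw.Close()

		w, err := zw.Create("hello.txt")
		if err != nil {
			_ = pw.CloseWithError(err)
			return
		}
		if _, err := io.Copy(w, strings.NewReader("hello, zip stream")); err != nil {
			_ = pw.CloseWithError(err)
			return
		}
	}()

	// Stand-in for the S3 uploader: drain the pipe and count the bytes.
	n, err := io.Copy(io.Discard, pr)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("streamed %d zip bytes\n", n)
}

Because the zip bytes are produced and consumed concurrently, memory stays bounded no matter how large the archive grows — the full zip never exists in memory or on disk.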