Back up multiple S3 buckets into zip files for archiving (mostly AI-generated)
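The program streams each source bucket through an in-memory zip writer straight into a multipart upload, so nothing is staged on local disk; each source bucket lands as s3/<bucket-name>.zip in the destination bucket, stored in the GLACIER storage class. A typical invocation looks like this (the bucket and region names are placeholders):

go run main.go \
	-source-buckets photos-prod,logs-prod \
	-source-regions eu-west-1,us-east-1 \
	-dest-bucket my-archive-bucket \
	-dest-region eu-west-1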
package main

import (
	"archive/zip"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
	"sync"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// processSourceBucket handles zipping and uploading for a single source bucket.
func processSourceBucket(
	ctx context.Context,
	sourceBucketName string,
	sourceRegion string,
	destBucketName string,
	destS3Uploader *manager.Uploader, // Shared uploader for the destination
	wg *sync.WaitGroup,
) {
	defer wg.Done()
	log.Printf("INFO [%s]: Starting processing for source bucket in region %s", sourceBucketName, sourceRegion)

	// --- Initialize an S3 client for this specific source bucket's region ---
	sourceCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(sourceRegion))
	if err != nil {
		log.Printf("ERROR [%s]: Failed to load AWS config for source region %s: %v", sourceBucketName, sourceRegion, err)
		return
	}
	sourceS3Client := s3.NewFromConfig(sourceCfg)

	// --- Determine the destination object key for this bucket ---
	destinationObjectKey := "s3/" + sourceBucketName + ".zip"
	log.Printf("INFO [%s]: Destination object key will be: %s in bucket %s", sourceBucketName, destinationObjectKey, destBucketName)
	// --- Create a pipe for streaming this bucket's zip data ---
	pipeReader, pipeWriter := io.Pipe()
	zipArchive := zip.NewWriter(pipeWriter)

	var zipProcessingWg sync.WaitGroup
	zipProcessingWg.Add(1)

	// Goroutine for zipping this specific source bucket's content
	go func() {
		defer zipProcessingWg.Done()
		// Critical: close zipArchive first, then pipeWriter.
		// pipeWriter.Close() signals EOF to the uploader;
		// pipeWriter.CloseWithError() signals an error.
		defer func() {
			if r := recover(); r != nil {
				log.Printf("PANIC [%s Zipping]: Recovered from panic: %v", sourceBucketName, r)
				_ = pipeWriter.CloseWithError(fmt.Errorf("panic during zipping: %v", r))
				return
			}
			if err := zipArchive.Close(); err != nil {
				log.Printf("ERROR [%s Zipping]: Failed to close zip archive writer: %v", sourceBucketName, err)
				_ = pipeWriter.CloseWithError(fmt.Errorf("zip archive close error: %w", err))
			} else {
				// zipArchive.Close() succeeded, so close pipeWriter to signal EOF.
				if err := pipeWriter.Close(); err != nil {
					log.Printf("ERROR [%s Zipping]: Failed to close pipe writer after zip close: %v", sourceBucketName, err)
					// If this fails, the uploader might hang or error out; the uploader's error will be caught.
				}
			}
		}()
		paginator := s3.NewListObjectsV2Paginator(sourceS3Client, &s3.ListObjectsV2Input{
			Bucket: aws.String(sourceBucketName),
		})

		log.Printf("INFO [%s Zipping]: Starting to list and process objects", sourceBucketName)
		objectCount := 0
		for paginator.HasMorePages() {
			page, err := paginator.NextPage(ctx)
			if err != nil {
				log.Printf("ERROR [%s Zipping]: Failed to get page from source bucket: %v", sourceBucketName, err)
				_ = pipeWriter.CloseWithError(fmt.Errorf("failed to list objects: %w", err))
				return
			}
			for _, s3Object := range page.Contents {
				objectKey := aws.ToString(s3Object.Key)
				if strings.HasSuffix(objectKey, "/") && s3Object.Size != nil && *s3Object.Size == 0 {
					log.Printf("INFO [%s Zipping]: Skipping directory-like object: %s", sourceBucketName, objectKey)
					continue
				}
log.Printf("INFO [%s Zipping]: Processing object: %s (Size: %d bytes)", sourceBucketName, objectKey, s3Object.Size)
				zipEntryWriter, err := zipArchive.Create(objectKey)
				if err != nil {
					log.Printf("ERROR [%s Zipping]: Failed to create zip entry for '%s': %v", sourceBucketName, objectKey, err)
					_ = pipeWriter.CloseWithError(fmt.Errorf("failed to create zip entry for '%s': %w", objectKey, err))
					return
				}

				getObjectInput := &s3.GetObjectInput{
					Bucket: aws.String(sourceBucketName),
					Key:    s3Object.Key,
				}
				getObjectOutput, err := sourceS3Client.GetObject(ctx, getObjectInput)
				if err != nil {
					log.Printf("ERROR [%s Zipping]: Failed to download object '%s': %v", sourceBucketName, objectKey, err)
					_ = pipeWriter.CloseWithError(fmt.Errorf("failed to download '%s': %w", objectKey, err))
					if getObjectOutput != nil && getObjectOutput.Body != nil {
						getObjectOutput.Body.Close()
					}
					return
				}

				bytesCopied, err := io.Copy(zipEntryWriter, getObjectOutput.Body)
				getObjectOutput.Body.Close()
				if err != nil {
					log.Printf("ERROR [%s Zipping]: Failed to copy content of object '%s' to zip: %v", sourceBucketName, objectKey, err)
					_ = pipeWriter.CloseWithError(fmt.Errorf("failed to copy '%s' to zip: %w", objectKey, err))
					return
				}
				log.Printf("INFO [%s Zipping]: Successfully added '%s' to zip stream (%d bytes written).", sourceBucketName, objectKey, bytesCopied)
				objectCount++
			}
		}

		if objectCount == 0 {
			log.Printf("WARN [%s Zipping]: No objects found or processed. The resulting zip file might be empty.", sourceBucketName)
		} else {
			log.Printf("INFO [%s Zipping]: Finished processing %d objects. Finalizing zip stream.", sourceBucketName, objectCount)
		}
	}()
	// --- Upload the zip to the destination bucket (streaming) ---
	log.Printf("INFO [%s]: Starting upload of '%s' to bucket '%s'", sourceBucketName, destinationObjectKey, destBucketName)
	uploadInput := &s3.PutObjectInput{
		Bucket:       aws.String(destBucketName),
		Key:          aws.String(destinationObjectKey),
		Body:         pipeReader, // Read from the pipe that the zipping goroutine writes to
		StorageClass: types.StorageClassGlacier,
		ACL:          types.ObjectCannedACLBucketOwnerFullControl,
	}
	_, uploadErr := destS3Uploader.Upload(ctx, uploadInput)

	// Wait for the zipping goroutine to complete its deferred cleanup,
	// especially closing pipeWriter, which might be the source of uploadErr.
	zipProcessingWg.Wait()

	if uploadErr != nil {
		log.Printf("ERROR [%s]: Failed to upload zip archive '%s': %v", sourceBucketName, destinationObjectKey, uploadErr)
		return
	}
	log.Printf("SUCCESS [%s]: Successfully uploaded '%s' to bucket '%s'", sourceBucketName, destinationObjectKey, destBucketName)
}
func main() {
	// --- CLI parameters ---
	sourceBucketsStr := flag.String("source-buckets", "", "Comma-separated list of S3 bucket names to read files from (required)")
	sourceRegionsStr := flag.String("source-regions", "", "Comma-separated list of AWS regions for the source S3 buckets, in corresponding order (required)")
	destBucketNameFlag := flag.String("dest-bucket", "", "S3 bucket name to write the zip files to (required)")
	destRegionFlag := flag.String("dest-region", "", "AWS region for the destination S3 bucket (required, e.g., us-west-2)")
	flag.Parse()

	if *sourceBucketsStr == "" || *sourceRegionsStr == "" || *destBucketNameFlag == "" || *destRegionFlag == "" {
		fmt.Println("Error: Missing required flags.")
		flag.PrintDefaults()
		os.Exit(1)
	}

	sourceBucketNames := strings.Split(*sourceBucketsStr, ",")
	sourceRegionNames := strings.Split(*sourceRegionsStr, ",")
	if len(sourceBucketNames) != len(sourceRegionNames) {
		fmt.Println("Error: The number of source buckets must match the number of source regions.")
		flag.PrintDefaults()
		os.Exit(1)
	}

	log.Printf("INFO: Destination bucket: %s (Region: %s)", *destBucketNameFlag, *destRegionFlag)
	log.Printf("INFO: The AWS SDK will use the default credential provider chain.")

	ctx := context.Background()

	// --- Initialize the S3 uploader for the destination bucket (once) ---
	destCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(*destRegionFlag))
	if err != nil {
		log.Fatalf("FATAL: Failed to load AWS config for destination (region: %s): %v", *destRegionFlag, err)
	}
	destS3Uploader := manager.NewUploader(s3.NewFromConfig(destCfg, func(o *s3.Options) {
		// Silence the noisy "WARN Response has no supported checksum. Not validating response payload" message.
		o.DisableLogOutputChecksumValidationSkipped = true
	}))

	// --- Process each source bucket in parallel ---
	var wg sync.WaitGroup
	for i, sourceBucketName := range sourceBucketNames {
		sourceBucketName = strings.TrimSpace(sourceBucketName)
		sourceRegionName := strings.TrimSpace(sourceRegionNames[i])
		if sourceBucketName == "" || sourceRegionName == "" {
			log.Printf("WARN: Empty bucket name or region found at index %d, skipping.", i)
			continue
		}
		wg.Add(1)
		go processSourceBucket(
			ctx,
			sourceBucketName,
			sourceRegionName,
			*destBucketNameFlag,
			destS3Uploader,
			&wg,
		)
	}

	log.Printf("INFO: Waiting for all %d source bucket processing routines to complete...", len(sourceBucketNames))
	wg.Wait()
	log.Println("INFO: All source bucket processing finished.")
}
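A caveat worth knowing: the pipe hands the uploader a body of unknown length, so manager.Uploader buffers fixed-size parts and sends them as a multipart upload. With the default 5 MiB part size and S3's limit of 10,000 parts, archives larger than roughly 48 GiB will fail. If that matters, the part size can be raised when constructing the uploader; a minimal sketch, reusing destCfg from main above:

destS3Uploader := manager.NewUploader(s3.NewFromConfig(destCfg), func(u *manager.Uploader) {
	// 64 MiB parts lift the per-archive ceiling to roughly 625 GiB,
	// at the cost of more memory buffered per concurrent part.
	u.PartSize = 64 * 1024 * 1024
})

Also note that objects written with the GLACIER storage class must be restored before they can be read back, and the credentials used need s3:ListBucket and s3:GetObject on the source buckets plus s3:PutObject on the destination bucket.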