Created
February 1, 2025 19:13
-
-
Save siviae/256f9b186407ff7a9d0f5564d5415737 to your computer and use it in GitHub Desktop.
PebbleDB -> S3 continuous backup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"archive/tar" | |
"context" | |
"fmt" | |
"github.com/aws/aws-sdk-go-v2/aws" | |
"github.com/aws/aws-sdk-go-v2/service/s3" | |
"github.com/cockroachdb/pebble/v2" | |
"github.com/klauspost/compress/gzip" | |
"github.com/pkg/errors" | |
"go.uber.org/zap" | |
"io" | |
"math/rand" | |
"os" | |
"path/filepath" | |
"strconv" | |
"time" | |
) | |
type impl struct { | |
s3 *s3.Client | |
bucketName string | |
db *pebble.DB | |
} | |
// RestoreFromBackup downloads the object backupFileName from the bucket,
// extracts the archive into localDirectory and opens the result as a Pebble
// database with pebbleOptions.
//
// On success the opened database is returned and also becomes the database
// used by subsequent backups; the previously held database (if any) is not
// closed here. The downloaded archive and its temporary directory are removed
// before returning, whether or not the restore succeeded.
func (s *impl) RestoreFromBackup(
	ctx context.Context,
	backupFileName string,
	localDirectory string,
	pebbleOptions *pebble.Options,
) (*pebble.DB, error) {
	start := time.Now()
	zap.L().Info("restoring from backup: " + backupFileName)

	outputFileName, err := s.downloadFile(ctx, backupFileName)
	if err != nil {
		return nil, errors.Wrap(err, "unable to download file from s3")
	}
	defer func() {
		_ = os.RemoveAll(outputFileName)
		// Also drop the temporary directory the archive was downloaded into.
		// os.Remove refuses to delete non-empty directories, so this cannot
		// wipe out unrelated content.
		_ = os.Remove(filepath.Dir(outputFileName))
	}()

	if err := untarDirectory(outputFileName, localDirectory); err != nil {
		return nil, errors.Wrap(err, "unable to untar backup to local directory")
	}

	db, err := pebble.Open(localDirectory, pebbleOptions)
	if err != nil {
		return nil, errors.Wrap(err, "unable to open restored backup")
	}
	s.db = db

	// Logged only once the restored database has actually been opened.
	zap.L().Info("restored from backup",
		zap.Duration("duration", time.Since(start)),
	)
	return db, nil
}
// downloadFile streams the object backupFileName from the bucket into a file
// inside a freshly created temporary directory and returns the file's path.
//
// Only the last path element of the key is used as the local file name, so
// keys containing "/" (S3 "folders") still work and cannot point outside the
// temporary directory. The caller owns the returned file and its directory.
// On failure the temporary directory is removed and an error is returned.
func (s *impl) downloadFile(ctx context.Context, backupFileName string) (_ string, err error) {
	object, err := s.s3.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucketName),
		Key:    aws.String(backupFileName),
	})
	if err != nil {
		return "", errors.Wrap(err, "unable to get object")
	}
	// The response body must be closed to release the underlying connection.
	defer object.Body.Close()

	tmpDir := getTempDir()
	defer func() {
		// Do not leave partial downloads behind on any error path.
		if err != nil {
			_ = os.RemoveAll(tmpDir)
		}
	}()

	resultFileName := filepath.Join(tmpDir, filepath.Base(backupFileName))
	file, err := os.Create(resultFileName)
	if err != nil {
		return "", errors.Wrap(err, "unable to create output file")
	}
	if _, err := io.Copy(file, object.Body); err != nil {
		_ = file.Close()
		return "", errors.Wrap(err, "unable to copy file")
	}
	// A failed Close can mean buffered data never reached disk.
	if err := file.Close(); err != nil {
		return "", errors.Wrap(err, "unable to close output file")
	}
	return resultFileName, nil
}
// RestoreFromLatestBackup lists every object in the bucket, picks the one with
// the lexicographically greatest key and restores it via RestoreFromBackup.
//
// Backup names are timestamps written most-significant field first, so the
// greatest key is the newest backup — provided the bucket only contains
// backups (NOTE(review): other objects in the bucket are not filtered out).
// An error is returned when the bucket is empty.
func (s *impl) RestoreFromLatestBackup(
	ctx context.Context,
	localDirectory string,
	pebbleOptions *pebble.Options,
) (*pebble.DB, error) {
	var latestKey string
	var continuationToken *string
	for {
		result, err := s.s3.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
			Bucket:            aws.String(s.bucketName),
			MaxKeys:           aws.Int32(100),
			ContinuationToken: continuationToken,
		})
		if err != nil {
			return nil, errors.Wrap(err, "unable to list objects")
		}
		for _, obj := range result.Contents {
			// Any non-empty key compares greater than the initial "".
			if obj.Key != nil && *obj.Key > latestKey {
				latestKey = *obj.Key
			}
		}
		// Stop on the last page; the truncation flag (not page size) decides
		// whether more pages exist.
		if result.IsTruncated == nil || !*result.IsTruncated {
			break
		}
		continuationToken = result.NextContinuationToken
	}
	if latestKey == "" {
		// Constant format string so a '%' in the bucket name is not
		// interpreted as a formatting verb.
		return nil, fmt.Errorf("no objects found in the bucket %s", s.bucketName)
	}
	return s.RestoreFromBackup(ctx, latestKey, localDirectory, pebbleOptions)
}
// Backup checkpoints the database held by this instance, packs the checkpoint
// into a timestamped .tar.gz archive and uploads it to the bucket.
func (s *impl) Backup(ctx context.Context) error {
	if err := s.doBackup(ctx); err != nil {
		return err
	}
	return nil
}
// doBackup performs one backup:
//   1. it takes a Pebble checkpoint (with the WAL flushed) into a temporary directory;
//   2. it archives the checkpoint as "YYYY-MM-DD-hh-mm-ss.tar.gz";
//   3. it uploads the archive to the bucket under that name.
//
// Timestamps are in UTC so names keep sorting chronologically even across
// daylight-saving changes, which RestoreFromLatestBackup relies on when it
// picks the greatest key. Two backups started within the same second share
// a name, so the later upload replaces the earlier one. All temporary files
// are removed before returning.
func (s *impl) doBackup(ctx context.Context) error {
	if s.db == nil {
		return errors.New("no database to back up")
	}

	start := time.Now()
	backupFileName := start.UTC().Format("2006-01-02-15-04-05") + ".tar.gz"
	zap.L().Info("performing backup: " + backupFileName)

	tmpDir := getTempDir()
	defer func() {
		_ = os.RemoveAll(tmpDir)
	}()

	checkpointDir := filepath.Join(tmpDir, "checkpoint")
	if err := s.db.Checkpoint(checkpointDir, pebble.WithFlushedWAL()); err != nil {
		return errors.Wrap(err, "unable to checkpoint")
	}

	destFileName := filepath.Join(tmpDir, backupFileName)
	if err := tarDirectory(checkpointDir, destFileName); err != nil {
		return errors.Wrap(err, "unable to tar gz dir")
	}

	destFile, err := os.Open(destFileName)
	if err != nil {
		return errors.Wrap(err, "unable to open resulting tar archive")
	}
	// Deferred after the RemoveAll above, so it runs first and the file is
	// closed before its directory is deleted.
	defer destFile.Close()

	if _, err := s.s3.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s.bucketName),
		Key:    aws.String(backupFileName),
		Body:   destFile,
	}); err != nil {
		return errors.Wrap(err, "unable to save backup file")
	}

	zap.L().Info("backup finished",
		zap.Duration("duration", time.Since(start)),
	)
	return nil
}
// getTempDir creates a new, empty directory directly under os.TempDir() and
// returns its path.
//
// The directory is created with owner-only permissions. If a randomly chosen
// name is already taken, another one is tried, so two callers never share a
// directory. The caller is responsible for removing it. getTempDir panics if
// the directory cannot be created for any reason other than a name collision.
func getTempDir() string {
	for {
		tmpDir := filepath.Join(os.TempDir(), "pebble-backup-"+strconv.Itoa(rand.Int()))
		// os.Mkdir, unlike os.MkdirAll, fails if the directory already exists,
		// which is how a collision with another caller is detected.
		err := os.Mkdir(tmpDir, 0o700)
		if err == nil {
			return tmpDir
		}
		if !os.IsExist(err) {
			panic(fmt.Sprintf("unable to create temp dir: %v", err))
		}
	}
}
type PebbleBackup interface { | |
Backup(ctx context.Context) error | |
RestoreFromLatestBackup( | |
ctx context.Context, | |
localDirectory string, | |
pebbleOptions *pebble.Options, | |
) (*pebble.DB, error) | |
RestoreFromBackup( | |
ctx context.Context, | |
backupFileName string, | |
localDirectory string, | |
pebbleOptions *pebble.Options, | |
) (*pebble.DB, error) | |
} | |
// NewPebbleS3Backup returns a PebbleBackup that stores backups of db in the
// given S3 bucket.
//
// Requests are authenticated with the static accessKeyId/secretAccessKey pair,
// signed for region and sent with path-style addressing. endpoint overrides
// the service endpoint (e.g. for an S3-compatible store); when it is empty,
// the SDK's default endpoint resolution for region is used instead of an
// invalid empty custom endpoint.
func NewPebbleS3Backup(
	db *pebble.DB,
	accessKeyId string,
	secretAccessKey string,
	endpoint string,
	region string,
	bucket string,
) PebbleBackup {
	client := s3.New(s3.Options{}, func(o *s3.Options) {
		if endpoint != "" {
			o.BaseEndpoint = aws.String(endpoint)
		}
		o.Region = region
		o.UsePathStyle = true
		o.Credentials = aws.CredentialsProviderFunc(func(context.Context) (aws.Credentials, error) {
			return aws.Credentials{
				AccessKeyID:     accessKeyId,
				SecretAccessKey: secretAccessKey,
			}, nil
		})
	})
	return &impl{
		s3:         client,
		bucketName: bucket,
		db:         db,
	}
}
// tarDirectory writes every directory and regular file under srcDir into a new
// gzip-compressed tar archive at destFile.
//
// Entry names are slash-separated paths relative to srcDir, and the root
// itself is stored as "./", so the directory layout survives extraction. Any
// other file type (symlink, FIFO, socket, ...) aborts with an error. Errors
// from closing the tar writer, gzip writer and output file are returned,
// because those closes write the archive trailer and flush buffered data.
//
// destFile should live outside srcDir; otherwise the walk would also visit the
// archive being written.
func tarDirectory(srcDir string, destFile string) (err error) {
	out, err := os.Create(destFile)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := out.Close(); err == nil {
			err = closeErr
		}
	}()

	gzipWriter := gzip.NewWriter(out)
	tarWriter := tar.NewWriter(gzipWriter)

	walkErr := filepath.Walk(srcDir, func(path string, fi os.FileInfo, visitErr error) error {
		if visitErr != nil {
			return visitErr
		}
		if !fi.IsDir() && !fi.Mode().IsRegular() {
			return fmt.Errorf("unsupported file type %v: %s", fi.Mode().Type(), path)
		}
		rel, err := filepath.Rel(srcDir, path)
		if err != nil {
			return err
		}
		// FileInfoHeader's second argument is a symlink target, not the entry
		// name; the name is set explicitly below.
		header, err := tar.FileInfoHeader(fi, "")
		if err != nil {
			return err
		}
		header.Name = filepath.ToSlash(rel)
		if fi.IsDir() {
			header.Name += "/"
			return tarWriter.WriteHeader(header)
		}
		if err := tarWriter.WriteHeader(header); err != nil {
			return err
		}
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		// Runs when this callback returns, i.e. once per file.
		defer f.Close()
		_, err = io.Copy(tarWriter, f)
		return err
	})
	if walkErr != nil {
		return walkErr
	}

	// The tar trailer must be written into the gzip stream before the gzip
	// footer, hence this order.
	if err := tarWriter.Close(); err != nil {
		return err
	}
	return gzipWriter.Close()
}
// untarDirectory extracts the gzip-compressed tar archive srcFile into destDir.
//
// destDir and any missing parent directories of extracted entries are created.
// Directory and regular-file entries are extracted; other entry types are
// skipped. Entries whose names would resolve outside destDir (absolute paths
// or ".." components) are rejected with an error, because archives can come
// from remote storage. Each file is closed as soon as it has been written, so
// large archives do not accumulate open descriptors.
func untarDirectory(srcFile, destDir string) error {
	file, err := os.Open(srcFile)
	if err != nil {
		return err
	}
	defer file.Close()

	gzipReader, err := gzip.NewReader(file)
	if err != nil {
		return err
	}
	defer gzipReader.Close()

	// Create the destination up front so archives without a root directory
	// entry still extract.
	if err := os.MkdirAll(destDir, 0o755); err != nil {
		return err
	}

	tarReader := tar.NewReader(gzipReader)
	for {
		header, err := tarReader.Next()
		if err == io.EOF {
			return nil // end of archive
		}
		if err != nil {
			return err
		}

		if !filepath.IsLocal(header.Name) {
			return fmt.Errorf("refusing to extract %q: path escapes destination directory", header.Name)
		}
		target := filepath.Join(destDir, header.Name)

		switch header.Typeflag {
		case tar.TypeDir:
			// MkdirAll is a no-op for directories that already exist.
			if err := os.MkdirAll(target, os.FileMode(header.Mode).Perm()); err != nil {
				return err
			}
		case tar.TypeReg:
			if err := untarRegularFile(target, tarReader); err != nil {
				return err
			}
		}
	}
}

// untarRegularFile writes everything readable from r (the current tar entry)
// to target, creating missing parent directories, and closes the file before
// returning, reporting any close error.
func untarRegularFile(target string, r io.Reader) (err error) {
	if err := os.MkdirAll(filepath.Dir(target), 0o755); err != nil {
		return err
	}
	f, err := os.Create(target)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := f.Close(); err == nil {
			err = closeErr
		}
	}()
	_, err = io.Copy(f, r)
	return err
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment