Skip to content

Instantly share code, notes, and snippets.

@siviae
Created February 1, 2025 19:13
Show Gist options
  • Save siviae/256f9b186407ff7a9d0f5564d5415737 to your computer and use it in GitHub Desktop.
Save siviae/256f9b186407ff7a9d0f5564d5415737 to your computer and use it in GitHub Desktop.
PebbleDB -> S3 continuous backup
package main
import (
"archive/tar"
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/cockroachdb/pebble/v2"
"github.com/klauspost/compress/gzip"
"github.com/pkg/errors"
"go.uber.org/zap"
"io"
"math/rand"
"os"
"path/filepath"
"strconv"
"time"
)
type impl struct {
s3 *s3.Client
bucketName string
db *pebble.DB
}
// RestoreFromBackup downloads the archive stored under the key backupFileName,
// extracts it into localDirectory and opens the result with pebbleOptions.
// On success the opened database is returned and also becomes the database
// that later Backup calls checkpoint.
//
// NOTE(review): any database previously held in s.db is overwritten without
// being closed — confirm that callers close it themselves.
func (s *impl) RestoreFromBackup(
	ctx context.Context,
	backupFileName string,
	localDirectory string,
	pebbleOptions *pebble.Options,
) (*pebble.DB, error) {
	start := time.Now()
	zap.L().Info("restoring from backup: " + backupFileName)

	outputFileName, err := s.downloadFile(ctx, backupFileName)
	if err != nil {
		return nil, errors.Wrap(err, "unable to download file from s3")
	}
	defer func() {
		_ = os.RemoveAll(outputFileName)
		// downloadFile places the archive in a temp directory created just
		// for it. os.Remove only deletes an *empty* directory, so this also
		// cleans that directory up without any risk of removing unrelated
		// files if the archive ended up somewhere else.
		_ = os.Remove(filepath.Dir(outputFileName))
	}()

	if err := untarDirectory(outputFileName, localDirectory); err != nil {
		return nil, errors.Wrap(err, "unable to untar backup to local directory")
	}

	db, err := pebble.Open(localDirectory, pebbleOptions)
	if err != nil {
		return nil, errors.Wrap(err, "unable to open restored backup")
	}
	s.db = db

	// Log completion only once the restore has actually succeeded.
	zap.L().Info("restored from backup",
		zap.Duration("duration", time.Since(start)),
	)
	return db, nil
}
// downloadFile fetches the object backupFileName from the bucket into a newly
// created temp directory and returns the local file's path. The local file is
// named after the key's last path element, so keys containing "/" still map
// to a file directly inside that directory. On failure the temp directory is
// removed and "" is returned.
func (s *impl) downloadFile(ctx context.Context, backupFileName string) (string, error) {
	object, err := s.s3.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucketName),
		Key:    aws.String(backupFileName),
	})
	if err != nil {
		return "", errors.Wrap(err, "unable to get object")
	}
	// The response body must be closed to release the underlying connection.
	defer object.Body.Close()

	tmpDir := getTempDir()
	resultFileName := filepath.Join(tmpDir, filepath.Base(backupFileName))

	file, err := os.Create(resultFileName)
	if err != nil {
		_ = os.RemoveAll(tmpDir)
		return "", errors.Wrap(err, "unable to create output file")
	}
	// Copy the object content to the file.
	if _, err := io.Copy(file, object.Body); err != nil {
		_ = file.Close()
		_ = os.RemoveAll(tmpDir)
		return "", errors.Wrap(err, "unable to copy file")
	}
	// Close explicitly: a failed close can mean the data never reached disk.
	if err := file.Close(); err != nil {
		_ = os.RemoveAll(tmpDir)
		return "", errors.Wrap(err, "unable to close output file")
	}
	return resultFileName, nil
}
// RestoreFromLatestBackup lists every object in the bucket, picks the key that
// sorts last and restores it via RestoreFromBackup. Backup names produced by
// doBackup are zero-padded timestamps, so the last key is the newest backup.
//
// NOTE(review): every object in the bucket is a candidate, not only *.tar.gz
// archives; this assumes the bucket contains nothing but backups — confirm.
func (s *impl) RestoreFromLatestBackup(
	ctx context.Context,
	localDirectory string,
	pebbleOptions *pebble.Options,
) (*pebble.DB, error) {
	var latestKey string

	// The paginator follows continuation tokens until the listing is exhausted.
	pages := s3.NewListObjectsV2Paginator(s.s3, &s3.ListObjectsV2Input{
		Bucket:  aws.String(s.bucketName),
		MaxKeys: aws.Int32(100),
	})
	for pages.HasMorePages() {
		page, err := pages.NextPage(ctx)
		if err != nil {
			return nil, errors.Wrap(err, "unable to list objects")
		}
		for _, obj := range page.Contents {
			// aws.ToString maps a nil key to "", which never beats latestKey.
			if key := aws.ToString(obj.Key); key > latestKey {
				latestKey = key
			}
		}
	}

	if latestKey == "" {
		// Use %s instead of concatenation so the bucket name is never
		// interpreted as a format string.
		return nil, fmt.Errorf("no objects found in the bucket %s", s.bucketName)
	}
	return s.RestoreFromBackup(ctx, latestKey, localDirectory, pebbleOptions)
}
// Backup checkpoints the current database, packs the checkpoint into a
// timestamped .tar.gz archive and uploads it to the configured bucket.
func (s *impl) Backup(ctx context.Context) error {
	err := s.doBackup(ctx)
	return err
}
// doBackup checkpoints s.db (with the WAL flushed) into a fresh temp
// directory, archives the checkpoint as "YYYY-MM-DD-HH-MM-SS.tar.gz" (local
// wall-clock time) and uploads it to the bucket under that name. The temp
// directory is removed before returning.
//
// NOTE(review): local time can repeat or jump backwards around DST changes,
// which breaks the "greatest key is newest" assumption used by
// RestoreFromLatestBackup. Two backups in the same second also share a key,
// and the later one overwrites the earlier. Consider UTC, but mind the
// ordering against existing local-time keys when migrating.
func (s *impl) doBackup(ctx context.Context) error {
	start := time.Now()
	// Zero-padded fields keep lexicographic order equal to chronological order.
	backupFileName := start.Format("2006-01-02-15-04-05") + ".tar.gz"
	zap.L().Info("performing backup: " + backupFileName)

	tmpDir := getTempDir()
	defer func() {
		_ = os.RemoveAll(tmpDir)
	}()

	checkpointDir := filepath.Join(tmpDir, "checkpoint")
	if err := s.db.Checkpoint(checkpointDir, pebble.WithFlushedWAL()); err != nil {
		return errors.Wrap(err, "unable to checkpoint")
	}

	destFileName := filepath.Join(tmpDir, backupFileName)
	if err := tarDirectory(checkpointDir, destFileName); err != nil {
		return errors.Wrap(err, "unable to tar gz dir")
	}

	destFile, err := os.Open(destFileName)
	if err != nil {
		return errors.Wrap(err, "unable to open resulting tar archive")
	}
	// Deferred after the RemoveAll above, so it runs first: the file is closed
	// before its directory is deleted.
	defer destFile.Close()

	_, err = s.s3.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s.bucketName),
		Key:    aws.String(backupFileName),
		Body:   destFile,
	})
	if err != nil {
		return errors.Wrap(err, "unable to save backup file")
	}
	zap.L().Info("backup finished",
		zap.Duration("duration", time.Since(start)),
	)
	return nil
}
// getTempDir creates a new, uniquely named directory directly inside
// os.TempDir() and returns its path. The caller owns the directory and is
// responsible for removing it. It panics if the directory cannot be created.
func getTempDir() string {
	for {
		// os.TempDir() usually has no trailing separator, so the name must be
		// joined rather than concatenated onto it.
		dir := filepath.Join(os.TempDir(), "pebble-backup-"+strconv.Itoa(rand.Int()))
		// os.Mkdir (unlike MkdirAll) fails when dir already exists, so a
		// random-name collision retries instead of silently sharing another
		// caller's directory. 0o700 keeps database contents private.
		err := os.Mkdir(dir, 0o700)
		switch {
		case err == nil:
			return dir
		case !os.IsExist(err):
			panic(fmt.Sprintf("unable to create temp dir: %v", err))
		}
	}
}
// PebbleBackup uploads backups of a Pebble database to S3 and restores
// databases from those backups.
type PebbleBackup interface {
	// Backup checkpoints the database and uploads the checkpoint as a
	// timestamped .tar.gz archive.
	Backup(ctx context.Context) error

	// RestoreFromBackup downloads the archive stored under backupFileName,
	// extracts it into localDirectory and opens it with pebbleOptions.
	RestoreFromBackup(ctx context.Context, backupFileName string, localDirectory string, pebbleOptions *pebble.Options) (*pebble.DB, error)

	// RestoreFromLatestBackup behaves like RestoreFromBackup, using the key
	// that sorts last in the bucket.
	RestoreFromLatestBackup(ctx context.Context, localDirectory string, pebbleOptions *pebble.Options) (*pebble.DB, error)
}
// NewPebbleS3Backup returns a PebbleBackup that backs db up to bucket on an
// S3-compatible service, authenticating with a static access-key pair.
//
// endpoint overrides the service URL. When it is empty, no override is set
// and the SDK resolves the endpoint from region. Requests use path-style
// addressing (endpoint/bucket/key).
func NewPebbleS3Backup(
	db *pebble.DB,
	accessKeyID string,
	secretAccessKey string,
	endpoint string,
	region string,
	bucket string,
) PebbleBackup {
	opts := s3.Options{
		Region:       region,
		UsePathStyle: true,
		Credentials: aws.CredentialsProviderFunc(func(context.Context) (aws.Credentials, error) {
			return aws.Credentials{
				AccessKeyID:     accessKeyID,
				SecretAccessKey: secretAccessKey,
			}, nil
		}),
	}
	// A non-nil but empty BaseEndpoint is treated as a custom endpoint and
	// fails endpoint resolution, so only set it when one was provided.
	if endpoint != "" {
		opts.BaseEndpoint = aws.String(endpoint)
	}
	return &impl{
		s3:         s3.New(opts),
		bucketName: bucket,
		db:         db,
	}
}
// tarDirectory writes the contents of srcDir to destFile as a gzip-compressed
// tar archive. Entry names are slash-separated paths relative to srcDir, and
// srcDir itself is stored as "./", so extracting the archive into a directory
// recreates srcDir's contents there. Any error — including failures while
// flushing the archive or closing destFile — is returned.
func tarDirectory(srcDir string, destFile string) error {
	// Create the destination file.
	out, err := os.Create(destFile)
	if err != nil {
		return err
	}
	if err := writeTarGz(out, srcDir); err != nil {
		_ = out.Close()
		return err
	}
	// Close explicitly: a failed close can mean the archive is incomplete.
	return out.Close()
}

// writeTarGz streams srcDir as a gzip-compressed tar archive into w. The tar
// and gzip writers are closed explicitly and their errors returned, because
// closing them writes the tar footer and gzip trailer.
func writeTarGz(w io.Writer, srcDir string) error {
	gzipWriter := gzip.NewWriter(w)
	tarWriter := tar.NewWriter(gzipWriter)
	walkErr := filepath.Walk(srcDir, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		return addTarEntry(tarWriter, srcDir, path, fi)
	})
	if walkErr != nil {
		return walkErr
	}
	if err := tarWriter.Close(); err != nil {
		return err
	}
	return gzipWriter.Close()
}

// addTarEntry writes the header for path, plus its contents if it is a regular
// file, to tw under a name relative to srcDir.
func addTarEntry(tw *tar.Writer, srcDir, path string, fi os.FileInfo) error {
	rel, err := filepath.Rel(srcDir, path)
	if err != nil {
		return err
	}
	// FileInfoHeader's second argument is the symlink target, not the entry
	// name; only symlinks need one.
	link := ""
	if fi.Mode()&os.ModeSymlink != 0 {
		if link, err = os.Readlink(path); err != nil {
			return err
		}
	}
	header, err := tar.FileInfoHeader(fi, link)
	if err != nil {
		return err
	}
	// FileInfoHeader names the entry fi.Name() (base name only), which would
	// flatten nested directories; store the path relative to srcDir instead.
	// The root itself becomes "./".
	header.Name = filepath.ToSlash(rel)
	if fi.IsDir() {
		header.Name += "/"
	}
	if err := tw.WriteHeader(header); err != nil {
		return err
	}
	// Only regular files carry content; directories and symlinks are header-only.
	if !fi.Mode().IsRegular() {
		return nil
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = io.Copy(tw, f)
	return err
}
// untarDirectory extracts the gzip-compressed tar archive srcFile into destDir.
// Directory and regular-file entries are recreated, and missing parent
// directories are created as needed. Every other entry type is skipped.
// Entries whose names would resolve outside destDir (absolute paths or ".."
// components) are rejected with an error, since the archive comes from
// remote storage.
func untarDirectory(srcFile, destDir string) error {
	// Open the tar.gz file.
	file, err := os.Open(srcFile)
	if err != nil {
		return err
	}
	defer file.Close()

	gzipReader, err := gzip.NewReader(file)
	if err != nil {
		return err
	}
	defer gzipReader.Close()

	tarReader := tar.NewReader(gzipReader)
	for {
		header, err := tarReader.Next()
		if err == io.EOF {
			return nil // End of archive.
		}
		if err != nil {
			return err
		}

		// Guard against path traversal ("zip slip").
		if !filepath.IsLocal(header.Name) {
			return errors.Errorf("refusing to extract %q outside of %s", header.Name, destDir)
		}
		target := filepath.Join(destDir, header.Name)

		switch header.Typeflag {
		case tar.TypeDir:
			// MkdirAll is a no-op for directories that already exist.
			if err := os.MkdirAll(target, os.FileMode(header.Mode).Perm()); err != nil {
				return err
			}
		case tar.TypeReg:
			if err := extractTarFile(target, tarReader); err != nil {
				return err
			}
		}
	}
}

// extractTarFile copies r into a newly created file at target, creating
// target's parent directories if needed. The file is closed before returning,
// so only one descriptor is open at a time and close errors are reported
// rather than dropped.
func extractTarFile(target string, r io.Reader) error {
	if err := os.MkdirAll(filepath.Dir(target), 0o755); err != nil {
		return err
	}
	f, err := os.Create(target)
	if err != nil {
		return err
	}
	if _, err := io.Copy(f, r); err != nil {
		_ = f.Close()
		return err
	}
	return f.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment