Skip to content

Instantly share code, notes, and snippets.

@andreyvit
Created November 6, 2024 10:35
Show Gist options
  • Save andreyvit/94257af4df81a77ba6fde5790e6644fa to your computer and use it in GitHub Desktop.
Faster Bolt DB compaction
package main
import (
"flag"
"log"
"os"
"time"
"go.etcd.io/bbolt"
)
// main compacts a bbolt database: it copies every bucket and key/value
// pair from the source file (first arg) into a freshly created
// destination file (second arg), which drops free pages and rewrites
// leaf pages at the configured fill percentage.
func main() {
	log.SetOutput(os.Stderr)
	log.SetFlags(0)

	var txMaxSize int64
	var dstNoSync bool
	var fillPercent float64
	var verbose bool
	// Fix: the usage strings were empty, making -h output useless.
	flag.Int64Var(&txMaxSize, "tx-max-size", 2*1024*1024, "max bytes copied per destination transaction (0 = single transaction)")
	flag.BoolVar(&dstNoSync, "no-sync", true, "skip fsync on the destination db (faster, less durable during the copy)")
	flag.BoolVar(&verbose, "verbose", true, "log progress after each committed transaction")
	flag.Float64Var(&fillPercent, "fill", 1.0, "page fill percentage (0.5 regular, 0.9 compact, 1.0 dense)")
	flag.Parse()
	if flag.NArg() != 2 {
		log.Fatalf("usage: go run _scripts/bolt-compact.go input.db output.db")
	}
	srcPath := flag.Arg(0)
	dstPath := flag.Arg(1)

	srcInfo, err := os.Stat(srcPath)
	if err != nil {
		if os.IsNotExist(err) {
			log.Fatalf("source file does not exist: %s", srcPath)
		}
		log.Fatalf("failed to stat source file: %s", err)
	}
	srcSize := srcInfo.Size()

	src, err := bbolt.Open(srcPath, 0400, &bbolt.Options{ReadOnly: true})
	if err != nil {
		log.Fatalf("failed to open source db: %s", err)
	}
	defer src.Close()

	dst, err := bbolt.Open(dstPath, 0644, &bbolt.Options{NoSync: dstNoSync})
	if err != nil {
		log.Fatalf("failed to open destination db: %s", err)
	}
	defer dst.Close()

	start := time.Now()
	c := compacter{
		start:       start,
		txMaxSize:   txMaxSize,
		fillPercent: fillPercent,
		dst:         dst,
		verbose:     verbose,
		srcSize:     srcSize,
	}
	if err := c.copyDB(src); err != nil {
		log.Fatalf("compact failed: %s", err)
	}
	elapsed := time.Since(start).Seconds()

	// Sanity-check the output. NOTE: log.Fatalf calls os.Exit and skips
	// the deferred Close calls above; that is acceptable here because the
	// destination data has already been committed by copyDB.
	dstInfo, err := os.Stat(dstPath)
	if err != nil {
		log.Fatalf("failed to stat destination file: %s", err)
	}
	if dstInfo.Size() == 0 {
		log.Fatalf("zero output db size")
	}
	dstSize := dstInfo.Size()
	log.Printf("compact finished in %0.2f seconds, %d -> %d bytes (gain=%.2fx)", elapsed, srcSize, dstSize, float64(srcSize)/float64(dstSize))
}
// compacter holds the state of one compaction pass into dst: the
// currently open write transaction, a stack of destination buckets
// mirroring the source bucket path being copied, and running counters
// used for progress reporting.
type compacter struct {
	start       time.Time // wall-clock start of the run, for throughput reporting
	txMaxSize   int64     // commit + reopen the tx after this many copied bytes (0 = never split)
	fillPercent float64   // FillPercent applied to every destination bucket
	dst         *bbolt.DB // destination database
	tx          *bbolt.Tx // currently open write tx on dst; nil until the first prepare call
	size        int64     // bytes accounted against the current tx
	totalSize   int64     // bytes accounted across all transactions
	totalKeys   int       // number of prepare calls (counts bucket headers as well as key/value pairs)
	txCount     int       // number of write transactions begun on dst
	buckets     []*bbolt.Bucket // stack of destination buckets for the bucket path currently being copied
	bucketKeys  [][]byte  // names of the buckets on the stack, used to reopen them after a tx rotation
	verbose     bool      // log a progress line after each committed transaction
	srcSize     int64     // size of the source file in bytes, for "% of source" reporting
	lastReportedTotal int64 // NOTE(review): appears unused in this file — confirm before removing
}

// reportInterval is a progress-report granularity in bytes.
// NOTE(review): appears unused in this file — confirm before removing.
const reportInterval = 1024 * 1024
// copyDB walks every top-level bucket of src inside a read-only view,
// copies it into the destination database, commits the final write
// transaction, and logs a summary line on success. On error, any
// partially filled destination transaction is rolled back.
func (c *compacter) copyDB(src *bbolt.DB) error {
	walkErr := src.View(func(srcTx *bbolt.Tx) error {
		return srcTx.ForEach(func(name []byte, bucket *bbolt.Bucket) error {
			return c.copyBucket(bucket, name)
		})
	})
	if walkErr != nil {
		if c.tx != nil {
			c.tx.Rollback()
		}
		return walkErr
	}
	if c.tx == nil {
		// Source db was empty: nothing was ever written.
		return nil
	}
	elapsed := time.Since(c.start).Seconds()
	size := c.tx.Size()
	if err := c.tx.Commit(); err != nil {
		return err
	}
	log.Printf("DONE in %d transactions, %.2f sec, %.1f MB data in %d keys, %.1f MB written (%.2f%% of source), %.2f MB/sec", c.txCount, elapsed, float64(c.totalSize)/(1024*1024), c.totalKeys, float64(size)/(1024*1024), float64(c.totalSize)/float64(c.srcSize)*100, float64(size)/(1024*1024)/elapsed)
	return nil
}
// copyBucket recreates the source bucket b (named key) in the
// destination, copies all of its keys, and descends recursively into
// nested buckets.
func (c *compacter) copyBucket(b *bbolt.Bucket, key []byte) error {
	if err := c.startBucket(key, b.Sequence()); err != nil {
		return err
	}
	visit := func(k, v []byte) error {
		if v != nil {
			return c.handleValue(k, v)
		}
		// A nil value marks a nested bucket.
		return c.copyBucket(b.Bucket(k), k)
	}
	if err := b.ForEach(visit); err != nil {
		return err
	}
	c.finishBucket()
	return nil
}
// startBucket creates (or reopens) the destination bucket named k —
// under the bucket on top of the stack, or at the transaction root
// when the stack is empty — pushes it onto the stack, applies the
// configured fill percentage, and carries over the source bucket's
// sequence number.
func (c *compacter) startBucket(k []byte, seq uint64) error {
	tx, err := c.prepare(int64(len(k)))
	if err != nil {
		return err
	}
	var bucket *bbolt.Bucket
	depth := len(c.buckets)
	if depth == 0 {
		bucket, err = tx.CreateBucketIfNotExists(k)
	} else {
		bucket, err = c.buckets[depth-1].CreateBucketIfNotExists(k)
	}
	if err != nil {
		return err
	}
	bucket.FillPercent = c.fillPercent
	bucket.SetSequence(seq)
	c.buckets = append(c.buckets, bucket)
	c.bucketKeys = append(c.bucketKeys, k)
	return nil
}
// finishBucket pops the top bucket off the stack once its contents
// have been fully copied.
func (c *compacter) finishBucket() {
	last := len(c.buckets) - 1
	c.buckets[last] = nil // drop the reference so the bucket can be collected
	c.buckets = c.buckets[:last]
	c.bucketKeys = c.bucketKeys[:last]
}
// handleValue writes one key/value pair into the bucket on top of the
// destination stack, rotating the transaction first if needed.
func (c *compacter) handleValue(k, v []byte) error {
	if _, err := c.prepare(int64(len(k) + len(v))); err != nil {
		return err
	}
	top := c.buckets[len(c.buckets)-1]
	return top.Put(k, v)
}
// prepare ensures a write transaction is open on the destination and
// accounts sz bytes against it. Once the running transaction has
// absorbed txMaxSize bytes, it is committed and a fresh one is begun,
// re-opening the bucket path recorded in bucketKeys so the caller's
// destination bucket handles stay valid. It returns the (possibly new)
// transaction.
func (c *compacter) prepare(sz int64) (*bbolt.Tx, error) {
	tx := c.tx

	// Rotate the transaction once it exceeds the configured size.
	// txMaxSize == 0 disables splitting entirely.
	if tx != nil && c.txMaxSize != 0 && c.size > 0 && c.size+sz > c.txMaxSize {
		size := tx.Size()
		err := tx.Commit()
		// Fix: clear c.tx immediately — the old transaction is finished
		// either way, and leaving it in place made copyDB's error path
		// roll back an already-committed tx if Begin below failed.
		c.tx = nil
		tx = nil
		if err != nil {
			return nil, err
		}
		if c.verbose {
			elapsed := time.Since(c.start).Seconds()
			log.Printf("%d transactions so far, %.2f sec, %.1f MB data in %d keys, %.1f MB written (%.2f%% of source), %.2f MB/sec", c.txCount, elapsed, float64(c.totalSize)/(1024*1024), c.totalKeys, float64(size)/(1024*1024), float64(c.totalSize)/float64(c.srcSize)*100, float64(size)/(1024*1024)/elapsed)
		}
	}

	if tx == nil {
		var err error
		tx, err = c.dst.Begin(true)
		if err != nil {
			return nil, err
		}
		c.tx = tx
		c.txCount++
		c.size = 0

		// Reopen the destination bucket path in the new transaction;
		// bucket handles from the committed tx are no longer valid.
		c.buckets = c.buckets[:0]
		for i, k := range c.bucketKeys {
			var b *bbolt.Bucket
			if i > 0 {
				b = c.buckets[i-1].Bucket(k)
			} else {
				b = tx.Bucket(k)
			}
			if b == nil {
				panic("unreachable: previously created bucket not found")
			}
			b.FillPercent = c.fillPercent
			c.buckets = append(c.buckets, b)
		}
	}

	c.size += sz
	c.totalSize += sz
	c.totalKeys++ // counts bucket headers as well as key/value pairs
	return tx, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment