Last active
February 12, 2024 18:39
-
-
Save larry0x/4832f11c6547686ce9e648116f11293f to your computer and use it in GitHub Desktop.
How to use RocksDB's user-defined timestamp feature in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module playground | |
go 1.21 | |
require github.com/linxGnu/grocksdb v1.8.4 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// How to run this on macOS (taught by ChatGPT): | |
// | |
// 1. Install dependencies using Homebrew: | |
// | |
// ```bash | |
// brew install rocksdb snappy lz4 zstd | |
// ``` | |
// | |
// 2. Add dependencies to CGO flags: | |
// | |
// ```bash | |
// export ROCKSDB_DIR=$(brew --prefix rocksdb) | |
// export SNAPPY_DIR=$(brew --prefix snappy) | |
// export LZ4_DIR=$(brew --prefix lz4) | |
// export ZSTD_DIR=$(brew --prefix zstd) | |
// export CGO_LDFLAGS="-L$ROCKSDB_DIR/lib -L$SNAPPY_DIR/lib -L$LZ4_DIR/lib -L$ZSTD_DIR/lib -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd" | |
// export CGO_CFLAGS="-I$ROCKSDB_DIR/include -I$SNAPPY_DIR/include -I$LZ4_DIR/include -I$ZSTD_DIR/include" | |
// ``` | |
// | |
// 3. Finally, run the file: | |
// | |
// ```bash | |
// go run main.go | |
// ``` | |
// | |
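// I still get the following linker warning. I'm not sure how to resolve it, but
// the program builds and runs despite it. (Presumably these libraries end up on
// the link line twice: once from CGO_LDFLAGS above and once from grocksdb's own
// cgo directives.)
//
// > ld: warning: ignoring duplicate libraries: '-lbz2', '-lc++', '-llz4', '-lm', '-lrocksdb', '-lsnappy', '-lz', '-lzstd'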
package main | |
import ( | |
"bytes" | |
"encoding/binary" | |
"encoding/hex" | |
"fmt" | |
"os" | |
"github.com/linxGnu/grocksdb" | |
) | |
// Names and sizes shared by the whole demo.
const (
	// dataDir is the on-disk location of the demo database.
	dataDir = "./testdata"

	// Column family names. The demo stores its timestamped data in "test";
	// "default" is opened alongside it but left unused.
	cFNameDefault = "default"
	cFNameTest    = "test"

	// timestampSize is the width, in bytes, of every user-defined timestamp.
	// Timestamps are encoded as little-endian uint64 values.
	timestampSize = 8
)
// KVPair pairs a user key with the value it is expected to hold.
type KVPair struct {
	// Key is the user key, without any timestamp suffix.
	Key []byte
	// Value is the expected value. A nil Value means the key is expected not
	// to exist.
	Value []byte
}
func init() { | |
// delete the data directory if it already exists from a previous run | |
if _, err := os.Stat(dataDir); !os.IsNotExist(err) { | |
if err = os.RemoveAll(dataDir); err != nil { | |
panic(err) | |
} | |
} | |
} | |
func main() { | |
// open the database with a "test" column family | |
db, cfHandles, err := grocksdb.OpenDbColumnFamilies( | |
newDBOptions(), | |
dataDir, | |
[]string{ | |
cFNameDefault, | |
cFNameTest, | |
}, | |
[]*grocksdb.Options{ | |
// for this demo, we don't use the default colume family, so no need to | |
// enable timestamping on it | |
grocksdb.NewDefaultOptions(), | |
// enable timestamping on our "test" column family | |
newCFOptionsWithTS(), | |
}, | |
) | |
if err != nil { | |
panic(err) | |
} | |
cfHandle := cfHandles[1] | |
// create two timestamps | |
ts1 := [timestampSize]byte{} | |
binary.LittleEndian.PutUint64(ts1[:], uint64(10000)) | |
ts2 := [timestampSize]byte{} | |
binary.LittleEndian.PutUint64(ts2[:], uint64(20000)) | |
// write a batch under the first timestamp | |
{ | |
batch1 := grocksdb.NewWriteBatch() | |
batch1.PutCFWithTS(cfHandle, []byte("donald"), ts1[:], []byte("trump")) | |
batch1.PutCFWithTS(cfHandle, []byte("jake"), ts1[:], []byte("shepherd")) | |
batch1.PutCFWithTS(cfHandle, []byte("joe"), ts1[:], []byte("biden")) | |
batch1.PutCFWithTS(cfHandle, []byte("larry"), ts1[:], []byte("engineer")) | |
db.Write(grocksdb.NewDefaultWriteOptions(), batch1) | |
} | |
// write another batch under the second timestamp | |
// for testing purpose, this batch 1) deletes a key from the previous batch, | |
// 2) overwrites a value from the previous batch, 3) inserts a new key not | |
// present in the previous batch. | |
{ | |
batch2 := grocksdb.NewWriteBatch() | |
batch2.PutCFWithTS(cfHandle, []byte("donald"), ts2[:], []byte("duck")) | |
batch2.DeleteCFWithTS(cfHandle, []byte("joe"), ts2[:]) | |
batch2.PutCFWithTS(cfHandle, []byte("pumpkin"), ts2[:], []byte("cat")) | |
db.Write(grocksdb.NewDefaultWriteOptions(), batch2) | |
} | |
// check data at the two timestamps, respectively | |
checkDataAtTimestamp(db, cfHandle, ts1[:], []KVPair{ | |
{ | |
Key: []byte("donald"), | |
Value: []byte("trump"), | |
}, | |
{ | |
Key: []byte("jake"), | |
Value: []byte("shepherd"), | |
}, | |
{ | |
Key: []byte("joe"), | |
Value: []byte("biden"), | |
}, | |
{ | |
Key: []byte("larry"), | |
Value: []byte("engineer"), | |
}, | |
{ | |
Key: []byte("pumpkin"), | |
Value: nil, | |
}, | |
}) | |
checkDataAtTimestamp(db, cfHandle, ts2[:], []KVPair{ | |
{ | |
Key: []byte("donald"), | |
Value: []byte("duck"), | |
}, | |
{ | |
Key: []byte("jake"), | |
Value: []byte("shepherd"), | |
}, | |
{ | |
Key: []byte("joe"), | |
Value: nil, | |
}, | |
{ | |
Key: []byte("larry"), | |
Value: []byte("engineer"), | |
}, | |
{ | |
Key: []byte("pumpkin"), | |
Value: []byte("cat"), | |
}, | |
}) | |
} | |
func checkDataAtTimestamp(db *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle, ts []byte, expected []KVPair) { | |
fmt.Printf("checking data at timestamp %s\n", hex.EncodeToString(ts)) | |
opts := newReadOptionsWithTS(ts) | |
for _, kv := range expected { | |
value, _, err := db.GetCFWithTS(opts, cfHandle, kv.Key) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Printf( | |
"expect key = %s, value = %s, found value = %s\n", | |
string(kv.Key), | |
string(kv.Value), | |
string(value.Data()), | |
) | |
} | |
} | |
func newDBOptions() *grocksdb.Options { | |
opts := grocksdb.NewDefaultOptions() | |
opts.SetCreateIfMissing(true) | |
opts.SetCreateIfMissingColumnFamilies(true) | |
return opts | |
} | |
func newCFOptionsWithTS() *grocksdb.Options { | |
opts := grocksdb.NewDefaultOptions() | |
opts.SetComparator(grocksdb.NewComparatorWithTimestamp( | |
"leveldb.BytewiseComparator.u64ts", | |
timestampSize, | |
compare, | |
compareTS, | |
compareWithoutTS, | |
)) | |
return opts | |
} | |
func newReadOptionsWithTS(ts []byte) *grocksdb.ReadOptions { | |
opts := grocksdb.NewDefaultReadOptions() | |
opts.SetTimestamp(ts) | |
return opts | |
} | |
func compare(a []byte, b []byte) int { | |
ord := compareWithoutTS(a, true, b, true) | |
if ord != 0 { | |
return ord | |
} | |
return -compareTS(extractTimestampFromUserKey(a), extractTimestampFromUserKey(b)) | |
} | |
// compareTS orders two encoded timestamps by numeric value. It returns -1 if
// bz1 < bz2, 0 if they are equal, and +1 if bz1 > bz2. Each argument is
// decoded from its first 8 bytes as a little-endian uint64.
func compareTS(bz1, bz2 []byte) int {
	left, right := binary.LittleEndian.Uint64(bz1), binary.LittleEndian.Uint64(bz2)
	if left == right {
		return 0
	}
	if left < right {
		return -1
	}
	return 1
}
func compareWithoutTS(a []byte, aHasTS bool, b []byte, bHasTS bool) int { | |
if aHasTS { | |
a = stripTimestampfromUserKey(a) | |
} | |
if bHasTS { | |
b = stripTimestampfromUserKey(b) | |
} | |
return bytes.Compare(a, b) | |
} | |
func extractTimestampFromUserKey(userKey []byte) []byte { | |
return userKey[len(userKey)-timestampSize:] | |
} | |
func stripTimestampfromUserKey(userKey []byte) []byte { | |
return userKey[:len(userKey)-timestampSize] | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment