Skip to content

Instantly share code, notes, and snippets.

@Logiraptor
Created December 2, 2023 15:38
Show Gist options
  • Save Logiraptor/da882aa4a07b13267c99d9517c3382fc to your computer and use it in GitHub Desktop.
Save Logiraptor/da882aa4a07b13267c99d9517c3382fc to your computer and use it in GitHub Desktop.
A way to build databases on top of object storage alone.
package db
import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/grafana/dskit/services"
)
// Objects are stored under keys of the form
//
//	<prefix>/<ulid>
//
// A single prefix usually holds several stored objects, and the current state
// of the logical object at that prefix is a function of all of them (DB folds
// them together with a Compactor). For example:
//
//	<tenant>/rules/ulid1
//	<tenant>/rules/ulid2
//	<tenant>/rules/ulid3
//	<tenant>/recommendations/ulid4
//	<tenant>/recommendations/ulid5
//
// holds five stored objects but only two logical objects: <tenant>/rules and
// <tenant>/recommendations.

// ulid identifies one stored object within a prefix. ULIDs are meant to give
// the objects within a prefix a deterministic ordering.
//
// TODO: replace with a real ulid library. Note that splitPrefixAndUlid does
// not parse it and combinePrefixAndUlid does not encode it yet.
type ulid [16]byte
// Storage is the minimal object-store interface that DB is built on.
//
// TODO: make this look like the thanos/objstore interface
type Storage interface {
	// Upload writes everything read from r to the object at key.
	Upload(key string, r io.Reader) error
	// Get opens the object at key for reading; the caller must close it.
	Get(key string) (io.ReadCloser, error)
	// Delete removes the object at key.
	Delete(key string) error
	// Iter calls f with the key of each object under prefix and returns any
	// error f reports. DB passes each key it receives straight to Get, so keys
	// are expected to be full object keys. NOTE(review): whether prefix is
	// matched as a raw key prefix or as a directory is up to the implementation.
	Iter(prefix string, f func(key string) error) error
}
// Delim separates a logical object's prefix from the ULID segment of each
// storage key; splitPrefixAndUlid splits a key at its last occurrence.
const Delim = "/"
// Compactor merges two values of T into one.
//
// The simplest Compactor just combines two values and returns the result, but
// implementations may merge them in more elaborate ways. This is the most
// important part to get right if the DB is to offer any kind of consistency
// guarantee.
//
// DB folds stored objects left to right: the first call receives the zero
// value of T as acc, and every later call receives the previous result. An
// implementation is therefore expected to treat the zero value as an identity.
type Compactor[T any] interface {
	Compact(acc, next T) (T, error)
}
// Codec converts values of T to and from their stored byte representation.
type Codec[T any] interface {
	// Encode serializes v; DB uploads the returned reader as a new object.
	Encode(v T) (io.Reader, error)
	// Decode deserializes a single stored object read from r.
	Decode(r io.Reader) (T, error)
}
// DB stores values of T using object storage only. Put writes a new object
// under a prefix, Get folds every object under a prefix into one value with
// the Compactor, and the embedded timer service (set up in New) periodically
// compacts the stored objects.
type DB[T any] struct {
	// Service is the background compaction service created by New.
	services.Service
	// storage holds the encoded objects.
	storage Storage
	// codec converts between values of T and stored bytes.
	codec Codec[T]
	// compactor merges two values of T into one.
	compactor Compactor[T]
}
// New returns a DB that keeps values of T in storage, serializes them with
// codec and merges them with compactor.
//
// The returned DB embeds a dskit timer service that runs compaction once as
// its starting function and then again every minute; it has no stopping
// function. Compaction only happens once that service has been started.
func New[T any](storage Storage, codec Codec[T], compactor Compactor[T]) *DB[T] {
	d := new(DB[T])
	d.storage = storage
	d.codec = codec
	d.compactor = compactor

	// TODO: use a hashring and elect a single replica as compactor per-prefix, or run it in a separate pod
	d.Service = services.NewTimerService(time.Minute, d.compact, d.compact, nil)
	return d
}
// splitPrefixAndUlid splits a storage key into its logical prefix (everything
// before the last Delim) and the ULID segment that follows it.
//
// It returns an error when path contains no Delim at all, since such a key
// cannot belong to any logical object. Previously, slicing with the -1 index
// returned by strings.LastIndex made this case panic.
//
// ULID parsing is not implemented yet: the returned ulid is always the zero
// value.
func splitPrefixAndUlid(path string) (string, ulid, error) {
	i := strings.LastIndex(path, Delim)
	if i < 0 {
		return "", ulid{}, fmt.Errorf("object key %q has no %q delimiter", path, Delim)
	}
	// TODO: parse the ULID in path[i+len(Delim):]
	return path[:i], ulid{}, nil
}
// combinePrefixAndUlid builds the storage key for an object under prefix.
//
// ULID encoding is not implemented yet: the ulid argument is ignored and every
// key ends in the placeholder segment "TODO", so all objects written under the
// same prefix currently map to one key.
func combinePrefixAndUlid(prefix string, _ ulid) string {
	const placeholder = "TODO"
	dir := prefix + Delim
	return dir + placeholder
}
// compact rewrites every logical object that is spread over several stored
// objects as a single stored object.
//
// It lists every key in storage and groups the keys by the prefix that
// splitPrefixAndUlid extracts. It then folds each group's decoded objects with
// the Compactor, in the order Storage.Iter yields them, starting from the zero
// value of T. For each prefix with more than one object, it uploads the merged
// value under combinePrefixAndUlid(prefix, ...) and deletes the objects that
// were merged into it.
//
// Deletion uses the exact keys observed while listing, not keys rebuilt from
// (prefix, ulid), because those helpers do not round-trip yet. As a result,
// objects written after the listing are left for the next pass, and the key
// of the freshly written object is never deleted. Prefixes that already
// consist of a single object are left untouched.
//
// The merged value is uploaded directly rather than through Put, because Put
// expects a bare prefix; passing it an already-combined key wrote the result
// under a different, nested prefix.
//
// The context is currently unused. Storage has no context-aware methods, and
// abandoning a pass between an upload and its deletes would leave the merged
// value stored alongside its inputs.
//
// TODO: probably don't collect the entire world in memory
func (db *DB[T]) compact(_ context.Context) error {
	values := map[string]T{}
	// keysByPrefix records, per prefix, the exact storage keys that were merged.
	keysByPrefix := map[string][]string{}

	err := db.storage.Iter("", func(path string) error {
		prefix, _, err := splitPrefixAndUlid(path)
		if err != nil {
			return err
		}
		rd, err := db.storage.Get(path)
		if err != nil {
			return fmt.Errorf("reading %q: %w", path, err)
		}
		defer rd.Close()
		other, err := db.codec.Decode(rd)
		if err != nil {
			return fmt.Errorf("decoding %q: %w", path, err)
		}
		merged, err := db.compactor.Compact(values[prefix], other)
		if err != nil {
			return fmt.Errorf("compacting %q: %w", path, err)
		}
		values[prefix] = merged
		keysByPrefix[prefix] = append(keysByPrefix[prefix], path)
		return nil
	})
	if err != nil {
		return fmt.Errorf("scanning objects: %w", err)
	}

	for prefix, keys := range keysByPrefix {
		// A single object has nothing to merge with; rewriting it would only
		// churn storage on every pass.
		if len(keys) < 2 {
			continue
		}
		// TODO: use a new ulid here
		newKey := combinePrefixAndUlid(prefix, ulid{})
		rd, err := db.codec.Encode(values[prefix])
		if err != nil {
			return fmt.Errorf("encoding compacted value for %q: %w", prefix, err)
		}
		if err := db.storage.Upload(newKey, rd); err != nil {
			return fmt.Errorf("uploading compacted object %q: %w", newKey, err)
		}
		for _, key := range keys {
			if key == newKey {
				// The upload above already replaced this object with the merged value.
				continue
			}
			if err := db.storage.Delete(key); err != nil {
				return fmt.Errorf("deleting compacted object %q: %w", key, err)
			}
		}
	}
	return nil
}
// Put stores value as a new object belonging to the logical object at prefix.
//
// The value is serialized with the DB's Codec and uploaded to the key that
// combinePrefixAndUlid builds for prefix. Get, and later compaction, fold it
// together with the other objects stored under the same prefix. Any Encode or
// Upload error is returned unchanged.
func (db *DB[T]) Put(prefix string, value T) error {
	// TODO: use a new ulid here
	key := combinePrefixAndUlid(prefix, ulid{})

	encoded, err := db.codec.Encode(value)
	if err != nil {
		return err
	}
	return db.storage.Upload(key, encoded)
}
// Get returns the current value of the logical object at prefix.
//
// It folds every object stored directly under prefix into a single value,
// starting from the zero value of T and merging each decoded object with the
// Compactor in the order Storage.Iter yields keys. If no object exists under
// prefix, Get returns the zero value of T and a nil error. On error it returns
// the zero value of T rather than a partially merged value.
//
// Only keys of the form prefix + Delim + <segment without Delim> are merged.
// This is the same grouping that splitPrefixAndUlid, and therefore
// compaction, applies. It keeps two kinds of objects out of the result,
// whatever kind of matching Iter performs:
//   - objects of sibling prefixes that merely share leading characters
//     (e.g. "t/rules2/x" when asking for "t/rules");
//   - objects of nested prefixes (e.g. "t/rules/a/x").
func (db *DB[T]) Get(prefix string) (T, error) {
	var value T
	dir := prefix + Delim
	err := db.storage.Iter(prefix, func(path string) error {
		// Skip keys that do not sit directly under prefix.
		if !strings.HasPrefix(path, dir) || strings.Contains(path[len(dir):], Delim) {
			return nil
		}
		rd, err := db.storage.Get(path)
		if err != nil {
			return err
		}
		defer rd.Close()
		other, err := db.codec.Decode(rd)
		if err != nil {
			return err
		}
		value, err = db.compactor.Compact(value, other)
		return err
	})
	if err != nil {
		var zero T
		return zero, err
	}
	return value, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment