Created
December 2, 2023 15:38
-
-
Save Logiraptor/da882aa4a07b13267c99d9517c3382fc to your computer and use it in GitHub Desktop.
A way to build databases backed only by object storage.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package db | |
import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/grafana/dskit/services"
)
// Objects are stored under keys of the form
//
//	<prefix>/<ulid>
//
// A prefix normally holds more than one object, and its current logical value
// is a function of all of them. For example:
//
//	<tenant>/rules/<ulid1>
//	<tenant>/rules/<ulid2>
//	<tenant>/rules/<ulid3>
//	<tenant>/recommendations/<ulid4>
//	<tenant>/recommendations/<ulid5>
//
// These are five stored objects but only two logical objects:
// <tenant>/rules and <tenant>/recommendations.
// ulid identifies a single stored object within a prefix. The intent is for
// ulids to give the objects under a prefix a deterministic order.
//
// TODO: replace with a real ulid library.
// NOTE(review): that ordering only materializes once the ulid is actually
// encoded into the object key and Storage.Iter lists keys in sorted order;
// neither is established by this file yet.
type ulid [16]byte

// Storage is the small object-store surface the DB is built on.
//
// TODO: make this look like the thanos/objstore interface.
// NOTE(review): none of these methods accept a context.Context, so storage
// calls cannot be cancelled.
type Storage interface {
	// Iter invokes fn with the key of each stored object under prefix.
	// Presumably it stops at, and returns, the first error fn returns, and
	// Iter("") visits every object; the DB relies on both, so confirm this
	// against implementations.
	Iter(prefix string, fn func(key string) error) error
	// Get opens the object stored at key; the caller closes the reader.
	Get(key string) (io.ReadCloser, error)
	// Upload stores the contents of r at key.
	Upload(key string, r io.Reader) error
	// Delete removes the object stored at key.
	Delete(key string) error
}

// Delim separates the logical prefix from the ulid in an object key.
const Delim = "/"
// Compactor merges values of T. It is the most important piece to get right
// for any kind of consistency guarantee.
//
// The DB folds the objects stored under a prefix one at a time by calling
// Compact(acc, next), with acc starting at T's zero value. A compacted object
// is later folded again starting from the zero value, so Compact(zero, v)
// should return v (the zero value should be a left identity) for reads to
// return the same result before and after compaction.
// NOTE(review): depending on how a compacted object is ordered relative to
// objects written concurrently with compaction, Compact may also need to be
// commutative/associative; confirm once real ulids are in place.
type Compactor[T any] interface {
	// Compact merges next into acc and returns the merged value.
	Compact(acc, next T) (T, error)
}

// CompactorFunc adapts an ordinary function to the Compactor interface, so
// the simplest compactor can be written as a plain func.
type CompactorFunc[T any] func(acc, next T) (T, error)

// Compact returns f(acc, next).
func (f CompactorFunc[T]) Compact(acc, next T) (T, error) {
	return f(acc, next)
}
// Codec converts values of T to and from the bytes stored in an object.
type Codec[T any] interface {
	// Encode serializes v; the DB uploads the returned reader as the object
	// body.
	// NOTE(review): if the returned reader also implements io.Closer, the DB
	// never closes it.
	Encode(v T) (io.Reader, error)
	// Decode deserializes an object body read back from Storage.
	Decode(r io.Reader) (T, error)
}
type DB[T any] struct { | |
services.Service | |
storage Storage | |
codec Codec[T] | |
compactor Compactor[T] | |
} | |
func New[T any](storage Storage, codec Codec[T], compactor Compactor[T]) *DB[T] { | |
db := &DB[T]{ | |
codec: codec, | |
storage: storage, | |
compactor: compactor, | |
} | |
// TODO: use a hashring and elect a single replica as compactor per-prefix, or run it in a separate pod | |
db.Service = services.NewTimerService(time.Minute, db.compact, db.compact, nil) | |
return db | |
} | |
func splitPrefixAndUlid(path string) (string, ulid, error) { | |
lastDelim := strings.LastIndex(path, Delim) | |
prefix := path[:lastDelim] | |
ulid := ulid{} | |
// TODO: parse ulid or whatever | |
return prefix, ulid, nil | |
} | |
func combinePrefixAndUlid(prefix string, ulid ulid) string { | |
return prefix + Delim + "TODO" | |
} | |
func (db *DB[T]) compact(ctx context.Context) error { | |
// TODO: probably don't collect the entire world in memory | |
merged := map[string][]ulid{} | |
values := map[string]T{} | |
err := db.storage.Iter("", func(path string) error { | |
prefix, ulid, err := splitPrefixAndUlid(path) | |
if err != nil { | |
return err | |
} | |
rd, err := db.storage.Get(path) | |
if err != nil { | |
return err | |
} | |
defer rd.Close() | |
other, err := db.codec.Decode(rd) | |
if err != nil { | |
return err | |
} | |
values[prefix], err = db.compactor.Compact(values[prefix], other) | |
if err != nil { | |
return err | |
} | |
merged[prefix] = append(merged[prefix], ulid) | |
return nil | |
}) | |
if err != nil { | |
return err | |
} | |
for prefix, ulids := range merged { | |
// TODO: use a new ulid here | |
err = db.Put(combinePrefixAndUlid(prefix, ulid{}), values[prefix]) | |
if err != nil { | |
return err | |
} | |
for _, u := range ulids { | |
err = db.storage.Delete(combinePrefixAndUlid(prefix, u)) | |
if err != nil { | |
return err | |
} | |
} | |
} | |
return nil | |
} | |
func (db *DB[T]) Put(prefix string, value T) error { | |
rd, err := db.codec.Encode(value) | |
if err != nil { | |
return err | |
} | |
// TODO: use a new ulid here | |
return db.storage.Upload(combinePrefixAndUlid(prefix, ulid{}), rd) | |
} | |
func (db *DB[T]) Get(prefix string) (T, error) { | |
var value T | |
err := db.storage.Iter(prefix, func(path string) error { | |
rd, err := db.storage.Get(path) | |
if err != nil { | |
return err | |
} | |
defer rd.Close() | |
other, err := db.codec.Decode(rd) | |
if err != nil { | |
return err | |
} | |
value, err = db.compactor.Compact(value, other) | |
if err != nil { | |
return err | |
} | |
return nil | |
}) | |
if err != nil { | |
return value, err | |
} | |
return value, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment