Last active
March 5, 2020 10:12
-
-
Save mookerji/1c2b0da7dd4e50b54659 to your computer and use it in GitHub Desktop.
Idiomatic Clojure bindings for the C++ RocksDB library, an embedded persistent key-value store for flash storage and server workloads based on LevelDB. This implementation is based partially on Factual's clj-leveldb and is up here for some early review. If I've shared it with you, please don't repost until merged into a public Github.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns org.flausenhaus.rocksdb | |
"Idiomatic Clojure bindings for the C++ RocksDB library, an embedded | |
persistent key-value store for flash storage and server workloads based on | |
LevelDB. More details about RocksDB are given at | |
https://github.com/facebook/rocksdb/wiki/Rocksdb-Architecture-Guide. | |
The implementation here borrows heavily from Factual's clj-leveldb. | |
This namespace provides a few things: | |
0. Protocols/type definitions: byte serialization (IByteSerializable), | |
closeable sequences (CloseableSeq), and mutable | |
datastores (IPersistentKVFactory/IPersistentKVRead/IPersistentKVWrite). Key | |
and values are by default serialized through Nippy, can be be rebound using | |
your serialization/deserialization functiosn of choice by using | |
with-deserializer and with-serializer. RocksDB exposes a batch writes and | |
snapshots in addition to expected read/write access. | |
1. RocksDB-backed caching and memoization: RocksDB's bindings implement | |
clojure.core.cache.CacheProtocol, and provide memoization through | |
clojure.core.memoize. | |
Examples!: | |
;; Creates a RocksDB instance with sane defaults in a temp directory and | |
;; test read/write. | |
(require '[org.flausenhaus.clj-rocksdb :as rdb]) | |
(def db (rdb/mk-RocksDB)) | |
(rdb/put! db \"foo\" \"bar\") | |
(rdb/get db \"foo\") | |
;; Memoization (from clojure.core.memoize): | |
(def id (rdb/memo #(do (Thread/sleep 5000) (identity %)))) | |
(id 42) | |
;; ... waits 5 seconds | |
;; => 42 | |
(id 42) | |
;; instantly | |
;: => 42 | |
For more examples see the test/flausenhaus/rocksdb.clj." | |
(:refer-clojure :exclude [get]) | |
(:require | |
[clojure.core.cache :as cache] | |
[clojure.core.memoize :as memo] | |
[me.raynes.fs :as fs] | |
[taoensso.nippy :as nippy] | |
[clojure.java.io :as io]) | |
(:import | |
[clojure.core.memoize | |
PluggableMemoization] | |
[java.io | |
File | |
Closeable] | |
[java.util | |
Arrays] | |
[org.fusesource.rocksdbjni | |
JniDBFactory] | |
[org.iq80.leveldb | |
CompressionType | |
DB | |
DBFactory | |
DBIterator | |
Options | |
Range | |
ReadOptions | |
WriteOptions | |
WriteBatch])) | |
(set! *warn-on-reflection* true) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;;; Serialization and Sequence Utilities= | |
;; By default, we use nippy for serialization. | |
(def ^:dynamic *serializer* nippy/freeze) | |
(def ^:dynamic *deserializer* nippy/thaw) | |
(defmacro with-serializer | |
"Evaluates a body using the provided serializer." | |
[serializer & body] | |
`(binding [*serializer* ~serializer] | |
~@body)) | |
(defmacro with-deserializer | |
"Evaluates body using the provided deserializer." | |
[deserializer & body] | |
`(binding [*deserializer* ~deserializer] | |
~@body)) | |
(defmacro with-reader | |
"Utitily composing with-serializer and with-deserializer: given a both a | |
serializer and deserializer, evaluates body." | |
[serializer deserializer & body] | |
`(with-serializer ~serializer | |
(with-deserializer ~deserializer | |
~@body))) | |
(defprotocol IByteSerializable | |
"Serialize and deserialize Clojure to byte-arrays. | |
By contract, this should return a byte-array. Note that if you want to just | |
use a different serializer or deserializer, it should be more convenient to | |
use the with-* macros in this namespace to consume the *serializer* and | |
*deserializer* binding" | |
(^bytes serialize [obj] | |
"Binary serialize.") | |
(^bytes deserialize [obj] | |
"Binary deserialize.")) | |
(extend-protocol IByteSerializable | |
nil | |
(serialize [_] (*serializer* nil)) | |
(deserialize [_] (*deserializer* nil)) | |
java.lang.Object | |
(serialize [obj] (*serializer* obj)) | |
(deserialize [obj] (*deserializer* obj))) | |
(deftype CloseableSeq [impl-seq close-fn] | |
clojure.lang.ISeq | |
clojure.lang.Sequential | |
clojure.lang.Seqable | |
clojure.lang.IPersistentCollection | |
(equiv [this obj] | |
(loop [a this b obj] | |
(if (or (empty? a) (empty? b)) | |
(and (empty? a) (empty? b)) | |
(if (= (first obj) (first b)) | |
(recur (rest a) (rest b)) | |
false)))) | |
(count [this] | |
(count impl-seq)) | |
(first [this] | |
(if-let [f (first impl-seq)] | |
f | |
(.close this))) | |
(next [this] | |
(if-let [n (next impl-seq)] | |
(CloseableSeq. n close-fn) | |
(.close this))) | |
(cons [this obj] | |
(CloseableSeq. (cons obj impl-seq) close-fn)) | |
(more [this] | |
(if-let [n (next this)] | |
n | |
'())) | |
(empty [this] | |
(CloseableSeq. '() close-fn)) | |
(seq [this] | |
this) | |
java.io.Closeable | |
(close [_] | |
(close-fn))) | |
(defn- closeable-seq | |
"Given an underlying sequence and a closeable function, creates a sequence | |
that can be closed when exhausted." | |
[impl-seq close-fn] | |
(if (seq impl-seq) | |
(->CloseableSeq impl-seq close-fn) | |
(do (close-fn) | |
nil))) | |
(defn- ba= | |
"Compare two byte-arrays for equality." | |
[^bytes x ^bytes y] | |
(java.util.Arrays/equals x y)) | |
(defn- iterator-seq- | |
"Creates a closeable iterator a DBIterator given and start and end keys." | |
[^DBIterator iterator start end] | |
(when start | |
(.seek iterator (serialize start)) | |
(.seekToFirst iterator)) | |
(let [iter (iterator-seq iterator) | |
iter (let [end (serialize end)] | |
(if (ba= end (serialize nil)) | |
iter | |
(take-while #(ba= end ^bytes (key %)) iter))) | |
impl-seq (map #(vector (deserialize (key %)) (deserialize (val %))) iter) | |
close-fn #(.close iterator)] | |
(closeable-seq impl-seq close-fn))) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;;; RocksDB bindings: an overall protocol capturing the API for simple | |
;;; kv-datstore with mutable state. | |
(defprotocol IPersistentKVFactory | |
"An interface for creating to a persistent key-value datastore with mutable | |
state. To prevent resource leakage, any type implementing IPersistentKVWrite | |
should also implement java.lang.Closeable. | |
TODO: Add more documentation here about the argument types." | |
(open [this file-handle options] | |
"Opens a DB at the specified file handle and returns kv-datastore.") | |
(destroy [this file-handle options] | |
"Destroys DB at file-handle.") | |
(repair [this file-handle options] | |
"Attempts to reconstruct DB from file")) | |
(defprotocol IPersistentKVRead | |
"An interface for reading a persistent key-value datastore with mutable | |
state. To prevent resource leakage, any type implementing IPersistentKVRead | |
should also implement java.lang.Closeable. | |
TODO: Add more documentation here about the argument types." | |
(iterator [this] [this start] [this start end] | |
"Returns a CloseableSeq of map entries ranging from start to | |
end. Automagically closes when exhausted.") | |
(snapshot [this] | |
"Returns a snapshot of the database that can be read and iterated | |
over. This needs to be closed explicitly.") | |
(stats [this property] | |
"Returns statistics for the database.") | |
(bounds [this] | |
"Returns a tuple of the lower and upper keys in the database or snapshot.") | |
(get [this key] [this key default] | |
"Returns the value of key. If the key doesn't exist, returns default or | |
nil.")) | |
(defprotocol IPersistentKVWrite | |
"An interface for writing to a persistent key-value datastore with mutable | |
state. To prevent resource leakage, any type implementing IPersistentKVWrite | |
should also implement java.lang.Closeable. | |
TODO: Add more documentation here about the argument types." | |
(batch [this] [this write-options] | |
"Returns a batch writer for bulk writing key-value pairs atomically. Needs | |
to be closed explicitly.") | |
(sync! [this] | |
"Force a sync/write to disk.") | |
(compact! [this] [this start] [this start end] | |
"Forces compaction.") | |
(put! [this key value] | |
"Put a key-value pair.") | |
(put-all! [this kvs] | |
"(Batch) put a map of key-value pairs atomically.") | |
(delete! [this key] [this key value] | |
"Delete a key. Optionally, given a value, only delete if retieved value is equal.") | |
(delete-all! [this] | |
"Delete all entries.")) | |
(deftype Snapshot [^DB store ^ReadOptions read-options] | |
IPersistentKVRead | |
(iterator [this] | |
(iterator this nil nil)) | |
(iterator [this start] | |
(iterator this start nil)) | |
(iterator [this start end] | |
(if read-options | |
(iterator-seq- (.iterator store read-options) start end) | |
(iterator-seq- (.iterator store) start end))) | |
(bounds [this] | |
(with-open [^DBIterator iterator (.iterator store read-options)] | |
(when (.hasNext (doto iterator .seekToFirst)) | |
[(-> (doto iterator .seekToFirst) .peekNext key deserialize) | |
(-> (doto iterator .seekToLast) .peekNext key deserialize)]))) | |
(get [this key] | |
(get this key nil)) | |
(get [this key default] | |
(let [key (serialize key) | |
val (deserialize (if read-options | |
(.get store key read-options) | |
(.get store key)))] | |
(if val val default))) | |
(snapshot [this] | |
this) | |
java.io.Closeable | |
(close [this] | |
(-> read-options .snapshot .close))) | |
(deftype Batch [^DB store ^WriteBatch batch ^WriteOptions write-options] | |
IPersistentKVWrite | |
(put! [this key value] | |
(.put batch (serialize key) (serialize value))) | |
(delete! [this key] | |
(.delete batch (serialize key))) | |
(batch [this] | |
this) | |
(batch [this _] | |
this) | |
java.io.Closeable | |
(close [this] | |
(if write-options | |
(.write store batch write-options) | |
(.write store batch)) | |
(.close batch))) | |
(deftype RocksDB [^DB store ^ReadOptions read-options ^WriteOptions write-options ^File file] | |
;; TODO: Actually test this factory bit out. | |
IPersistentKVFactory | |
(open [this file-handle options] | |
(assert false "Not used yet.") | |
(.open JniDBFactory/factory file (Options.))) | |
(destroy [this file-handle options] | |
(.destroy JniDBFactory/factory file (Options.))) | |
(repair [this file-handle options] | |
(.repair JniDBFactory/factory file (Options.))) | |
IPersistentKVRead | |
(iterator [this] | |
(iterator this nil nil)) | |
(iterator [this start] | |
(iterator this start nil)) | |
(iterator [this start end] | |
(iterator-seq- (.iterator store) start end)) | |
(snapshot [this] | |
(->Snapshot this (doto (ReadOptions.) (.snapshot (.getSnapshot store))))) | |
(stats [this property] | |
(.getProperty store "rockdb.stats")) | |
(bounds [this] | |
(with-open [^DBIterator iterator (.iterator store read-options)] | |
(when (.hasNext (doto iterator .seekToFirst)) | |
[(-> (doto iterator .seekToFirst) .peekNext key deserialize) | |
(-> (doto iterator .seekToLast) .peekNext key deserialize)]))) | |
(get [this key] | |
(get this key nil)) | |
(get [this key default] | |
(let [key (serialize key) | |
val (deserialize (if read-options | |
(.get store key read-options) | |
(.get store key)))] | |
(if val val default))) | |
IPersistentKVWrite | |
(batch [this] | |
(->Batch store (.createWriteBatch store) write-options)) | |
(batch [this options] | |
(->Batch store (.createWriteBatch store) options)) | |
(sync! [this] | |
(with-open [^Batch batch (batch this (doto (WriteOptions.) (.sync true)))])) | |
(compact! [this] | |
(let [[start end] (bounds this)] | |
(compact! this start end))) | |
(compact! [this start] | |
(let [[_ end] (bounds this)] | |
(compact! this start end))) | |
(compact! [this start end] | |
(.compactRange store (serialize start) (serialize end))) | |
(put! [this key value] | |
(let [k (serialize key) | |
v (serialize value)] | |
(if write-options | |
(.put store k v write-options) | |
(.put store k v)))) | |
(put-all! [this kvs] | |
(with-open [^Batch batch (batch this)] | |
(doseq [[key value] kvs | |
:let [k (serialize key) | |
v (serialize value)]] | |
(put! batch k v)))) | |
(delete! [this key] | |
(let [k (serialize key)] | |
(if write-options | |
(.delete store k write-options) | |
(.delete store k)))) | |
(delete! [this key value] | |
(assert false "not implemented yet")) | |
(delete-all! [this] | |
(assert false "not implemented yet")) | |
java.io.Closeable | |
(close [this] | |
(.close store)) | |
;; TODO: Move into separate cache.. | |
clojure.lang.Counted | |
(count [this] | |
(->> this | |
bounds | |
(map serialize) | |
(apply #(Range. %1 %2)) | |
into-array | |
(.getApproximateSizes store) | |
first)) | |
clojure.lang.Seqable | |
(seq [this] | |
(for [[k v] (iterator this nil nil)] | |
(clojure.lang.MapEntry. (deserialize k) (deserialize v)))) | |
clojure.lang.ILookup | |
(valAt [this key] | |
(get this key nil)) | |
(valAt [this key not-found] | |
(if-let [val (get this key nil)] | |
val | |
not-found))) | |
;; TODO: Add test examples. | |
(deftype RocksDBCache [db] | |
cache/CacheProtocol | |
(lookup [this item] | |
(delay | |
(try | |
(get db item) | |
(catch java.lang.NullPointerException e | |
nil)))) | |
(lookup [this item not-found] | |
(delay | |
(try | |
(get db item not-found) | |
(catch java.lang.NullPointerException e | |
nil)))) | |
(has? [this item] | |
(try | |
(get db item) | |
(catch java.lang.NullPointerException e | |
nil))) | |
(hit [this item] | |
(RocksDBCache. db)) | |
(miss [this item result] | |
(put! db item @result) | |
(RocksDBCache. db)) | |
(evict [this key] | |
(delete! db key) | |
(RocksDBCache. db)) | |
(seed [this base] | |
(when base | |
(put-all! db base)) | |
(RocksDBCache. db)) | |
clojure.lang.Counted | |
(count [this] | |
(count db)) | |
clojure.lang.Seqable | |
(seq [this] | |
(seq db)) | |
clojure.lang.ILookup | |
(valAt [this key] | |
(get db key nil)) | |
(valAt [this key not-found] | |
(if-let [val (get db key nil)] | |
val | |
not-found)) | |
java.lang.Object | |
(toString [this] | |
(str db))) | |
(def ^:private option-setters | |
{:create-if-missing? #(.createIfMissing ^Options %1 %2) | |
:error-if-exists? #(.errorIfExists ^Options %1 %2) | |
:write-buffer-size #(.writeBufferSize ^Options %1 %2) | |
:block-size #(.blockSize ^Options %1 %2) | |
:block-restart-interval #(.blockRestartInterval ^Options %1 %2) | |
:max-open-files #(.maxOpenFiles ^Options %1 %2) | |
:cache-size #(.cacheSize ^Options %1 %2) | |
:comparator #(.comparator ^Options %1 %2) | |
:paranoid-checks? #(.paranoidChecks ^Options %1 %2) | |
:compress? #(.compressionType ^Options %1 (if % CompressionType/SNAPPY CompressionType/NONE)) | |
:logger #(.logger ^Options %1 %2)}) | |
(defn mk-RocksDB | |
"Creates a closeable database object, which takes a directory and zero or | |
more options and implements both IPersistentKVRead and IPersistentKVWrite." | |
([] | |
(mk-RocksDB {} (fs/temp-dir "rocks-db"))) | |
([options] | |
(mk-RocksDB options (fs/temp-dir "rocks-db"))) | |
([{:keys [create-if-missing? | |
error-if-exists? | |
write-buffer-size | |
block-size | |
max-open-files | |
cache-size | |
comparator | |
compress? | |
paranoid-checks? | |
block-restart-interval | |
logger] | |
:or {compress? true | |
cache-size (* 32 1024 1024) | |
block-size (* 16 1024) | |
write-buffer-size (* 32 1024 1024) | |
create-if-missing? true | |
error-if-exists? false} | |
:as options} | |
directory] | |
(let [handle (io/file directory)] | |
(->RocksDB | |
(.open JniDBFactory/factory | |
handle | |
(let [opts (Options.)] | |
(doseq [[k v] options] | |
(when (and v (contains? option-setters k)) | |
((option-setters k) opts v))) | |
opts)) | |
(ReadOptions.) | |
(WriteOptions.) | |
handle)))) | |
(defn cache-factory | |
"Returns a RocksDB-backed persistent-cache satisfying | |
core.cache/CacheProtocol." | |
([] | |
(cache-factory {})) | |
([init-kvs] | |
(cache/seed (mk-RocksDB) {}))) | |
(comment | |
(def db (mk-RocksDB {})) | |
(put! db "foo" "bar") | |
(get db "foo") | |
(put! db nippy/stress-data nippy/stress-data) | |
(prn (get db nippy/stress-data))) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;;; Memoization. | |
(defn memo | |
"Given a function f, returns a memoized version backed by RocksDB." | |
[f] | |
(memo/build-memoizer | |
#(clojure.core.memoize.PluggableMemoization. %1 (cache-factory)) f)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment