Last active
March 16, 2021 23:37
-
-
Save nivekuil/75981cfb573caeb2818e01bdddfc46ed to your computer and use it in GitHub Desktop.
crux scylla/cassandra doc store
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns app.crux.scylla | |
(:require [crux.codec :as codec] | |
[crux.db] | |
[crux.document-store :as ds] | |
[crux.io :refer [with-nippy-thaw-all]] | |
[crux.system :as sys] | |
[crux.memory] | |
[taoensso.nippy :as nippy] | |
[taoensso.timbre :as log] | |
[qbits.alia :as alia] | |
[promesa.core :as p])) | |
(defn id->bytes
  "Encodes `id` with `codec/->id-buffer` and passes the resulting buffer
  through `crux.memory/->on-heap`. The result is what the store binds as
  the key value of its CQL statements."
  [id]
  (-> id
      codec/->id-buffer
      crux.memory/->on-heap))
(defprotocol CruxAsyncDocumentStore
  "Promise-returning counterparts of the blocking `crux.db/DocumentStore`
  operations."
  (submit-docs-async [this id-and-docs]
    "Starts a write for every [id doc] pair in `id-and-docs` and returns a
    promise that resolves once all of them have completed.")
  (-fetch-docs-async [this ids]
    "Starts a read for every id in `ids` and returns a promise that resolves
    once all of them have completed."))
;; Crux document store backed by a Scylla/Cassandra table.
;;
;; Fields:
;;   session - alia session every statement is executed on
;;   select  - prepared statement reading one row by key
;;   insert  - prepared statement writing a key/value row
;;   delete  - prepared statement removing one row by key
;;
;; The record is AutoCloseable; closing it closes the session.
(defrecord ScyllaDocumentStore [session select insert delete]
  crux.db/DocumentStore
  ;; The blocking API simply derefs the promises of the async API.
  (submit-docs [this id-and-docs]
    @(submit-docs-async this id-and-docs))
  (-fetch-docs [this ids]
    ;; Ids without a stored row yield nil, which contributes nothing to the map.
    (into {} @(-fetch-docs-async this ids)))

  CruxAsyncDocumentStore
  ;; Issues one statement per [id doc] pair: evicted docs are deleted, all
  ;; others are written as a nippy-frozen blob. Returns a promise of all results.
  (submit-docs-async [this id-and-docs]
    (-> (fn [[id doc]]
          (if (codec/evicted-doc? doc)
            (alia/execute-async session delete
                                {:values [(id->bytes id)]
                                 :idempotent? true
                                 :consistency-level :local-quorum})
            (alia/execute-async session insert
                                {:values [(id->bytes id) (nippy/fast-freeze doc)]
                                 :idempotent? true
                                 :consistency-level :local-quorum})))
        (map id-and-docs)
        p/all))

  ;; Issues one select per id. Returns a promise of a collection holding an
  ;; [id doc] pair for every id that has a row and nil for every id that has none.
  (-fetch-docs-async [this ids]
    (-> (fn [id]
          (p/let [result (alia/execute-async session select
                                             {:values [(id->bytes id)]
                                              :idempotent? true
                                              :consistency-level :one})]
            (let [{:keys [current-page]} result
                  [{:keys [value]}] current-page
                  ;; The thaw runs in the promise callback, after this method
                  ;; has already returned and possibly on another thread.
                  ;; with-nippy-thaw-all is a dynamic (thread-local) binding,
                  ;; so it has to wrap the thaw itself; wrapping only the
                  ;; promise construction leaves it without effect here.
                  doc (with-nippy-thaw-all
                        (some-> value crux.memory/->on-heap nippy/fast-thaw))]
              (when doc [id doc]))))
        (map ids)
        p/all))

  java.lang.AutoCloseable
  ;; Releases the driver session so its connections do not outlive the store.
  (close [_]
    (.close ^java.lang.AutoCloseable session)))
(defn ->document-store
  "Crux system component: opens an alia session against `addresses`, switches
  to `keyspace`, creates `table` (key blob, value blob) if it does not exist,
  prepares the select/insert/delete statements and returns a
  ScyllaDocumentStore wrapped by `ds/->cached-document-store`.

  `local-dc` falls back to \"datacenter1\" when not supplied. The keyspace
  itself is not created here; the statement that would do so is disabled.

  If any setup step throws, the session is closed before the exception is
  rethrown."
  {::sys/deps {:document-cache 'crux.cache/->cache}
   ::sys/args {:keyspace {:doc "keyspace"
                          :required? true}
               :addresses {:doc "addresses"
                           :required? true}
               :table {:doc "table name"
                       :required? true}
               :local-dc {:doc "local datacenter"}}}
  [{:keys [document-cache keyspace addresses local-dc table] :as opts}]
  (let [session (alia/session {:contact-points addresses
                               :load-balancing-local-datacenter (or local-dc "datacenter1")})]
    (try
      #_(alia/execute session (format "
          CREATE KEYSPACE IF NOT EXISTS %s
          WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};" keyspace))
      ;; keyspace and table are spliced in with format, not bound as statement
      ;; values; they come from the component's configuration.
      (alia/execute session (format "USE %s;" keyspace))
      (alia/execute session (format "CREATE TABLE IF NOT EXISTS %s (
          key blob,
          value blob,
          PRIMARY KEY (key));" table))
      (let [select (alia/prepare session (format "SELECT key, value FROM %s WHERE key = ?;" table))
            insert (alia/prepare session (format "INSERT INTO %s (key, value) VALUES (?, ?);" table))
            delete (alia/prepare session (format "DELETE FROM %s WHERE key = ?;" table))]
        (ds/->cached-document-store
         (assoc opts
                :document-cache document-cache
                :document-store (->ScyllaDocumentStore session select insert delete))))
      (catch Throwable t
        ;; Setup failed (e.g. unknown keyspace): do not leak the open session.
        (.close ^java.lang.AutoCloseable session)
        (throw t)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment