Created
January 29, 2024 11:38
-
-
Save ivarref/c26deae954354ed343719bc5cde06a41 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns user.db.transact-assert-basis-t | |
(:require [clojure.edn :as edn] | |
[clojure.stacktrace :as st] | |
[datomic.api :as d]) | |
(:import (clojure.lang IBlockingDeref IDeref IPending) | |
(datomic Connection) | |
(java.util.concurrent Future TimeUnit TimeoutException))) | |
(def schema
  "Transaction data installing the `:assert-basis-t` database function, the
  single function `transact` needs to be present in the transactor.

  Transact this once before `transact` is used, for example:

    @(d/transact conn user.db.transact-assert-basis-t/schema)"
  (let [;; Tag readers so the #db/fn literal in the EDN text below is turned
        ;; into a Datomic function object while reading.
        readers {'db/id datomic.db/id-literal
                 'db/fn datomic.function/construct}
        ;; The function throws an ex-info whose message is exactly
        ;; "Basis-t mismatch" when the db it is run against has a basis-t
        ;; other than the expected one; otherwise it yields nil.
        fn-edn "{:db/ident :assert-basis-t
                 :db/fn #db/fn {:lang \"clojure\"
                                :requires []
                                :imports []
                                :params [db expected-basis-t]
                                :code (when (not= (d/basis-t db) expected-basis-t)\n (throw (ex-info \"Basis-t mismatch\" {})))}}"]
    [(edn/read-string {:readers readers} fn-edn)]))
;; Adapted from the private helper of the same name in clojure.core.
(defn- deref-future
  "Block on `fut` and return its value.

  One-arg form: waits indefinitely via `Future.get`.
  Three-arg form: waits at most `timeout-ms` milliseconds and returns
  `timeout-val` if the wait times out."
  ([^Future fut]
   (.get fut))
  ([^Future fut timeout-ms timeout-val]
   (let [unit TimeUnit/MILLISECONDS]
     (try
       (.get fut timeout-ms unit)
       (catch TimeoutException _
         ;; Only a timeout is translated; any other exception propagates.
         timeout-val)))))
(def ^{:dynamic true} *max-attempts*
  "Attempt limit that `transact` checks before giving up with an ex-info
  carrying the message `Max attempts reached`. Rebind with `binding` to change it."
  1000)

(def ^{:dynamic true} *retry-callback*
  "Zero-argument function invoked by `transact` each time a transaction is
  about to be retried. Does nothing by default; useful for testing."
  (fn [] nil))
(defn transact
  "Transact the return value of `(f (d/db conn))` under a global optimistic lock.

  `f` receives a database value `db` and must return a vector of transaction
  data. The tuple `[:assert-basis-t <basis-t of db>]` is appended to that
  vector, so the transaction only succeeds if `db` is still the latest
  database when the transactor processes it. On a basis-t mismatch,
  `*retry-callback*` is invoked, the loop waits until the connection reports
  a different basis-t, and `f` is run again against a fresh database.

  `f` may be called multiple times, and thus should be free of side effects.
  `f` should provide up-to-date transaction data and/or throw exceptions
  based on its `db` argument.

  At most `*max-attempts*` attempts are made; after that an ex-info with the
  message `Max attempts reached` is thrown.

  Requires the `:assert-basis-t` function (see `schema`) to be installed.

  Returns an object backed by a `future` that implements IDeref,
  IBlockingDeref, IPending and java.util.concurrent.Future. Dereferencing it
  yields the result of the successful `d/transact`. Any other exception
  raised by `f` or by the transaction is rethrown inside the future and
  therefore surfaces when the result is dereferenced."
  [conn f]
  (assert (instance? Connection conn) "conn must be an instance of datomic.Connection")
  (assert (fn? f) "f must be a function of one argument")
  (let [^Future fut
        (future
          (loop [attempts 0
                 db (d/db conn)]
            ;; `attempts` is the number of tries already made, so `>=` caps
            ;; the total number of tries at exactly *max-attempts*.
            (when (>= attempts *max-attempts*)
              (throw (ex-info "Max attempts reached" {:max-attempts *max-attempts*})))
            (let [basis-t (d/basis-t db)
                  ;; The outcome is captured as a [tag value] pair because
                  ;; `recur` cannot be called from inside `try`.
                  [res v] (try
                            [:ok @(d/transact conn (into
                                                     (let [tx-data (f db)]
                                                       (assert (vector? tx-data) "Expected `f` to return a vector")
                                                       tx-data)
                                                     [[:assert-basis-t basis-t]]))]
                            (catch Throwable t
                              ;; The database function signals a stale basis
                              ;; through this exact message on the root cause.
                              (if (= "Basis-t mismatch" (.getMessage ^Throwable (st/root-cause t)))
                                (do
                                  (*retry-callback*)
                                  [:retry :retry])
                                [:throw t])))]
              (cond (= res :ok)
                    v
                    (= res :throw)
                    (throw v)
                    (= res :retry)
                    (do
                      ;; Wait for the connection to report a basis-t other
                      ;; than the stale one before running `f` again.
                      (while (= basis-t (d/basis-t (d/db conn)))
                        (Thread/sleep 1))
                      (recur (inc attempts) (d/db conn)))))))]
    ;; Expose the same interfaces as the object returned by `future`, with
    ;; every call delegating to `fut`.
    (reify
      IDeref
      (deref [_]
        (deref-future fut))
      IBlockingDeref
      (deref [_ timeout-ms timeout-val]
        (deref-future fut timeout-ms timeout-val))
      IPending
      (isRealized [_] (.isDone fut))
      Future
      (get [_]
        (.get fut))
      (get [_ timeout unit]
        (.get fut timeout unit))
      (isCancelled [_] (.isCancelled fut))
      (isDone [_] (.isDone fut))
      (cancel [_ interrupt?] (.cancel fut interrupt?)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment