@ivarref
Created January 29, 2024 11:38
(ns user.db.transact-assert-basis-t
  (:require [clojure.edn :as edn]
            [clojure.stacktrace :as st]
            [datomic.api :as d])
  (:import (clojure.lang IBlockingDeref IDeref IPending)
           (datomic Connection)
           (java.util.concurrent Future TimeUnit TimeoutException)))

(def schema
  "Schema for `:assert-basis-t`, the only transaction function that `transact` requires.
  This schema must be transacted before `transact` is used, for example like so:
  @(d/transact conn user.db.transact-assert-basis-t/schema)"
  [(edn/read-string
     {:readers {'db/id datomic.db/id-literal
                'db/fn datomic.function/construct}}
     "{:db/ident :assert-basis-t
       :db/fn #db/fn {:lang \"clojure\"
                      :requires []
                      :imports []
                      :params [db expected-basis-t]
                      :code (when (not= (d/basis-t db) expected-basis-t)\n (throw (ex-info \"Basis-t mismatch\" {})))}}")])

; Borrowed from clojure.core
(defn ^:private deref-future
  ([^Future fut]
   (.get fut))
  ([^Future fut timeout-ms timeout-val]
   (try (.get fut timeout-ms TimeUnit/MILLISECONDS)
        (catch TimeoutException _
          timeout-val))))

(def ^:dynamic *max-attempts* 1000)

(def ^:dynamic *retry-callback*
  "Called with no arguments each time a transaction is retried. Useful for testing."
  (fn [] nil))

(defn transact
  "Transact the return value of (f (d/db conn)).
  Ensures that `f` receives the latest database value (`db`)
  and that the database value immediately following `db` includes
  the transaction data returned by `f`.
  `transact` thus provides a global optimistic lock on the database.
  `f` may be called multiple times, and thus should be free of side effects.
  `f` should return up-to-date transaction data (a vector)
  and/or throw exceptions based on its `db` argument.
  Returns a future-like object; deref it to get the transaction result."
  [conn f]
  (assert (instance? Connection conn) "conn must be an instance of datomic.Connection")
  (assert (fn? f) "f must be a function of one argument")
  (let [fut (future
              (loop [attempts 0
                     db (d/db conn)]
                (when (> attempts *max-attempts*)
                  (throw (ex-info "Max attempts reached" {:max-attempts *max-attempts*})))
                (let [basis-t (d/basis-t db)
                      [res v] (try
                                ; Append the basis-t assertion so that the transaction
                                ; fails if the database has moved past `db`.
                                [:ok @(d/transact conn (into
                                                         (let [tx-data (f db)]
                                                           (assert (vector? tx-data) "Expected `f` to return a vector")
                                                           tx-data)
                                                         [[:assert-basis-t basis-t]]))]
                                (catch Throwable t
                                  (if (= "Basis-t mismatch" (.getMessage (st/root-cause t)))
                                    (do
                                      (*retry-callback*)
                                      [:retry :retry])
                                    [:throw t])))]
                  (cond (= res :ok)
                        v
                        (= res :throw)
                        (throw v)
                        (= res :retry)
                        (do
                          ; Wait until this peer sees a newer database before retrying.
                          (while (= basis-t (d/basis-t (d/db conn)))
                            (Thread/sleep 1))
                          (recur (inc attempts) (d/db conn)))))))]
    (reify
      IDeref
      (deref [_]
        (deref-future fut))
      IBlockingDeref
      (deref [_ timeout-ms timeout-val]
        (deref-future fut timeout-ms timeout-val))
      IPending
      (isRealized [_] (.isDone fut))
      Future
      (get [_]
        (.get fut))
      (get [_ timeout unit]
        (.get fut timeout unit))
      (isCancelled [_] (.isCancelled fut))
      (isDone [_] (.isDone fut))
      (cancel [_ interrupt?] (.cancel fut interrupt?)))))
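
; Usage sketch (not part of the original gist). It assumes that `conn` is an
; open datomic.Connection and that two hypothetical attributes are installed:
; :counter/id (a :db.unique/identity string) and :counter/value (a long).
; The name `increment-counter` is likewise only illustrative.
(comment
  ; Install the :assert-basis-t transaction function once per database.
  @(d/transact conn schema)

  ; A side-effect-free `f`: read the current value from `db` and return
  ; transaction data (a vector) that depends on it.
  (defn increment-counter [db]
    (let [current (or (d/q '[:find ?v .
                             :in $ ?id
                             :where
                             [?e :counter/id ?id]
                             [?e :counter/value ?v]]
                           db "page-views")
                      0)]
      [{:db/id         (d/tempid :db.part/user)
        :counter/id    "page-views"
        :counter/value (inc current)}]))

  ; Deref to block until the transaction succeeds or throws.
  (let [{:keys [db-after]} @(transact conn increment-counter)]
    (d/basis-t db-after))

  ; Retries can be bounded and observed through the dynamic vars; `future`
  ; conveys these bindings to the thread that runs the retry loop.
  (binding [*max-attempts* 10
            *retry-callback* (fn [] (println "retrying"))]
    @(transact conn increment-counter)))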