Created
November 15, 2015 16:09
-
-
Save timewald/d9acbde47debd8a6793c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; setup db to test with | |
;; note that this setup uses a local dev transactor | |
;; you can use a different transactor, but you cannot | |
;; use a mem db because it does not support the log API | |
(require '[datomic.api :as d]) | |
(def uri "datomic:dev://localhost:4334/reified-txes") | |
(d/delete-database uri) | |
(d/create-database uri) | |
(def conn (d/connect uri)) | |
;; define schema for all examples | |
(def schema [;; base person info | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :person/name | |
:db/valueType :db.type/string | |
:db/cardinality :db.cardinality/one | |
:db.install/_attribute :db.part/db} | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :person/email | |
:db/valueType :db.type/string | |
:db/cardinality :db.cardinality/one | |
:db.install/_attribute :db.part/db} | |
;; saga | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :person/saga | |
:db/valueType :db.type/long | |
:db/cardinality :db.cardinality/one | |
:db.install/_attribute :db.part/db} | |
;; audit | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :tx/user | |
:db/valueType :db.type/string | |
:db/cardinality :db.cardinality/one | |
:db/index true | |
:db.install/_attribute :db.part/db} | |
;; unique txid | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :tx/id | |
:db/valueType :db.type/uuid | |
:db/cardinality :db.cardinality/one | |
:db/unique :db.unique/value | |
:db/index true | |
:db.install/_attribute :db.part/db} | |
;; cfg mgmt | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :list/name | |
:db/valueType :db.type/string | |
:db/cardinality :db.cardinality/one | |
:db/unique :db.unique/value | |
:db/index true | |
:db.install/_attribute :db.part/db} | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :list/person | |
:db/valueType :db.type/ref | |
:db/cardinality :db.cardinality/many | |
:db.install/_attribute :db.part/db} | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :list/version | |
:db/valueType :db.type/long | |
:db/cardinality :db.cardinality/one | |
:db/unique :db.unique/value | |
:db/index true | |
:db.install/_attribute :db.part/db} | |
;; import | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :import/name | |
:db/valueType :db.type/string | |
:db/cardinality :db.cardinality/one | |
:db/unique :db.unique/value | |
:db/index true | |
:db.install/_attribute :db.part/db} | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :import/complete | |
:db/valueType :db.type/boolean | |
:db/cardinality :db.cardinality/one | |
:db/index true | |
:db.install/_attribute :db.part/db} | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :import/tx | |
:db/valueType :db.type/ref | |
:db/cardinality :db.cardinality/many | |
:db.install/_attribute :db.part/db} | |
;; auto compensation | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :tx/compensates | |
:db/valueType :db.type/ref | |
:db/cardinality :db.cardinality/one | |
:db.install/_attribute :db.part/db}]) | |
;; transact the schema | |
@(d/transact conn schema) | |
;; auditing | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:person/name "Tim"} | |
{:db/id (d/tempid :db.part/tx) | |
:tx/user "Bob" }]) | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:person/name "Gus"} | |
{:db/id (d/tempid :db.part/tx) | |
:tx/user "Bob" }]) | |
(d/q '[:find ?e ?a ?v ?tx ?op | |
:in $ ?log ?who | |
:where | |
[?tx :tx/user ?who] | |
[(tx-data ?log ?tx) [[?e ?a ?v _ ?op]]]] | |
(d/db conn) | |
(d/log conn) | |
"Bob") | |
;; ensure work was done | |
(def unique-txid #uuid "a14163db-6f66-40d5-9a3b-0f8891f8cada") | |
(def unique-tx [{:db/id (d/tempid :db.part/user) | |
:person/name "Sarah"} | |
{:db/id (d/tempid :db.part/tx) | |
:tx/id unique-txid}]) | |
@(d/transact conn unique-tx) | |
(when (empty? (d/datoms (d/db conn) :avet :tx/id unique-txid)) | |
(try | |
@(d/transact conn unique-tx) | |
(catch Throwable t | |
(let [cause (.getCause t)] | |
(when-not | |
(and (= :db.error/unique-conflict | |
(:db/error (ex-data cause))) | |
(.contains (.getMessage cause) ":tx/id")) | |
(throw t)))))) | |
;; versioned data | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:list/name "Ewalds" | |
:list/version 0}]) | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:person/name "Tim" | |
:list/_person [:list/name "Ewalds"]}]) | |
@(d/transact conn [{:db/id [:list/name "Ewalds"] | |
:list/version 1}]) | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:person/name "Gus" | |
:list/_person [:list/name "Ewalds"]}]) | |
@(d/transact conn [{:db/id [:list/name "Ewalds"] | |
:list/version 2}]) | |
(defn db-for-version-of-list | |
[db list-name ver] | |
(let [tx (ffirst (d/q '[:find ?tx | |
:in $ ?lname ?lver | |
:where | |
[?list :list/name ?lname] | |
[?list :list/version ?lver ?tx ?added] | |
[(= ?added true)]] | |
(-> db d/history) | |
list-name | |
ver))] | |
(d/as-of db tx))) | |
(defn names-on-version-of-list | |
([db list-name] | |
(names-on-version-of-list db list-name (ffirst (d/q '[:find ?lver | |
:in $ ?lname | |
:where | |
[?l :list/name ?lname] | |
[?l :list/version ?lver]] | |
db | |
list-name)))) | |
([db list-name ver] | |
(d/q '[:find ?pname | |
:in $ ?lname | |
:where | |
[?l :list/name ?lname] | |
[?l :list/person ?p] | |
[?p :person/name ?pname]] | |
(db-for-version-of-list db list-name ver) | |
"Ewalds"))) | |
(names-on-version-of-list (d/db conn) "Ewalds") | |
;; import | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:import/name "adding-names-1" | |
:import/complete false}]) | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:person/name "Tim"} | |
{:db/id [:import/name "adding-names-1"] | |
:import/tx (d/tempid :db.part/tx)}]) | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:person/name "Gus"} | |
{:db/id [:import/name "adding-names-1"] | |
:import/tx (d/tempid :db.part/tx)}]) | |
@(d/transact conn [{:db/id [:import/name "adding-names-1"] | |
:import/tx (d/tempid :db.part/tx) | |
:import/complete true}]) | |
(def people '[[(people ?p) | |
[?i :import/complete true] | |
[?i :import/tx ?tx] | |
[?p :person/name _ ?tx]]]) | |
(d/q '[:find (pull ?p [:db/id :person/name]) | |
:in $ % | |
:where | |
[people ?p]] | |
(d/db conn) | |
people) | |
;; automatic compensation | |
;; generate compensating tx, ignores datoms about original tx, adds reference | |
;; to original tx to new tx | |
(defn invert-tx | |
[log tx] | |
(transduce (comp (remove (fn [[e _ _ tx _]] (= e tx))) | |
(map (fn [[e a v tx added]] [(if added :db/retract :db/add) e a v]))) | |
conj | |
[[:db/add (d/tempid :db.part/tx) :tx/compensates tx]] | |
(-> (d/tx-range log tx (inc tx)) first :data))) | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:person/name "Tim" | |
:person/email "[email protected]"} | |
{:db/id (d/tempid :db.part/tx) | |
:person/saga 10}]) | |
@(d/transact conn [{:db/id (d/tempid :db.part/user) | |
:person/name "Gus"} | |
{:db/id (d/tempid :db.part/tx) | |
:person/saga 10}]) | |
;; look up txes for saga 10 | |
(def saga-txes (->> (d/q '[:find ?tx :where [?tx :person/saga 10]] (d/db conn)) | |
(map first) | |
(sort >))) | |
;; generate compensating txes | |
(def compensating-txes | |
(let [log (d/log conn)] | |
(map #(invert-tx log %) saga-txes))) | |
;; process compensating txes | |
(doseq [compensating-tx compensating-txes] | |
(prn @(d/transact conn compensating-tx))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment