@vvvvalvalval
Created April 25, 2018 09:59
Erasing data from Datomic via a manual data migration
;; # EMULATING DATOMIC EXCISION VIA MANUAL DATA MIGRATION
;; *************************************
;; ## Introduction
;; *************************************
;; This Gist demonstrates a generic way to migrate an entire Datomic database
;; from an existing 'Origin Connection' to a clean 'Destination Connection',
;; while getting rid of some undesirable data and otherwise preserving history.
;; (In this example, the undesirable data consists of the values stored in some
;; attributes, but you can adapt the processing to your use case.)
;; It does so by processing the Log of the Origin Connection, and creating
;; a new transaction for each entry, writing it to the Destination Connection.
;; *************************************
;; ## The problem
;; *************************************
;; We need to erase data from the history, which is the purpose of Datomic Excision.
;; However, Datomic Excision is not suitable in some cases (for instance, it does not erase
;; data of :db/fulltext attributes), and is not available in some deployments (Datomic Cloud).
;; *************************************
;; ## Strategy
;; *************************************
;; The difficulty in migrating data from a Datomic db to another is 'Entity Id Renumbering':
;; A given domain entity will not have the same :db/id in the Origin db and the Destination db,
;; which makes it difficult to preserve the relationships between these entities.
;; To combat this, we introduce a new :db.type/long, :db.unique/identity attribute in the Destination db,
;; which keeps track of the Entity Id that each entity had in the Origin db,
;; and we rely on upsert behaviour.
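;; Illustration (not part of the original Gist, and NOT Datomic code): a toy model of the
;; strategy above, under the assumption that upserting on a unique key behaves like a
;; get-or-create lookup. The names toy-upsert, :orgn->dest and :next-id are hypothetical.
(defn- toy-upsert
  "Returns [new-state dest-eid]: reuses the Destination id already recorded for orgn-eid,
  or allocates the next free one and records it."
  [state orgn-eid]
  (if-some [dest-eid (get-in state [:orgn->dest orgn-eid])]
    [state dest-eid]
    (let [dest-eid (:next-id state)]
      [(-> state
           (assoc-in [:orgn->dest orgn-eid] dest-eid)
           (update :next-id inc))
       dest-eid])))

(comment
  ;; Origin entity 42 is seen in two different steps, yet resolves to the same
  ;; Destination id, so references to it stay consistent despite renumbering.
  (let [s0 {:orgn->dest {} :next-id 1000}
        [s1 a] (toy-upsert s0 42)
        [s2 b] (toy-upsert s1 43)
        [_ c] (toy-upsert s2 42)]
    [a b c]) ;; => [1000 1001 1000]
  )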
;; *************************************
;; ## Usage notes
;; *************************************
;; 1. The partitions of entities are preserved.
;; 2. It is safe to stop and restart this job: it will pick up where it left off.
;; 3. You may want to do the bulk of the migration with dev: connections on your local machine, to
;; get better performance and cheaper resource utilization. You can then backup/restore
;; the Destination db to the production deployment of the Destination Connection,
;; migrate the last datoms, then switch your production system from the Origin to the Destination deployment
;; (will probably require a small downtime).
;; 4. If some of the blacklisted attributes have changed their :db/ident in the history of the Origin db, use their present :db/ident.
;; 5. This example is for Peers, but you can easily adapt it to Clients.
;; 6. To be resilient to errors, the transactions are NOT pipelined.
;; *************************************
;; ## Application-specific params
;; *************************************
;; Adapt these to your needs:
(def migr-attribute-ident
"The ident of a new attribute which will be added to dest-conn
to keep track of the entity id of an entity in origin-conn."
:rewrite-db/v0-entid)
(def migr-attribute-doc
"The :db/id that this entity had in the v0 version of the database.")
(def blacklisted-attrs-idents
"The idents of some attributes which will not be used in the destination conn.
In the destination db, these attributes will be installed, but no datom having them in the attribute position
will be present."
;; in this example, the goal is to get rid of the :db/fulltext nature of any attribute,
;; because it prevents their data from being excised.
#{:db/fulltext})
;; *************************************
;; ## Usage
;; *************************************
;; Load this entire file in your REPL,
;; then manually execute the code in the (comment ... ) block.
;; You will need the Datomic Peer Library and clojure/core.async on the classpath.
(require '[datomic.api :as d])
(require '[clojure.core.async :as a])
(import '(java.util TreeMap NavigableMap Map Date))
(declare
;; Those will be implemented below
prepare-dest-conn!
migrate-data!)
(comment
(def orgn-conn (d/connect "... FIXME"))
(def dest-conn-uri "datomic:... FIXME")
;; creating and preparing the Destination Connection - you only need to do this once
;; ----------------------
(d/create-database dest-conn-uri)
(prepare-dest-conn!
orgn-conn
(d/connect dest-conn-uri))
;; ----------------------
(def dest-conn (d/connect dest-conn-uri))
;; migrating the data - returns immediately, work happens in another thread
(def stop!
(migrate-data! orgn-conn dest-conn))
;; Optional: if you want to interrupt the processing, invoke the returned function:
(stop!)
)
;(require 'sc.api) ;; Optional: use scope-capture for debugging - see: https://github.com/vvvvalvalval/scope-capture
;; *************************************
;; ## Implementation
;; *************************************
(defn prepare-dest-conn!
"Prepares the Destination Connection, by installing the schema and data
supporting the use of the migration attribute,
before the first :db/txInstant of the Origin Connection."
[orgn-conn dest-conn]
(when-not (empty? (d/tx-range (d/log dest-conn) nil nil))
(throw (ex-info "dest-conn should be empty." {})))
(let [t0 (-> (d/log orgn-conn)
(d/tx-range nil nil)
seq
(or (throw (ex-info
"orgn-conn has an empty Log"
{})))
first
:t)
^Date first-tx-instant
(-> (d/entity (d/db orgn-conn) (d/t->tx t0))
:db/txInstant)
dest-db (d/db dest-conn)
tx-install-attr
[[:db/add (d/tempid :db.part/tx) :db/txInstant
(Date. (long (-> first-tx-instant .getTime (- 2000))))]
{:db/cardinality :db.cardinality/one,
:db/index true,
:db.install/_attribute :db.part/db,
:db/id (d/tempid :db.part/db),
:db/ident migr-attribute-ident,
:db/valueType :db.type/long,
:db/doc migr-attribute-doc,
:db/unique :db.unique/identity}]
tx-install-start-idents-ids
(->>
(d/q '[:find ?orgn-e ?dest-e ?ident :in $orgn $dest :where
[$dest ?dest-e :db/ident ?ident]
[$orgn ?orgn-e :db/ident ?ident]]
(d/as-of (d/db orgn-conn) (dec t0))
dest-db)
(map (fn [[orgn-e dest-e]]
[:db/add dest-e migr-attribute-ident orgn-e]))
(into [[:db/add (d/tempid :db.part/tx) :db/txInstant
(Date. (long (-> first-tx-instant .getTime (- 1000))))]]))]
[@(d/transact-async dest-conn tx-install-attr)
@(d/transact-async dest-conn tx-install-start-idents-ids)]
))
(defn- find-last-migrated-t
"Finds the last Origin t which has been migrated to dest-conn,
or nil if no Log entry has been migrated to dest-conn."
[dest-conn]
(let [new-db (d/db dest-conn)
last-t (d/basis-t new-db)]
(when-let [old-tx-eid
(migr-attribute-ident (d/entity new-db (d/t->tx last-t)))]
(d/tx->t old-tx-eid))))
(defn- ident-finder
"Given a database value, returns a function which can quickly compute the ident of
an entity at any point in time."
[orgn-db]
(let [eid->t->ident
(->>
(d/q '[:find ?e ?ident ?tx ?added :in $ :where
[?e :db/ident ?ident ?tx ?added]]
(d/history orgn-db))
(group-by (fn [[e _ident _tx _added]] e))
(into {}
(map (fn [[e tuples]]
[e
(TreeMap.
^Map
(->> tuples
(sort-by (fn [[_p _ident tx added]] [tx added]))
(group-by (fn [[_p _ident tx _added]] (d/tx->t tx)))
(into {}
(map (fn [[t tuples]]
(case (count tuples)
1
(let [[_ ident _ added] (first tuples)]
(if added
[t ident]
[t nil]))
2
(let [[_ ident _ _] (last tuples)]
[t ident])))))))]))))]
(fn find-ident-at-t
([eid t]
(when-some [^NavigableMap t->ident (get eid->t->ident eid)]
(when-some [e (.floorEntry t->ident t)]
(.getValue e)))))))
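;; Illustration (not part of the original Gist): the floor lookup that find-ident-at-t
;; relies on, shown on plain data with no Datomic dependency. The helper name
;; example-floor-lookup and the t / ident values below are hypothetical.
(defn- example-floor-lookup
  "Returns the value recorded at the greatest key <= t in t->v, or nil if there is none."
  [^java.util.NavigableMap t->v t]
  (when-some [entry (.floorEntry t->v t)]
    (.getValue entry)))

(comment
  ;; An entity whose ident was asserted at t=1000, renamed at t=2000, retracted at t=3000.
  (let [^java.util.Map m {1000 :user/name, 2000 :person/name, 3000 nil}
        t->ident (java.util.TreeMap. m)]
    [(example-floor-lookup t->ident 999)    ;; before the first assertion
     (example-floor-lookup t->ident 1000)
     (example-floor-lookup t->ident 2500)
     (example-floor-lookup t->ident 3000)]) ;; after the retraction
  ;; => [nil :user/name :person/name nil]
  )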
(defn- safe-onto-chan
"A blocking version of onto-chan that doesn't have memory leaks."
([ch coll]
(safe-onto-chan ch coll true))
([ch coll close?]
(a/thread
(loop [vs (seq coll)]
(if (and vs (a/>!! ch (first vs)))
(recur (next vs))
(when close?
(a/close! ch)))))))
(defn migrate-data!
"Migrates data from orgn-conn to dest-conn, while removing some datoms and preserving history.
Returns immediately, returning a 0-arity 'stop!' function which may be called to interrupt the processing.
An optional, 0-arity callback function may be supplied, which will be called when the processing is done
(either because the whole Log has been consumed, or because 'stop!' has been called).
If an error occurs, processing is aborted and the exception is thrown in the worker thread; the callback is then NOT called.
Processes the Log of orgn-conn, creating a transaction for each log entry, and transacting it to dest-conn,
while omitting datoms that have one of the blacklisted attributes in attribute position."
([orgn-conn dest-conn]
(migrate-data! orgn-conn dest-conn (constantly nil)))
([orgn-conn dest-conn on-done]
(let [orgn-db (d/db orgn-conn)
find-ident-at-t (ident-finder orgn-db)
orgn-ref-attrs
(set
(d/q '[:find [?a ...] :where
[?a :db/valueType :db.type/ref _ true]]
(d/history orgn-db)))
blacklisted-attr-eids
(into #{}
(map (fn [ident]
(d/entid orgn-db ident)))
blacklisted-attrs-idents)
=i+log-entrys=
(a/chan 64 (map-indexed vector))]
(safe-onto-chan
=i+log-entrys=
(d/tx-range
(d/log orgn-conn)
(when-some [last-t (find-last-migrated-t dest-conn)]
(inc last-t))
nil))
(a/thread
(time
(loop []
(if-some [i+le (a/<!! =i+log-entrys=)]
(let [[i log-entry] i+le]
(let [t (:t log-entry)]
(when (-> i (mod 10000) (= 0))
(printf "%tT Processed %d txes, now at t = %d \n"
(Date.) i t)))
(let [t (:t log-entry)
tx
(try
(->> log-entry :data
(remove
(fn [[_e a _v _tx _added?]]
(contains? blacklisted-attr-eids a)))
(map
(fn [[e a v _tx added?]]
(let [new-a
[migr-attribute-ident a]]
(if added?
{:db/id (d/tempid (find-ident-at-t (d/part e) t))
migr-attribute-ident e
new-a
(if (orgn-ref-attrs a)
{:db/id (d/tempid (find-ident-at-t (d/part v) t))
migr-attribute-ident v}
v)}
[:db/retract
[migr-attribute-ident e]
new-a
(if (orgn-ref-attrs a)
[migr-attribute-ident v]
v)]))))
vec)
(catch Throwable err
(let [new-db (d/db dest-conn)
old-as-of
(d/as-of orgn-db (:t log-entry))]
;(sc.api/spy t) ;; Uncomment to be able to easily reproduce the context of errors using scope-capture.
(a/close! =i+log-entrys=)
(throw
(ex-info
(str "Error when building tx number " i ", aborted processing.")
{:i i :t t
:log-entry log-entry}
err)))))]
(try
@(d/transact-async dest-conn tx)
(catch Throwable err
(let [new-db (d/db dest-conn)
old-as-of
(d/as-of orgn-db (:t log-entry))]
;(sc.api/spy t) ;; Uncomment to be able to easily reproduce the context of errors using scope-capture.
(a/close! =i+log-entrys=)
(throw
(ex-info
(str "Error when transacting tx number " i ", aborted processing.")
{:tx tx
:i i :t t
:log-entry log-entry}
err))))))
(recur))
(do
(printf "%tT Done migrating, now at Origin's t = %d \n"
(Date.)
(find-last-migrated-t dest-conn))
(on-done))))))
(fn stop! []
(a/close! =i+log-entrys=)))))

vvvvalvalval commented Apr 25, 2018

Advantages / disadvantages of this approach compared to Excision:

Advantages

  1. Processing / indexing is done offline, won't impede online transaction processing.
  2. More generally applicable than Excision (which does not erase fulltext indexes and, at the time of writing, is not available on Datomic Cloud). It can also be used to 'change' history without deleting it.

Disadvantages

  1. Much, much slower than regular Excision - can take hours or even days to catch up.
  2. Requires a (small) downtime.

On the whole, Excision should be preferred in all situations where its limitations are not prohibitive. This approach should be used even more exceptionally than Excision.

As Robert Stuttaford well put it:

[this approach is] similar to replacing an engine in a car, rather than removing a tiny piece while it’s driving.

vvvvalvalval commented Apr 25, 2018

Note that pipelining could theoretically be used to speed things up (as long as you're confident there won't be errors), but I could not get it to work. I suspect this may be a Datomic concurrency bug (tried on 0.9.5407), but I have not yet worked out a minimal repro.

@Hendekagon

Thank you for exploring this approach.
