Created
October 30, 2012 06:27
-
-
Save eslick/3978622 to your computer and use it in GitHub Desktop.
A simple election mechanism for Datomic peers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; This is a relatively simple election mechanism for a set of
;; processes or Datomic peers where a processing step requires a
;; global master. The elector entity connects a set of peer entities
;; (via :election/peer), and each peer carries an :election/master? flag.
;; Using this algorithm, there will only ever be one :election/master?
;; in the :election/peer group defined on the elector.
;; | |
;; The user API is simply a guarded transaction, which calls txn-fn
;; if the entity is the current master. If it is not, it holds an
;; election if the timeout has expired.
;; | |
;; Each peer calls reset-elector! on startup to ensure it is on the
;; peer election list. Peers should use identical durations. Peers
;; can immediately start running guarded transactions. If new nodes
;; come online and reset the election, this simply delays the time until
;; the work completes; each unit of work will complete exactly once if
;; the query that grabs the work drives a consistent transaction.
;; | |
;; The timeout is used for failover. If I win an election and then die,
;; my txn-fn will either not be called or its results will not be
;; committed. If txn-fn is purely functional against a dbval and the
;; master dies, a new master will be elected shortly, run the same query,
;; and pick up the work not completed by the now-dead master.
;; | |
;; This assumes tasks run guarded transactions at intervals of approximately
;; size S consistently across the peer group, and that the master election
;; duration is an interval of size P where P >> S.
;; | |
;; Max time to first commit is: S * peers | |
;; Max time to fail over is: P + S | |
;; | |
;; This algorithm allows for unbalanced work distribution: if all peers run
;; on a time interval of exactly S and P is the right multiple of S, the
;; same master might always win.
(defn- get-elector
  "Look up the elector entity whose :election/name equals `name` in `db`,
  delegating to data/find-entity."
  [db name]
  (->> name
       (data/find-entity db :election/name)))
(defn- expired-election?
  "True when the instant stored in the elector's :election/ts lies strictly
  before the current wall-clock time on this peer."
  [elector]
  (let [term-end-ms (.getTime (:election/ts elector))
        now-ms      (System/currentTimeMillis)]
    (< term-end-ms now-ms)))
(defn- elect!
  "Using the elector named `name`, try to elect `entity` as master by
  committing a call to the :user.fn/electForDuration transaction function.

  Returns whatever p/commit! returns (used in this file as a transaction
  report carrying :db-after). A successful commit does NOT mean `entity`
  won: the transaction function emits no datoms when the current term has
  not expired, and another peer may have won. Inspect :election/master? in
  the resulting db value to learn the outcome."
  [dbpeer name entity]
  (p/commit! dbpeer
             [[:user.fn/electForDuration name (:db/id entity)]]))
(defn- master?
  "Return the :election/master? flag stored on `entity` (nil when absent).
  Only the given entity value is read; the elector's timestamp is not
  consulted."
  [entity]
  (get entity :election/master?))
(defn- true-master?
  "Intended semantics: `entity` is the master under the election named
  `name` if the elector's :election/ts is still later than now and the
  entity's :election/master? flag is true.

  Evaluates the :user.fn/electionMaster? database function against `dbval`."
  [dbval name entity]
  ;; d/invoke takes the database used to look up the function, followed by
  ;; the function's own arguments. electionMaster? declares `db` as its first
  ;; parameter, so the db value is passed twice.
  (d/invoke dbval :user.fn/electionMaster? dbval name (:db/id entity)))
(defn- assert-master-txn
  "Return transaction data (a vector holding a single statement) that invokes
  :user.fn/assertElectionMaster, so the enclosing transaction is meant to
  abort unless `entity` is the master under the election named `name` at
  commit time.

  `entity` may be an entity (its :db/id is used) or a raw entity id.
  Transaction-function arguments must be plain values such as ids, not
  entity objects."
  [name entity]
  [[:user.fn/assertElectionMaster name (or (:db/id entity) entity)]])
(defn- elected?
  "Decide whether `entity` may act as master for the election named `name`.

  If the elector's term has expired, run an election and report whether
  `entity` holds :election/master? in the resulting database value.
  Otherwise, or if the commit yields no :db-after, fall back to the flag on
  the supplied `entity` value.

  The answer is advisory; guarded-commit! additionally guards its commit
  with assert-master-txn."
  [dbpeer name entity]
  {:pre [(data/entity? entity) (keyword? name)]}
  (let [elector (get-elector (p/get-db dbpeer) name)]
    (if (expired-election? elector)
      (let [db-after (:db-after (elect! dbpeer name entity))]
        (if db-after
          ;; The commit report is truthy even when another peer won the
          ;; election, so read the outcome from the new db value instead.
          (boolean (:election/master? (d/entity db-after (:db/id entity))))
          (master? entity)))
      (master? entity))))
;; | |
;; API: Create an elector w/ duration and peer set | |
;; | |
(defn reset-elector!
  "Create or update (upserting on the unique :election/name) the elector
  named `name`:
  - set its term duration to `seconds`,
  - reset :election/ts to now,
  - register `entity` as one of its :election/peer entries.
  Then run an election.

  Returns the result of data/query-entity for the peer holding
  :election/master? true after that election. This is presumably the master
  entity, or nil if nobody was elected (TODO confirm data/query-entity's
  contract)."
  [dbpeer name seconds entity]
  {:pre [(data/entity? entity) (number? seconds) (keyword? name)]}
  (let [eid (data/tempid)]
    (p/commit!
     dbpeer
     [{:db/id             eid
       :election/name     name
       ;; NOTE(review): electForDuration compares against the transactor's
       ;; clock, so the election below only fires once that clock is
       ;; strictly past this peer-side timestamp.
       :election/ts       (java.util.Date.)
       ;; :election/duration is a :long attribute, so coerce fractional seconds.
       :election/duration (long (* 1000 seconds))}
      [:db/add eid :election/peer (:db/id entity)]])
    (let [report (elect! dbpeer name entity)]
      (data/query-entity '[:find ?eid
                           :in $ ?name
                           :where
                           [?elector :election/name ?name]
                           [?elector :election/peer ?eid]
                           [?eid :election/master? true]]
                         (:db-after report) name))))
(defn guarded-commit!
  "Run txn-fn on args and commit its transaction data, but only when
  `entity` is (or, by winning a fresh election, becomes) the master for the
  election named `name`. The committed data is prefixed with an
  assertElectionMaster statement, so a race between peers is settled at
  commit time. Election timeouts are meant to be human-scale (many seconds
  to minutes, not milliseconds). Returns nil when not elected."
  [dbpeer [name entity] txn-fn & args]
  (when (elected? dbpeer name entity)
    (let [guard (assert-master-txn name entity)
          work  (apply txn-fn args)]
      (p/commit! dbpeer (-> [] (into guard) (into work))))))
(defn speculative-commit!
  "Commit txn-fn's transaction data, prefixed with an assertElectionMaster
  guard, whenever `entity` currently carries the :election/master? flag.
  No election is run here, which makes this useful when elections are
  driven elsewhere. The cost is that peers holding a stale flag still call
  txn-fn; their guarded commits are expected to be rejected. txn-fn must
  return valid transaction data. Returns nil when `entity` is not flagged."
  [dbpeer [name entity] txn-fn & args]
  (when (master? entity)
    (let [guard (assert-master-txn name entity)
          work  (apply txn-fn args)]
      (p/commit! dbpeer (-> [] (into guard) (into work))))))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(def shorthand-attributes
  ;; Election schema in shorthand form, one [ident value-type option] per row:
  ;;   :election/name     - identity of an elector (unique, used for upserts)
  ;;   :election/ts       - instant on the elector; an election sets it to now + duration
  ;;   :election/duration - term length in milliseconds
  ;;   :election/peer     - refs from the elector to its peer entities
  ;;   :election/master?  - flag on a peer entity marking the elected master
  ;; NOTE(review): :election/peer is used as a set of peers; confirm the
  ;; shorthand defaults to cardinality many, or add the needed option.
  [[:election/name     :keyword :unique-identity]
   [:election/ts       :instant :noHistory]
   [:election/duration :long    :noHistory]
   [:election/peer     :ref     :noHistory]
   [:election/master?  :boolean :noHistory]])
(def ^:private data-fns
  ;; Database functions that support the election mechanism. The code of each
  ;; d/function is a quoted form evaluated by Datomic, so it refers to
  ;; datomic.api with fully qualified names.
  [{:db/id    (d/tempid :part/app)
    :db/ident :user.fn/add-new
    :db/doc   "Like :db/add but has no effect if an identical assertion
  is already present in the database."
    :db/fn    (d/function
               '{:lang   :clojure
                 :params [db e a v]
                 :code   (when (empty? (datomic.api/q '[:find ?e
                                                        :in $ ?e ?a ?v
                                                        :where [?e ?a ?v]]
                                                      db e a v))
                           [[:db/add e a v]])})}

   {:db/id    (d/tempid :part/app)
    :db/ident :user.fn/electForDuration
    :db/doc   "Supports a trivial election algorithm which periodically
  and atomically chooses the first master to run the election
  after a timeout occurs. See infra/election.clj"
    :db/fn    (d/function
               '{:lang   :clojure
                 :params [db name meid]
                 :code   (let [[eid inst duration]
                               (first
                                (datomic.api/q '[:find ?elector ?inst ?dur
                                                 :in $ ?name
                                                 :where
                                                 [?elector :election/name ?name]
                                                 [?elector :election/duration ?dur]
                                                 [?elector :election/ts ?inst]]
                                               db name))
                               now (.getTime (java.util.Date.))]
                           ;; Only the first caller after the term expires wins:
                           ;; it pushes :election/ts forward by `duration` and marks
                           ;; exactly the peer whose id equals meid as master.
                           ;; Unknown elector or unexpired term -> no datoms.
                           (if (and eid (< (.getTime inst) now))
                             (into [[:db/add eid :election/ts
                                     (java.util.Date. (+ now duration))]]
                                   (map (fn [peer]
                                          (let [id (:db/id peer)]
                                            [:user.fn/add-new id :election/master?
                                             (= id meid)]))
                                        (:election/peer (datomic.api/entity db eid))))
                             []))})}

   {:db/id    (d/tempid :part/app)
    :db/ident :user.fn/electionMaster?
    :db/doc   "Non-txn function that tests whether eid is a master
  according to elector 'name' for the current value of the db"
    :db/fn    (d/function
               '{:lang   :clojure
                 :params [db name eid]
                 :code   (let [now (.getTime (java.util.Date.))]
                           ;; Master while the term has not expired, i.e. until
                           ;; electForDuration would accept a new election.
                           (boolean
                            (some (fn [[ts master]]
                                    (and master (<= now (.getTime ts))))
                                  (datomic.api/q '[:find ?ts ?master
                                                   :in $ ?name ?eid
                                                   :where
                                                   [?elector :election/name ?name]
                                                   [?elector :election/ts ?ts]
                                                   [?elector :election/peer ?eid]
                                                   [?eid :election/master? ?master]]
                                                 db name eid))))})}

   {:db/id    (d/tempid :part/app)
    :db/ident :user.fn/assertElectionMaster
    :db/doc   "Aborts the transaction unless eid is the master under the
  elector named 'name' at commit time; otherwise contributes no datoms."
    :db/fn    (d/function
               '{:lang   :clojure
                 :params [db name eid]
                 ;; Transaction functions abort a transaction by throwing.
                 :code   (if (datomic.api/invoke db :user.fn/electionMaster? db name eid)
                           []
                           (throw (ex-info (str "Assertion failed: " eid
                                                " is not the master of election " name)
                                           {:election/name name
                                            :db/id         eid})))})}])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment