Created
November 21, 2012 02:05
-
-
Save eslick/4122604 to your computer and use it in GitHub Desktop.
Simple reservation for multiple worker peers using Datomic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns experiment.infra.election | |
(:use [datomic.api :only [q] :as d]) | |
(:require [experiment.system :as sys] | |
[experiment.infra.data :as data] | |
[experiment.infra.protocols :as p])) | |
;; | |
;; Reserve - Simple peer coordination mechanism | |
;; | |
;; Reservations support multiple peers trying to distribution operations | |
;; across a cluster. As peers may go down or have problems, we don't | |
;; want to lose activity so we support reservation timeouts. Datomic | |
;; makes it trivial to perform atomic state transitions so we can simply | |
;; perform a synchronous transaction to checkout a group of objects. | |
;; It's also easy to do an asynch transaction and use a tx-queue watcher | |
;; to process the results, but this code doesn't support that. | |
;; | |
;; Peers should share identical query and timeout conventions to properly | |
;; coordinate. | |
;; | |
;; This code assumes :reserve/state, :keyword and :reserve/ts, :instant are | |
;; part of the Datomic schema and should probably have :nohistory attributes | |
;; set if reservations are high volume (and forensics aren't important or | |
;; done elsewhere). | |
;; | |
;; State ':ready' - The entity is available to be reserved if it matches | |
;; the shared query. | |
;; | |
;; State ':reserved' - The entity is reserved and being worked on by a | |
;; single peer (you know who you are). At the end of the work, the state | |
;; should be returned to :ready or to any other state to avoid timeouts. | |
;; On the other hand, timeouts are a cheap way to do roughly periodic tasks. | |
;; | |
;; State 'any' - The reservation system ignores all other states. | |
;; | |
;; (reserve <peer> <N> <entities> <timeout>) -- Given the candidate list | |
;; in entities, try to reserve all or N of them. | |
;; | |
;; (release-reservation <peer> <entities>) -- Release the reservation | |
;; state on the entities, returning to :ready by default or to | |
;; | |
;; (enable-reservations <peer> <entities>) -- Initial entities to :ready | |
;; | |
;; Functions consume entities or eids and return eids for downstream | |
;; flexibility. | |
;; | |
(defn- as-eid [entity] | |
(if (number? entity) | |
entity | |
(:db/id entity))) | |
(def ^:private reservation-schema | |
[;; Maintain reservation state, no history | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :reserve/state | |
:db/valueType :db.type/keyword | |
:db/cardinality :db.cardinality/one | |
:db/noHistory true | |
:db.install/_attribute :db.part/db} | |
;; Maintain the last reservation time for timeouts | |
{:db/id (d/tempid :db.part/db) | |
:db/ident :reserve/ts | |
:db/valueType :db.type/instant | |
:db/cardinality :db.cardinality/one | |
:db/noHistory true | |
:db.install/_attribute :db.part/db} | |
;; Function to atomically reserve N objects, including timed out ones | |
{:db/id (d/tempid :db.part/app) | |
:db/ident :user.fn/reserve | |
:db/doc | |
" | |
(reserve <db> <N> <entities> <ts> <timeout>) -- | |
Uses :reserve/state and :reserve/ts attributes to implement an atomic grab of | |
a set of candidate entity ids. Callers can use the :db-before and :db-after to | |
detect the set of entities that transitioned in that transaction. When done | |
with the entities, simply assert the :ready state on :reserve/state or allow | |
it to timeout naturally. | |
" | |
:db/fn (d/function | |
'{:lang :clojure | |
:params [db n entities ts timeout] | |
:code (->> (q '[:find ?eid | |
:in $ % [?eid ...] ?ts ?timeout | |
:where | |
(ready? ?eid ?ts ?timeout)] | |
db entities ts timeout | |
'[[(ready? ?object ?now ?timeout) | |
[?object :reserve/state :ready]] | |
[(ready? ?object ?now ?timeout) | |
[?object :reserve/state :reserved] | |
[?object :reserve/ts ?ts] | |
[(.getTime ?now) ?nowt] | |
[(.getTime ?ts) ?tst] | |
[(- ?nowt ?tst) ?diff] | |
[(> ?diff ?timeout)]]]) | |
(map first) | |
(take n) | |
(mapcat (fn [eid] | |
[[:db/add eid :reserve/state :reserved] | |
[:db/add eid :reserve/ts ts]])))})}]) | |
(defn- get-reserve-fn [dbval] | |
(first (q '[:find ?fn :where [?fn :db/ident :user.fn/reserve]] dbval))) | |
(defn ensure-reserve-fn [peer] | |
(when (empty? (get-reserve-fn (p/get-db peer))) | |
(p/commit! peer reservation-fns))) | |
(defn- reserved-entities | |
"Returns the subset of eids that were transitioned to the :reserved | |
state with the provided 'now' as the reference timestamp for computing timeout" | |
[report] | |
(q '[:find ?eid :in $ [[?eid ?a ?v ?tx] ...] :where | |
[?eid :reserve/state :reserved]] | |
(:db-after report) (:tx-data report))) | |
(defn reserve | |
"Reserve a set of entity ids by transitioning the winners to :reserve and | |
return the list of eids representing the matched set." | |
([peer entities timeout] | |
(reserve peer (count entities) entities timeout)) | |
([peer N entities timeout] | |
{:pre [(or (nil? N) (number? N)) | |
(or (nil? timeout) (and (number? timeout) (> timeout 0)))]} | |
(if (and N (> N 0) (> (count entities) 0)) | |
(reserved-entities | |
(p/commit! peer | |
[[:user.fn/reserve N (map as-eid entities) (java.util.Date.) timeout]])) | |
[]))) | |
(defn release-reservation-tx | |
"Reset :reserve/state to :ready and ts to provided timestamp" | |
([entity] | |
(release-reservation-tx (java.util.Date.) entity)) | |
([ts entity] | |
(let [eid (as-eid entity)] | |
[[:db/add eid :reserve/state :ready] | |
[:db/add eid :reserve/ts ts]]))) | |
(defn release-reservation | |
"Reset a grabbable entity for a future epoch by setting | |
state to ready and tx to now" | |
([peer entities] | |
(if (sequential? entities) | |
(p/commit! peer (mapcat release-reservation-tx entities)) | |
(p/commit! peer (release-reservation-tx entities))))) | |
(defn enable-reservations | |
"Ensure provided entities are ready for " | |
[peer entities] | |
(if (sequential? entities) | |
(p/commit! peer (mapcat release-reservation-tx entities)) | |
(p/commit! peer (release-reservation-tx entities)))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Test Reservation Mechanism | |
(defn create-object [peer name] | |
(p/commit! peer [{:db/id (data/tempid) | |
:model/id (str name) | |
:model/type :test | |
:reserve/state :ready}])) | |
(deftest simple-reservation | |
(with-temporary-peer [peer] | |
(schema/ensure-schema (d/connect (:uri peer)) (p/get-db peer)) | |
(e/ensure-reserve-fn peer) | |
(doall (map (partial create-object peer) (range 10))) | |
(let [candidates (data/find-entities (p/get-db peer) :model/type :test) | |
reserved1 (e/reserve peer 5 candidates 2000)] | |
(is (= (count candidates) 10)) | |
(is (= (count reserved1) 5)) | |
;; Reserve a second set | |
(let [reserved2 (e/reserve peer 10 candidates 2000)] | |
;; Not the same as the first! | |
(is (= (count (clojure.set/intersection (set reserved1) (set reserved2))) 0)) | |
(is (= (count reserved2) 5)) | |
(is (= (count (e/reserve peer 10 candidates 2000)) 0))) | |
;; Releasing and re-reserve the set | |
(e/release-reservation peer candidates) | |
(is (= (count (e/reserve peer 20 candidates 2000)) 10)) | |
;; Then let them timeout | |
(java.lang.Thread/sleep 200) | |
(is (= (count (e/reserve peer 10 candidates 100)) 10))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment