Created
September 8, 2014 05:01
-
-
Save zcaudate/a7eb0a4faf3138365607 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns hara.concurrent.workflow | |
(:require [hara.common.checks :refer [hash-map? promise?]] | |
[hara.common.primitives :refer [uuid]] | |
[hara.data.nested :refer [merge-nil-nested]] | |
[clojure.set :as set])) | |
(defn create-registry [] | |
(atom {:tickets {} | |
:tasks {} | |
:active #{} | |
:running {} | |
:upstream {} | |
:downstream {}})) | |
(def ^:dynamic *registry* (create-registry)) | |
(defn task? [p] | |
(and (promise? p) | |
(-> p meta :type (= :task)))) | |
(defn get-meta [p k] | |
(-> p meta (get k))) | |
(defn task-dependencies [args reg] | |
(->> args | |
(filter #(and (task? %) | |
(not (realized? %)) | |
(get (:tasks reg) (get-meta % :id)))) | |
(map #(get-meta % :id)) | |
set)) | |
(defn link-upstream [reg id upstream] | |
(reduce (fn [reg upid] | |
(update-in reg [:downstream upid] conj id)) | |
reg | |
upstream)) | |
(defn unlink-upstream [reg id upstream] | |
(reduce (fn [reg upid] | |
(update-in reg [:downstream upid] disj id)) | |
reg | |
upstream)) | |
(defn unlink-downstream [reg id downstream] | |
(reduce (fn [reg dnid] | |
(update-in reg [:upstream dnid] disj id)) | |
reg | |
downstream)) | |
(defn register-task [registry id entry args] | |
(swap! registry | |
(fn [reg] | |
(let [upstream (task-dependencies args reg)] | |
(-> reg | |
(link-upstream id upstream) | |
(assoc-in [:tasks id] entry) | |
(assoc-in [:upstream id] upstream) | |
(assoc-in [:downstream id] #{})))))) | |
(defn task [opts? f & args] | |
(let [[opts f args] (if (hash-map? opts?) | |
[opts? f args] | |
[{} opts? (cons f args)]) | |
id (or (:id opts) (uuid)) | |
opts (-> (merge-nil-nested opts {:registry *registry*}) | |
(assoc :id id :type :task)) | |
p (vary-meta (promise) merge opts) | |
entry {:id id :promise p :function f :args args}] | |
(register-task (-> opts :registry) id entry args) | |
p)) | |
(defn either [opts? task & more] | |
(let [[opts tasks] (if (hash-map? opts?) | |
[opts? (cons task more)] | |
[{} (cons opts? (cons task more))]) | |
id (or (:id opts) (uuid)) | |
opts (-> (merge-nil-nested opts {:registry *registry*}) | |
(assoc :id id :type :either)) | |
p (vary-meta (promise) merge opts) | |
entry {:id id :promise p :choices tasks}] | |
(register-task (-> opts :registry) id entry tasks) | |
p)) | |
(defn take-ticket [tickets id] | |
(if (get tickets id) | |
(throw (Exception. (str "Task " id " already has a ticket"))) | |
(assoc tickets id (promise)))) | |
(defn return-ticket [tickets id p registry] | |
(swap! registry | |
(fn [reg] | |
(let [_ @(get tickets id)] | |
(-> reg | |
(unlink-downstream id (get-in reg [:downstream id])) | |
(update-in [:tickets] dissoc id) | |
(update-in [:tasks] dissoc id) | |
(update-in [:running] dissoc id) | |
(update-in [:active] disj id)))))) | |
(defn perform-apply [p function args opts] | |
(let [args (if (-> opts :apply-to (= :promise)) | |
args | |
(map (fn [x] (if (task? x) @x x)) args))] | |
(apply function args))) | |
(declare trigger-downstream) | |
(defn perform-task [p registry id reg opts] | |
(if-let [entry (get-in reg [:tasks id])] | |
(let [tickets (take-ticket (:tickets reg) id) | |
thrd (-> (future | |
(try | |
(deliver p (perform-apply p (:function entry) (:args entry) (meta p))) | |
(finally (return-ticket tickets id p registry) | |
(trigger-downstream registry id)))) | |
(vary-meta merge opts)) | |
_ (deliver (get tickets id) thrd)] | |
(-> reg | |
(assoc :tickets tickets) | |
(assoc-in [:running id] thrd) | |
(assoc-in [:tasks id :running] true))) | |
reg)) | |
(defn all-upstream | |
([p] (let [opts (meta p)] | |
(all-upstream (:id opts) @(:registry opts)))) | |
([id reg] (all-upstream id reg #{})) | |
([id reg output] | |
(let [upstream (get-in reg [:upstream id])] | |
(apply set/union | |
output | |
upstream | |
(map #(all-upstream % reg) upstream))))) | |
(defn all-downstream | |
([p] (let [opts (meta p)] | |
(all-downstream (:id opts) @(:registry opts)))) | |
([id reg] (all-downstream id reg #{})) | |
([id reg output] | |
(let [downstream (get-in reg [:downstream id])] | |
(apply set/union | |
output | |
downstream | |
(map #(all-downstream % reg) downstream))))) | |
(defn activate-task | |
([p] | |
(let [opts (meta p)] | |
(activate-task (:id opts) (:registry opts)))) | |
([id registry] | |
(swap! registry | |
(fn [reg] | |
(let [ids (conj (all-upstream id reg) id)] | |
(update-in reg [:active] set/union ids)))))) | |
(defn perform | |
([p] | |
(if (not (realized? p)) | |
(let [{:keys [id registry]} (meta p)] | |
(activate-task p) | |
(doseq [id (:active @registry)] | |
(perform registry id)))) | |
p) | |
([registry id] (perform registry id (get-in @registry [:tasks id :promise]))) | |
([registry id p] | |
(swap! registry | |
(fn [reg] | |
(if (get-in reg [:running id]) | |
reg | |
(let [upstream (get-in reg [:upstream id]) | |
reg (update-in reg [:upstream] dissoc id)] | |
(cond (and (empty? upstream) | |
(get-in reg [:active id])) | |
(perform-task p registry id reg (meta p)) | |
:else reg))))))) | |
(defn trigger-downstream-cleanup [reg id] | |
(let [downstream (get-in reg [:downstream id]) | |
reg (update-in reg [:downstream] dissoc id)] | |
(-> (reduce (fn [reg dnid] | |
(let [dnp (get-in reg [:tasks dnid :promise])] | |
(condp = (-> dnp meta :type) | |
:task reg | |
:either (let [upp (get-in reg [:tasks id :promise]) | |
upstream (get-in reg [:upstream dnid]) | |
dndownstream (get-in reg [:downstream dnid])] | |
(println "promise" id upp) | |
(deliver dnp @upp) | |
(-> reg | |
(unlink-upstream dnid upstream) | |
(trigger-downstream-cleanup reg dnid dndownstream)))))) | |
reg | |
downstream) | |
(update-in [:triggers] set/union downstream)))) | |
(defn trigger-downstream [registry id] | |
(swap! registry | |
(fn [reg] | |
(trigger-downstream-cleanup reg id))) | |
(swap! registry | |
(fn [reg] | |
(doseq [id (:triggers reg)] | |
(perform registry id)) | |
(dissoc reg :triggers)))) | |
(defn upstream | |
([p] | |
(let [opts (meta p)] | |
(upstream (:id opts) @(:registry opts)))) | |
([id reg] | |
(get-in reg [:upstream id]))) | |
(defn downstream | |
([p] | |
(let [opts (meta p)] | |
(downstream (:id opts) @(:registry opts)))) | |
([id reg] | |
(get-in reg [:downstream id]))) | |
(comment) | |
(do | |
(def a (task {:id :a} | |
#(do (Thread/sleep 100) | |
(inc %)) 1)) | |
(def b (task {:id :b} | |
#(do (Thread/sleep 100) | |
(inc %)) a)) | |
(def c (task {:id :c} | |
#(do (Thread/sleep 100) | |
(inc %)) b)) | |
(def d (either {:id :d} b c)) | |
*registry* | |
;;(meta b) | |
(perform d) | |
;;(activat) | |
;;(perform a) | |
;;(perform d) | |
;;(perform b) | |
;;b | |
;;c | |
;; | |
;;(:active @*registry*) | |
;; (all-upstream c) | |
;;@a | |
;;(perform a) | |
;;(perform b) | |
;;(upstream b) | |
;; (downstream b) | |
;;(>pst) | |
;;(:upstream @*registry*) | |
;; (:downstream @*registry*) | |
;; | |
;; (task? (task inc 1)) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment