Last active
June 12, 2018 06:41
-
-
Save minikomi/0d2aa12cea77b3da70877f1f38119e9e to your computer and use it in GitHub Desktop.
Async processing for plumatic.plumbing/graph
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; This buffer is for Clojure experiments and evaluation. | |
;; Press C-j to evaluate the last expression. | |
(set-env! :dependencies #(into % '[[prismatic/plumbing "0.5.5"]])) | |
(require '[clojure.core.async :as async :refer [>!! alts! chan close! go-loop timeout]]) | |
(require '[plumbing.core :as p]) | |
(require '[plumbing.graph :as g]) | |
(require '[plumbing.fnk.pfnk :as pfnk]) | |
(def test-graph | |
(g/graph | |
:xs (p/fnk [] [1 2 3]) | |
:n (p/fnk [xs] (Thread/sleep 3000) (count xs)) | |
:m (p/fnk [xs n] (Thread/sleep 2000) (/ (p/sum identity xs) n)) | |
:m2 (p/fnk [xs n] (Thread/sleep 2000)(/ (p/sum #(* % %) xs) n)) | |
:v (p/fnk [m m2] (Thread/sleep 3000) (- m2 (* m m))))) | |
(defn graph->requirements [graph] | |
(into {} | |
(map #(vector (first %) (set (pfnk/input-schema-keys (second %)))) | |
graph))) | |
(graph->requirements test-graph) | |
(defn get-doable [graph results] | |
(let [reqs (graph->requirements graph) | |
result-set (set (keys results))] | |
(println "results" result-set) | |
(set | |
(for [[id req-set] reqs | |
:when (and (clojure.set/subset? req-set result-set) | |
(not (result-set id)))] | |
id)))) | |
(defn start-tasks [graph reporter-ch status] | |
(let [all-reqs (graph->requirements graph) | |
{:keys [results todo active]} @status] | |
(for [id todo | |
:let [task (graph id) | |
reqs (get all-reqs id) | |
args-map (select-keys results reqs)] | |
:when (not (active id))] | |
(do | |
(swap! status update :active conj id) | |
(future | |
(try (let [result (task args-map)] | |
(>!! reporter-ch [id result])) | |
(catch Exception e | |
(>!! reporter-ch e)))))))) | |
(defn now [] | |
(.getTime (java.util.Date.))) | |
(defn cleanup-status [jobs reporter-ch kill-ch status final-status] | |
(println final-status) | |
(doseq [j jobs] (future-cancel j)) | |
(close! reporter-ch) | |
(close! kill-ch) | |
(swap! status assoc :status final-status :time (now)) | |
(swap! status dissoc :kill :active :todo)) | |
(defn processing-pipeline [graph] | |
(let [reporter-ch (chan) | |
kill-ch (chan) | |
first-doable (get-doable graph {}) | |
status (atom | |
{:kill (fn [] (>!! kill-ch :die)) | |
:status :running | |
:time (now) | |
:active #{} | |
:todo first-doable | |
:results {}})] | |
(go-loop [jobs (start-tasks graph reporter-ch status)] | |
(if (empty? jobs) | |
(cleanup-status jobs reporter-ch kill-ch status :finished) | |
(let [timeout-ch (timeout 10000) | |
[ch-v ch] (alts! [reporter-ch kill-ch])] | |
(cond | |
(= ch timeout-ch) | |
(cleanup-status jobs reporter-ch kill-ch status :timeout) | |
(= ch kill-ch) | |
(cleanup-status jobs reporter-ch kill-ch status :killed) | |
(instance? Throwable ch-v) | |
(do | |
(cleanup-status jobs reporter-ch kill-ch status :error) | |
(swap! status assoc :error ch-v)) | |
:else | |
(do | |
(let [[k v] ch-v | |
_ (println "received:" k "-" v) | |
new-results (assoc (:results @status) k (or v :ok)) | |
doable (get-doable graph new-results)] | |
(swap! status update :active disj k) | |
(swap! status assoc | |
:todo doable | |
:results new-results) | |
(recur (into (filterv #(not (realized? %)) jobs) | |
(start-tasks graph reporter-ch status))))))))) | |
status)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment