Skip to content

Instantly share code, notes, and snippets.

@joinr
Last active January 10, 2020 14:33
Show Gist options
  • Save joinr/d69476846be0b05cb891e93fbe5f2a8b to your computer and use it in GitHub Desktop.
Save joinr/d69476846be0b05cb891e93fbe5f2a8b to your computer and use it in GitHub Desktop.
Exploration of walking DAGs in parallel
(ns dagdemo
(:require [spork.cljgraph.core :as g]
[clojure.core.async :as async]))
;;     A      E
;;    / \     |
;;   B   C <- F
;;    \ /     |
;;     D      G
(def the-dag
  "Small example DAG with two roots (:a and :e) that share the node :c."
  ;; Each pair is one arc [from to]; e.g. [:f :c] is the arc F -> C.
  (g/arcs->graph
   [[:a :b] [:a :c]
    [:b :d] [:c :d]
    [:e :f] [:f :c] [:f :g]]))
(defn pmap!
  "Eagerly applies `f` to every element of `xs` using a core.async
  `pipeline-blocking` with parallelism `n`, blocking until all results are
  collected, and returns them as a vector.
  The 2-arity version uses one worker per available processor."
  ([n f xs]
   (let [out (async/chan)]
     (async/pipeline-blocking n out (map f) (async/to-chan xs))
     ;; `into` closes over `out`; block until the pipeline has closed it.
     (->> out
          (async/into [])
          async/<!!)))
  ([f xs]
   (let [cores (.availableProcessors (Runtime/getRuntime))]
     (pmap! cores f xs))))
(defn now
  "Current wall-clock time in milliseconds since the Unix epoch."
  []
  (. System (currentTimeMillis)))
(defn process
  "Demo task: logs that `nd` is starting, sleeps a random 0-900 ms, and
  returns a vector tagging `nd` with its start and end times."
  [nd]
  (let [t0       (now)
        pause-ms (* 100 (rand-int 10))]
    (println [:processing! nd t0])
    (Thread/sleep pause-ms)
    [:processed! nd t0 :ended (now)]))
(defn pwalk
  "Walks `g` in topological order, one group of nodes at a time. `f` is
  applied to the nodes of each group in parallel via `pmap!` with `n`
  threads (2 by default). Returns a lazy seq of the results, concatenated
  in group order.

  Assumes `g/topsort` yields a sequence of node collections (one per
  layer), because each element is passed to `pmap!` as a collection.
  TODO confirm against spork.cljgraph.core."
  ([f g] (pwalk 2 f g))
  ([n f g]
   ;; Apply the caller's `f` rather than a hard-coded task function.
   (mapcat (fn [nodes] (pmap! n f nodes))
           (g/topsort g))))
;;alt implementation
(defn work
  "Processes one node unless it has already been seen.
  Returns :none when `(visited nd)` is truthy. Otherwise returns a map whose
  :value is `(f nd)` and whose :pending holds the nodes `g/sinks` reports
  for `nd` in `graph` (the candidates for the next fringe)."
  [f nd graph visited]
  (if (visited nd)
    :none
    {:value   (f nd)
     :pending (g/sinks graph nd)}))
(defn par-walk
  "Walks `graph` in dependency order, one layer at a time. Each layer holds
  the current roots of the not-yet-processed part of the graph, so a node
  runs only after all of its predecessors have run. `f` is applied to the
  nodes of a layer in parallel via `pmap!` with up to `n` threads. Returns a
  vector of the values of `f`, layer by layer.

  The earlier breadth-first version ignored `n` (it always used 2 threads).
  It could also run a node before some of its parents; for example, :c ran
  alongside :b, before its parent :f.
  NOTE(review): nodes on a cycle presumably never become roots, so they
  would be skipped. Confirm `g/get-roots` semantics."
  [n f graph]
  (loop [remaining graph
         acc       []]
    (let [layer (g/get-roots remaining)]
      (if (seq layer)
        (recur (g/drop-nodes remaining layer)
               (into acc (pmap! n f layer)))
        acc))))
(defn timed-work
  "Simulated task: sleeps a random multiple of 100 ms (0-900 ms), then
  returns {:node nd :start t0 :end t1} with both times from `now`."
  [nd]
  (let [begun-at (now)]
    (Thread/sleep (* 100 (rand-int 10)))
    {:node  nd
     :start begun-at
     :end   (now)}))
;; dagdemo> (par-walk 2 timed-work the-dag)
;; [{:node :a, :start 1578585697327, :end 1578585697842}
;; {:node :e, :start 1578585697327, :end 1578585697530}
;; {:node :b, :start 1578585697842, :end 1578585698654}
;; {:node :c, :start 1578585697842, :end 1578585698248}
;; {:node :f, :start 1578585698248, :end 1578585698248}
;; {:node :d, :start 1578585698654, :end 1578585699357}
;; {:node :g, :start 1578585698654, :end 1578585698857}]
(defn close-all!
  "Closes every core.async channel passed as an argument. Returns nil."
  [& chans]
  (run! async/close! chans))
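;; This version fails to enforce precedence constraints! A node is pushed onto
;; the fringe as soon as ANY one of its parents finishes, not after all of them.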
;; Experimental asynchronous walk: processes the nodes of `graph` with `n`
;; worker threads, calling `f` through the top-level `work`. Returns a vector
;; of the `:value`s that were put on `results`, in the order they arrived.
;; Known defect: a node's children (from `(g/sinks graph nd)` inside `work`)
;; are queued as soon as that ONE parent has been processed. So a node can
;; run before all of its parents have finished.
(defn async-walk [n f graph]
  (let [;; nodes whose processing has finished
        visited (atom #{})
        ;; Nodes queued but not yet finished. When this set becomes empty,
        ;; the watch below runs `shutdown`, which closes all three channels.
        open (atom #{})
        fringe (async/chan 100)
        ;; the `distinct` transducer lets each node reach the workers at most once
        pending (async/chan n (distinct))
        results (async/chan 100)
        shutdown (fn [] (close-all! pending fringe results))
        _ (add-watch open :empty (fn [_ _ old new]
                                   (if (empty? new)
                                     (shutdown))))
        ;; Queue the not-yet-visited nodes of xs. `false` leaves `fringe` open.
        ;; NOTE(review): possible hang. A child can pass the visited filter here,
        ;; then finish and leave `open` before this `swap!` adds it back.
        ;; It is then never removed, so `open` never empties.
        push-fringe (fn [xs]
                      (let [new (filter (fn [x] (not (@visited x))) xs)]
                        (swap! open #(into % new))
                        (async/onto-chan fringe new false)))
        _ (push-fringe (g/get-roots graph))
        ;; Inside this fn, `work` refers to the top-level `work` var. A `let`
        ;; binding is not in scope within its own init expression.
        work (fn [nd]
               (let [res (work f nd graph @visited)]
                 (when (not= res :none)
                   (async/>!! results (:value res))
                   (push-fringe (:pending res)))
                 (swap! visited conj nd)
                 (swap! open disj nd)))
        ;; starts n threads that process nodes from `pending` until it is closed
        workers (fn [] (dotimes [i n]
                         (async/thread
                           (when-let [nd (async/<!! pending)]
                             (when-not (@visited nd) (work nd))
                             (recur)))))]
    ;; forward fringe -> pending, skipping nodes already processed
    (async/go-loop []
      (when-let [nd (async/<! fringe)]
        (when-not (@visited nd) (async/>! pending nd))
        (recur)))
    (workers)
    ;; blocks until `results` has been closed by `shutdown`
    (async/<!! (async/into [] results))))
;; :e finishes first, so its children get onto the fringe first...
;; dagdemo> (async-walk 2 timed-work the-dag)
;; [{:node :e, :start 1578592447969, :end 1578592448172}
;; {:node :a, :start 1578592447969, :end 1578592448484}
;; {:node :f, :start 1578592448172, :end 1578592448484}
;; {:node :b, :start 1578592448484, :end 1578592449000}
;; {:node :c, :start 1578592448484, :end 1578592449000}
;; {:node :g, :start 1578592449000, :end 1578592449515}
;; {:node :d, :start 1578592449000, :end 1578592449703}]
;; :a finishes first, so its children get onto the fringe first... Note that
;; :c starts (451718) before its parent :f has finished (452140).
;; dagdemo> (async-walk 2 timed-work the-dag)
;; [{:node :a, :start 1578592450812, :end 1578592451015}
;; {:node :e, :start 1578592450812, :end 1578592451718}
;; {:node :c, :start 1578592451718, :end 1578592451827}
;; {:node :b, :start 1578592451015, :end 1578592451921}
;; {:node :d, :start 1578592451921, :end 1578592452030}
;; {:node :f, :start 1578592451827, :end 1578592452140}
;; {:node :g, :start 1578592452140, :end 1578592452749}]
(defn workers
  "Starts `n` core.async threads. Each one repeatedly takes a value from
  `from-chan` and calls `f` on it, stopping when the take yields nil or
  false (e.g. once `from-chan` is closed and drained). Returns nil without
  waiting for the threads."
  [n f from-chan]
  (doseq [_ (range (long n))]
    (async/thread
      (loop []
        (when-let [item (async/<!! from-chan)]
          (f item)
          (recur))))))
(defn root-walk
  "Walks `graph` in dependency order using `n` worker threads, calling `f`
  on each node. A node is handed to the workers only when it is a root of
  the not-yet-processed remainder of the graph, i.e. after all of its
  predecessors have been processed. Blocks until every node is done and
  returns a vector of the values of `f`, in completion order.

  Returns [] immediately when `graph` has no nodes.
  NOTE(review): if `f` throws, that node is never counted as completed.
  The same happens if the graph has a cycle, because those nodes never
  become roots. In both cases `results` is never closed and this call
  blocks forever."
  [n f graph]
  (let [total       (count (g/nodes graph))
        fringe      (async/chan 100)
        ;; `distinct` drops repeat submissions. A node that is still queued
        ;; or running is still a root, so it is pushed again every time
        ;; another node finishes.
        pending     (async/chan n (distinct))
        results     (async/chan 100)
        remaining   (atom graph)
        completed   (atom 0)
        push-fringe (fn [xs] (async/onto-chan fringe xs false))
        push-result (fn [v]
                      (async/>!! results v)
                      ;; Close only after the LAST result has been put.
                      ;; Checking whether `remaining` is empty here would race:
                      ;; another worker could empty the graph and close
                      ;; `results` before this worker's put, losing its value.
                      (when (= total (swap! completed inc))
                        (close-all! pending fringe results)))
        work        (fn [nd]
                      (let [v     (f nd)
                            roots (g/get-roots
                                   (swap! remaining g/drop-nodes [nd]))]
                        (push-fringe roots)
                        (push-result v)))]
    (if (zero? total)
      ;; No node would ever complete, so nothing would close `results`.
      []
      (do
        (push-fringe (g/get-roots graph))
        ;; forward fringe -> pending until `fringe` is closed
        (async/go-loop []
          (when-let [nd (async/<! fringe)]
            (async/>! pending nd)
            (recur)))
        (workers n work pending)
        (async/<!! (async/into [] results))))))
;; dagdemo> (root-walk 2 timed-work the-dag)
;; [{:node :a, :start 1578624540883, :end 1578624541086}
;; {:node :b, :start 1578624541086, :end 1578624541086}
;; {:node :e, :start 1578624540883, :end 1578624541398}
;; {:node :f, :start 1578624541398, :end 1578624541601}
;; {:node :c, :start 1578624541601, :end 1578624541601}
;; {:node :d, :start 1578624541601, :end 1578624541804}
;; {:node :g, :start 1578624541601, :end 1578624542007}]
;; dagdemo> (root-walk 2 timed-work the-dag)
;; [{:node :a, :start 1578624544210, :end 1578624544413}
;; {:node :e, :start 1578624544210, :end 1578624544413}
;; {:node :b, :start 1578624544413, :end 1578624545022}
;; {:node :f, :start 1578624544413, :end 1578624545319}
;; {:node :g, :start 1578624545319, :end 1578624545429}
;; {:node :c, :start 1578624545319, :end 1578624545725}
;; {:node :d, :start 1578624545725, :end 1578624546241}]
;; dagdemo> (root-walk 2 timed-work the-dag)
;; [{:node :e, :start 1578624550943, :end 1578624551052}
;; {:node :f, :start 1578624551052, :end 1578624551162}
;; {:node :g, :start 1578624551162, :end 1578624551271}
;; {:node :a, :start 1578624550943, :end 1578624551646}
;; {:node :c, :start 1578624551646, :end 1578624551958}
;; {:node :b, :start 1578624551646, :end 1578624552161}
;; {:node :d, :start 1578624552161, :end 1578624552770}]
;; dagdemo> (root-walk 2 timed-work the-dag)
;; [{:node :e, :start 1578624592479, :end 1578624592682}
;; {:node :a, :start 1578624592479, :end 1578624592791}
;; {:node :f, :start 1578624592682, :end 1578624592791}
;; {:node :c, :start 1578624592791, :end 1578624593104}
;; {:node :g, :start 1578624593104, :end 1578624593104}
;; {:node :b, :start 1578624592791, :end 1578624593306}
;; {:node :d, :start 1578624593306, :end 1578624593416}]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment