Last active
January 10, 2020 14:33
-
-
Save joinr/d69476846be0b05cb891e93fbe5f2a8b to your computer and use it in GitHub Desktop.
Exploration of walking DAGs in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns dagdemo | |
(:require [spork.cljgraph.core :as g] | |
[clojure.core.async :as async])) | |
;;    A      E
;;   / \     |
;;  B   C <- F
;;   \ /     |
;;    D      G
(def the-dag
  "Sample DAG built from [from to] arc pairs. :a and :e are its roots and
  :d and :g are its leaves; :c has two predecessors (:a and :f), and :d
  has two (:b and :c)."
  (g/arcs->graph [[:a :b] [:a :c]
                  [:b :d] [:c :d]
                  [:e :f] [:f :c] [:f :g]]))
(defn pmap!
  "Eagerly apply `f` to every element of `xs` on up to `n` threads using
  core.async `pipeline-blocking`, blocking until every result is in.
  Returns a vector of the results in the same order as `xs`.
  With two arguments, `n` defaults to the number of available processors.

  NOTE(review): core.async channels reject nil values, so `f` presumably
  must not return nil -- confirm."
  ([f xs]
   (pmap! (.availableProcessors (Runtime/getRuntime)) f xs))
  ([n f xs]
   (let [out (async/chan)
         in  (async/to-chan xs)]
     ;; pipeline-blocking closes `out` once `in` is exhausted, which lets
     ;; `async/into` finish.
     (async/pipeline-blocking n out (map f) in)
     (async/<!! (async/into [] out)))))
(defn now
  "Current wall-clock time, in milliseconds, as reported by
  System/currentTimeMillis."
  []
  (System/currentTimeMillis))
(defn process
  "Demo task: print a [:processing! nd start-ms] line, sleep for a random
  0-900 ms (in 100 ms steps), then return
  [:processed! nd start-ms :ended end-ms]."
  [nd]
  (let [t0       (now)
        pause-ms (* 100 (rand-int 10))]
    (println [:processing! nd t0])
    (Thread/sleep pause-ms)
    [:processed! nd t0 :ended (now)]))
(defn pwalk
  "Walk graph `g` group by group in the order produced by `g/topsort`,
  calling `f` on the nodes of each group with up to `n` threads (default 2).
  Returns a vector of the results of `f`, group after group.

  All work is performed eagerly, before this function returns, so later
  groups never start before earlier ones finish."
  ([f g] (pwalk 2 f g))
  ([n f g]
   ;; NOTE(review): assumes `g/topsort` yields a sequence of node
   ;; collections (groups that may be processed in parallel) -- confirm.
   ;; The local `g` does not shadow the `g/` namespace alias: qualified
   ;; symbols are resolved through the ns aliases, not through locals.
   (into []
         (mapcat (fn [nodes] (pmap! n f nodes)))
         (g/topsort g))))
;; Alternative implementation: process the graph in rounds, one pmap! per round.
(defn work
  "Process a single node `nd` unless it has already been seen.
  `visited` is called as a function on `nd` (e.g. a set of visited nodes).
  When `nd` is already visited, returns :none. Otherwise returns a map with
  :value holding (f nd) and :pending holding (g/sinks graph nd), presumably
  the successors of `nd` that should be considered next."
  [f nd graph visited]
  (if (visited nd)
    :none
    {:value   (f nd)
     :pending (g/sinks graph nd)}))
(defn par-walk
  "Walk `graph` in rounds, calling `f` on up to `n` nodes at a time.
  Each round processes every current root of the not-yet-processed part of
  the graph and then removes those nodes, so a node is only processed once
  all of its predecessors have been (e.g. :c in `the-dag` waits for both
  :a and :f). Returns a vector of `f`'s results, round by round.

  Nodes that never become roots (e.g. nodes on a cycle, if `graph` is not
  acyclic) are presumably never processed -- the loop simply stops."
  [n f graph]
  (loop [remaining graph
         acc       []]
    (let [fringe (vec (g/get-roots remaining))]
      (if (seq fringe)
        (recur (g/drop-nodes remaining fringe)
               (into acc (pmap! n f fringe)))
        acc))))
(defn timed-work
  "Demo task: sleep for a random 0-900 ms (in 100 ms steps) and return
  {:node nd, :start start-ms, :end end-ms} recording when it ran."
  [nd]
  (let [t0 (now)]
    (Thread/sleep (* 100 (rand-int 10)))
    {:node nd, :start t0, :end (now)}))
;; dagdemo> (par-walk 2 timed-work the-dag)
;; [{:node :a, :start 1578585697327, :end 1578585697842}
;;  {:node :e, :start 1578585697327, :end 1578585697530}
;;  {:node :b, :start 1578585697842, :end 1578585698654}
;;  {:node :c, :start 1578585697842, :end 1578585698248}
;;  {:node :f, :start 1578585698248, :end 1578585698248}
;;  {:node :d, :start 1578585698654, :end 1578585699357}
;;  {:node :g, :start 1578585698654, :end 1578585698857}]
(defn close-all!
  "Close every core.async channel passed as an argument. Returns nil."
  [& xs]
  (run! async/close! xs))
;; This version fails to enforce precedence constraints!
(defn async-walk
  "Exploratory walk of `graph` with `n` worker threads, calling `f` on each
  node reachable from the roots. Returns a vector of `f`'s results in
  completion order, once the set of queued-but-unprocessed nodes empties.

  WARNING: a node's successors are queued as soon as that one node
  finishes, filtered only by the visited set, so a node can run before
  its other predecessors have finished. Precedence is NOT enforced.

  NOTE(review): `async/>!!` rejects nil, so an `f` that returns nil will
  presumably kill its worker thread and hang the walk -- confirm.
  NOTE(review): the visited-check in `push-fringe` and the later add to
  `open` are not atomic. A node finishing concurrently could be left in
  `open` forever, which would keep the walk from shutting down -- confirm."
  [n f graph]
  (let [visited  (atom #{})   ; nodes already processed
        open     (atom #{})   ; nodes queued but not yet processed
        fringe   (async/chan 100)
        pending  (async/chan n (distinct))
        results  (async/chan 100)
        shutdown (fn [] (close-all! pending fringe results))
        ;; Once nothing is left open, close every channel so the workers
        ;; stop and the final `async/into` completes.
        _        (add-watch open :empty
                            (fn [_ _ _ now-open]
                              (when (empty? now-open)
                                (shutdown))))
        push-fringe (fn [xs]
                      (let [unseen (remove #(@visited %) xs)]
                        (swap! open into unseen)
                        (async/onto-chan fringe unseen false)))
        _        (push-fringe (g/get-roots graph))
        visit!   (fn [nd]
                   ;; `work` here is the top-level fn, not a local binding.
                   (let [outcome (work f nd graph @visited)]
                     (when-not (= :none outcome)
                       (async/>!! results (:value outcome))
                       (push-fringe (:pending outcome)))
                     (swap! visited conj nd)
                     (swap! open disj nd)))
        start-workers! (fn []
                         (dotimes [_ n]
                           (async/thread
                             (when-let [nd (async/<!! pending)]
                               (when-not (@visited nd)
                                 (visit! nd))
                               (recur)))))]
    ;; Forward fringe nodes that have not been processed yet to the workers.
    (async/go-loop []
      (when-let [nd (async/<! fringe)]
        (when-not (@visited nd)
          (async/>! pending nd))
        (recur)))
    (start-workers!)
    (async/<!! (async/into [] results))))
;; :e finishes first, so its children reach the fringe first...
;; dagdemo> (async-walk 2 timed-work the-dag)
;; [{:node :e, :start 1578592447969, :end 1578592448172}
;;  {:node :a, :start 1578592447969, :end 1578592448484}
;;  {:node :f, :start 1578592448172, :end 1578592448484}
;;  {:node :b, :start 1578592448484, :end 1578592449000}
;;  {:node :c, :start 1578592448484, :end 1578592449000}
;;  {:node :g, :start 1578592449000, :end 1578592449515}
;;  {:node :d, :start 1578592449000, :end 1578592449703}]
;; :a finishes first, so its children reach the fringe first. Note that :c
;; starts before its predecessor :f has finished (precedence violated).
;; dagdemo> (async-walk 2 timed-work the-dag)
;; [{:node :a, :start 1578592450812, :end 1578592451015}
;;  {:node :e, :start 1578592450812, :end 1578592451718}
;;  {:node :c, :start 1578592451718, :end 1578592451827}
;;  {:node :b, :start 1578592451015, :end 1578592451921}
;;  {:node :d, :start 1578592451921, :end 1578592452030}
;;  {:node :f, :start 1578592451827, :end 1578592452140}
;;  {:node :g, :start 1578592452140, :end 1578592452749}]
(defn workers
  "Start `n` background threads (via `async/thread`). Each repeatedly takes
  an item from `from-chan` and calls `f` on it, stopping when the take
  yields nil (channel closed and drained) or false. Returns nil right away,
  without waiting for the threads."
  [n f from-chan]
  (dotimes [_ n]
    (async/thread
      (loop []
        (when-let [item (async/<!! from-chan)]
          (f item)
          (recur))))))
(defn root-walk
  "Walk `graph` with `n` worker threads, calling `f` once on every node.
  A node is queued only once it is a root of the not-yet-processed part of
  the graph, i.e. after each of its predecessors has been processed and
  dropped, so precedence constraints are respected. Blocks until every node
  has been processed and returns a vector of `f`'s results in completion
  order.

  core.async channels cannot carry nil, so nil results of `f` are left out
  of the returned vector (the node still counts as processed).
  `graph` is assumed to be acyclic: nodes on a cycle presumably never
  become roots, and the walk would then never finish."
  [n f graph]
  (let [total (count (g/nodes graph))]
    (if (zero? total)
      ;; Nothing to process. Without this guard nothing would ever close
      ;; `results`, and the final take would block forever.
      []
      (let [fringe      (async/chan 100)
            ;; `distinct` drops nodes that are re-pushed as roots while they
            ;; are still queued or running, so each node is processed once.
            pending     (async/chan n (distinct))
            results     (async/chan 100)
            remaining   (atom graph)
            finished    (atom 0)
            push-fringe (fn [xs] (async/onto-chan fringe xs false))
            push-result (fn [v]
                          (when (some? v)
                            (async/>!! results v))
                          ;; Count a node as finished only AFTER its result is
                          ;; on the channel. The worker that finishes last
                          ;; closes everything, so no result is lost to a
                          ;; close triggered by another worker emptying
                          ;; `remaining` first.
                          (when (= total (swap! finished inc))
                            (close-all! pending fringe results)))
            work        (fn [nd]
                          (let [v     (f nd)
                                roots (g/get-roots
                                       (swap! remaining g/drop-nodes [nd]))]
                            (push-fringe roots)
                            (push-result v)))]
        (push-fringe (g/get-roots graph))
        ;; Forward fringe nodes to the (deduplicating) work queue.
        (async/go-loop []
          (when-let [nd (async/<! fringe)]
            (async/>! pending nd)
            (recur)))
        (workers n work pending)
        (async/<!! (async/into [] results))))))
;; dagdemo> (root-walk 2 timed-work the-dag)
;; [{:node :a, :start 1578624540883, :end 1578624541086}
;;  {:node :b, :start 1578624541086, :end 1578624541086}
;;  {:node :e, :start 1578624540883, :end 1578624541398}
;;  {:node :f, :start 1578624541398, :end 1578624541601}
;;  {:node :c, :start 1578624541601, :end 1578624541601}
;;  {:node :d, :start 1578624541601, :end 1578624541804}
;;  {:node :g, :start 1578624541601, :end 1578624542007}]
;; dagdemo> (root-walk 2 timed-work the-dag)
;; [{:node :a, :start 1578624544210, :end 1578624544413}
;;  {:node :e, :start 1578624544210, :end 1578624544413}
;;  {:node :b, :start 1578624544413, :end 1578624545022}
;;  {:node :f, :start 1578624544413, :end 1578624545319}
;;  {:node :g, :start 1578624545319, :end 1578624545429}
;;  {:node :c, :start 1578624545319, :end 1578624545725}
;;  {:node :d, :start 1578624545725, :end 1578624546241}]
;; dagdemo> (root-walk 2 timed-work the-dag)
;; [{:node :e, :start 1578624550943, :end 1578624551052}
;;  {:node :f, :start 1578624551052, :end 1578624551162}
;;  {:node :g, :start 1578624551162, :end 1578624551271}
;;  {:node :a, :start 1578624550943, :end 1578624551646}
;;  {:node :c, :start 1578624551646, :end 1578624551958}
;;  {:node :b, :start 1578624551646, :end 1578624552161}
;;  {:node :d, :start 1578624552161, :end 1578624552770}]
;; dagdemo> (root-walk 2 timed-work the-dag)
;; [{:node :e, :start 1578624592479, :end 1578624592682}
;;  {:node :a, :start 1578624592479, :end 1578624592791}
;;  {:node :f, :start 1578624592682, :end 1578624592791}
;;  {:node :c, :start 1578624592791, :end 1578624593104}
;;  {:node :g, :start 1578624593104, :end 1578624593104}
;;  {:node :b, :start 1578624592791, :end 1578624593306}
;;  {:node :d, :start 1578624593306, :end 1578624593416}]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment