-
-
Save w01fe/4710008 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns plumbing.graph-async | |
(:require | |
[plumbing.fnk.pfnk :as pfnk] | |
[plumbing.fnk.schema :as schema] | |
[plumbing.core :as plumbing] | |
[plumbing.graph :as graph])) | |
;; An async fnk has ^:async metadata and takes a required :callback key.
;; TODO: redo with just promises/futures once they have callback options
;; TODO: make nicer way to specify async fnks?
(defn asyncify
  "Take a fnk f and return an async version of f: a fnk with ^:async
   metadata that accepts a required keyword parameter :callback, which is
   called with the result.

   - If f already has ^:async metadata it is returned unchanged (after
     asserting that its input schema contains :callback).
   - Otherwise f must not already take :callback.  It is wrapped in a fnk
     that calls f on the input map minus :callback and passes f's return
     value to the callback.  The wrapper is tagged ^:async so that the
     result can be handed to syncify, or to asyncify again."
  [f]
  (if (:async (meta f))
    (do (assert (contains? (pfnk/input-schema f) :callback))
        f)
    (let [in-schema (pfnk/input-schema f)]
      (assert (not (contains? in-schema :callback)))
      ;; fn->fnk only attaches the schema; the :async flag has to be added
      ;; explicitly, otherwise the wrapper is not recognised as async.
      (vary-meta
       (pfnk/fn->fnk
        (fn [m] ((:callback m) (f (dissoc m :callback))))
        [(assoc in-schema :callback true)
         (pfnk/output-schema f)])
       assoc :async true))))
(defn syncify
  "Turn an async fnk f into an ordinary blocking fnk.

   f must have ^:async metadata and a :callback key in its input schema.
   The returned fnk calls f with a :callback that delivers to a promise,
   then blocks on that promise and returns the delivered value.  Its input
   schema is f's without :callback; its output schema is f's."
  [f]
  (assert (and (:async (meta f)) (contains? (pfnk/input-schema f) :callback)))
  (let [sync-schemata [(dissoc (pfnk/input-schema f) :callback)
                       (pfnk/output-schema f)]
        run-blocking (fn [args]
                       (let [answer (promise)
                             on-result (fn [v] (deliver answer v))]
                         (f (assoc args :callback on-result))
                         ;; Block until f (or whatever it kicked off)
                         ;; invokes the callback.
                         (deref answer)))]
    (pfnk/fn->fnk run-blocking sync-schemata)))
(defn async-parallel-compile
  "Experimental.

   Compile a hierarchical graph with (some) async fnks into an async fnk.
   An async fnk has ^:async metadata, and accepts a special :callback
   keyword parameter which will be called with the result of the function
   (rather than returning it directly).

   Each node function will be launched in a future as soon as its dependencies
   have been fully computed.  Thus,
     (syncify (async-parallel-compile g))
   is similar to graph/parallel-compile, except that:
    - It can handle async fnks as well as ordinary sync fnks.
    - It makes smarter use of threads, only launching a future when a node's
      dependencies have been computed (rather than launching one future for
      each node immediately, each of which blocks until its dependencies
      are computed).

   A graph with no nodes invokes the callback right away with an empty map.

   NOTE: the futures launched for nodes are never dereferenced here, so an
   exception thrown inside a node is not reported by this code, and the
   callback is then never invoked."
  [g]
  (if (fn? g)
    (asyncify g)
    (let [;; Recursively compile sub-graphs and wrap leaf fnks, so that every
          ;; node can be called with a :callback key.
          g (graph/->graph (plumbing/map-vals async-parallel-compile g))
          req-ks (schema/required-toplevel-keys (pfnk/input-schema g))
          ;; [parent child] dependency edges between nodes of g.  Every node
          ;; is also a parent of the ::done sentinel, which fires the final
          ;; callback once all nodes have produced a result.
          edges (concat
                 (for [[k v] g
                       parent-k (filter g (keys (pfnk/input-schema v)))]
                   [parent-k k])
                 (for [k (keys g)]
                   [k ::done]))
          child-map (->> edges
                         (group-by first)
                         (plumbing/map-vals #(set (map second %))))
          parent-map (->> edges
                          (group-by second)
                          (plumbing/map-vals #(set (map first %))))]
      (vary-meta
       (pfnk/fn->fnk
        (fn [m]
          (let [missing-keys (seq (remove #(contains? m %) req-ks))]
            (schema/assert-iae (empty? missing-keys)
                               "Missing top-level keys in graph input: %s"
                               (set missing-keys)))
          (let [;; node -> set of parents that have not finished yet
                remaining-parents (atom parent-map)
                ;; graph inputs plus every node result computed so far
                results (atom m)
                run-node (fn run-node [k]
                           (if (= ::done k)
                             ((:callback m) (select-keys @results (keys g)))
                             (let [f (g k)]
                               (future
                                 (f (assoc (select-keys @results (keys (pfnk/input-schema f)))
                                      :callback
                                      (fn [r]
                                        (swap! results assoc k r)
                                        ;; swap! returns the updated map, so
                                        ;; only the callback that removes the
                                        ;; last outstanding parent of c sees
                                        ;; an empty set and launches c.
                                        (doseq [c (child-map k)]
                                          (when (empty? (get (swap! remaining-parents
                                                                    update-in [c]
                                                                    disj k)
                                                             c))
                                            (run-node c))))))))))]
            ;; Start every node without parents.  ::done is included so that
            ;; an empty graph (where ::done has no parents) still calls the
            ;; callback instead of never completing.
            (doseq [k (concat (keys g) [::done])]
              (when (empty? (parent-map k))
                (run-node k)))))
        [(assoc (pfnk/input-schema g) :callback true)
         (pfnk/output-schema g)])
       assoc :async true))))
(comment
  ;; Usage example: a nested graph mixing ordinary fnks with one hand-written
  ;; async fnk (:a2), compiled asynchronously and then run synchronously.
  ;; Every node sleeps for one second; the printed map is the graph result
  ;; for input {:x 1}.
  (time
   (let [demo-graph {:a {:a1 (plumbing/fnk [x] (Thread/sleep 1000) (inc x))
                         :a2 (vary-meta
                              (plumbing/fnk [x callback]
                                (Thread/sleep 1000)
                                (callback (- x 10)))
                              assoc :async true)}
                     :b (plumbing/fnk [[:a a1]] (Thread/sleep 1000) (* a1 2))
                     :c (plumbing/fnk [[:a a2]] (Thread/sleep 1000) (* a2 2))}
         run-sync (syncify (async-parallel-compile demo-graph))]
     (println (run-sync {:x 1})))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment