Last active
May 17, 2020 16:58
-
-
Save dpsutton/0f851684a0913be048ab43a6d49ca046 to your computer and use it in GitHub Desktop.
ghadi's completeable future helpers from slack
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn ->Function | |
[f] | |
(reify java.util.function.Function | |
(apply [_ obj] (f obj)))) | |
(defn future-map | |
[cf f] | |
(.thenApply cf (->Function f))) | |
(defn future->ch | |
([cf] | |
(future->ch cf (async/chan 1) true)) | |
([cf ch] | |
(future->ch cf ch false)) | |
([^CompletableFuture cf ch close?] | |
(.whenComplete cf (reify java.util.function.BiConsumer | |
(accept [_ obj t] | |
(when-some [ret (if t | |
{:cognitect.anomalies/category | |
:cognitect.anomalies/fault | |
:throwable t} | |
obj)] | |
(async/put! ch ret)) | |
(when close? (async/close! ch))))) | |
ch)) | |
;; and consumer: | |
(defn ffprobe | |
"Returns a channel with results from ffprobe" | |
[^Path p] | |
(let [args ["ffprobe" | |
"-loglevel" "warning" | |
"-print_format" "json" | |
"-show_streams" | |
"-show_format" | |
(.. p toAbsolutePath toString)] | |
handle (fn [^Process proc] | |
(let [err (.getErrorStream proc) | |
stdout (.getInputStream proc) | |
exit (.exitValue proc) | |
ret {:path p | |
:ffmpeg/error (slurp err)}] | |
(if (zero? exit) | |
(merge ret (-> stdout io/reader json/read)) | |
(assoc ret :exit exit :stdout (slurp stdout)))))] | |
(-> (.. (ProcessBuilder. ^java.util.List args) start onExit) | |
(util/future-map handle) | |
(util/future->ch)))) | |
(defn pipeline | |
"Runs asynchronous function 'af' on each input from channel 'in', | |
producing results to channel 'out'. af is presumed to return a channel | |
| |
Input order is *not* preserved. | |
| |
Runs af with maximum 'max' concurrency. max can be an integer | |
or a function returning integer (allowing dynamic concurrency | |
control) | |
| |
close?, default true, controls whether the output channel is closed | |
upon completion" | |
([max af in out] | |
(pmax max af in out true)) | |
([max af in out close?] | |
(let [max (if (int? max) (constantly max) max) | |
reads #(cond-> % (< (count %) (max)) (conj in)) | |
drain (fn [tasks] | |
(async/go | |
(loop [tasks tasks] | |
(when (seq tasks) | |
(let [[v sc] (async/alts! (vec tasks))] | |
(when (some? v) (async/>! out v)) | |
(recur (disj tasks sc))))) | |
(when close? (async/close! out))))] | |
(async/go | |
(loop [tasks #{}] | |
(let [chs (vec (reads tasks))] | |
(when (pos? (count chs)) | |
(let [[v ch] (async/alts! chs)] | |
(if (= ch in) | |
(if v | |
(recur (conj tasks (af v))) | |
(async/<! (drain (disj tasks in)))) | |
(do | |
(when (some? v) (async/>! out v)) | |
(recur (disj tasks ch)))))))))))) |
Author
dpsutton
commented
May 11, 2020
In the system I mentioned above,
the filesystem pump uses async/thread
the ffmpeg stuff is a CompletableFuture that dumps onto a channel
the concurrency limiter in the middle is a single go block
the main thread reads the results of video metadata extraction
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment