Created
July 15, 2020 01:28
-
-
Save dvliman/da3bb5f63fa6a763dd2df178db28f94d to your computer and use it in GitHub Desktop.
dpsutton's snippet
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn ->Function | |
[f] | |
(reify java.util.function.Function | |
(apply [_ obj] (f obj)))) | |
(defn future-map | |
[cf f] | |
(.thenApply cf (->Function f))) | |
(defn future->ch | |
([cf] | |
(future->ch cf (async/chan 1) true)) | |
([cf ch] | |
(future->ch cf ch false)) | |
([^CompletableFuture cf ch close?] | |
(.whenComplete cf (reify java.util.function.BiConsumer | |
(accept [_ obj t] | |
(when-some [ret (if t | |
{:cognitect.anomalies/category | |
:cognitect.anomalies/fault | |
:throwable t} | |
obj)] | |
(async/put! ch ret)) | |
(when close? (async/close! ch))))) | |
ch)) | |
| |
;; and consumer: | |
(defn ffprobe | |
"Returns a channel with results from ffprobe" | |
[^Path p] | |
(let [args ["ffprobe" | |
"-loglevel" "warning" | |
"-print_format" "json" | |
"-show_streams" | |
"-show_format" | |
(.. p toAbsolutePath toString)] | |
handle (fn [^Process proc] | |
(let [err (.getErrorStream proc) | |
stdout (.getInputStream proc) | |
exit (.exitValue proc) | |
ret {:path p | |
:ffmpeg/error (slurp err)}] | |
(if (zero? exit) | |
(merge ret (-> stdout io/reader json/read)) | |
(assoc ret :exit exit :stdout (slurp stdout)))))] | |
(-> (.. (ProcessBuilder. ^java.util.List args) start onExit) | |
(util/future-map handle) | |
(util/future->ch)))) | |
(defn pipeline | |
"Runs asynchronous function 'af' on each input from channel 'in', | |
producing results to channel 'out'. af is presumed to return a channel | |
| |
Input order is *not* preserved. | |
| |
Runs af with maximum 'max' concurrency. max can be an integer | |
or a function returning integer (allowing dynamic concurrency | |
control) | |
| |
close?, default true, controls whether the output channel is closed | |
upon completion" | |
([max af in out] | |
(pmax max af in out true)) | |
([max af in out close?] | |
(let [max (if (int? max) (constantly max) max) | |
reads #(cond-> % (< (count %) (max)) (conj in)) | |
drain (fn [tasks] | |
(async/go | |
(loop [tasks tasks] | |
(when (seq tasks) | |
(let [[v sc] (async/alts! (vec tasks))] | |
(when (some? v) (async/>! out v)) | |
(recur (disj tasks sc))))) | |
(when close? (async/close! out))))] | |
(async/go | |
(loop [tasks #{}] | |
(let [chs (vec (reads tasks))] | |
(when (pos? (count chs)) | |
(let [[v ch] (async/alts! chs)] | |
(if (= ch in) | |
(if v | |
(recur (conj tasks (af v))) | |
(async/<! (drain (disj tasks in)))) | |
(do | |
(when (some? v) (async/>! out v)) | |
(recur (disj tasks ch)))))))))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment