Skip to content

Instantly share code, notes, and snippets.

@dvliman
Created July 15, 2020 01:28
Show Gist options
  • Save dvliman/da3bb5f63fa6a763dd2df178db28f94d to your computer and use it in GitHub Desktop.
Save dvliman/da3bb5f63fa6a763dd2df178db28f94d to your computer and use it in GitHub Desktop.
dpsutton's snippet
(defn ->Function
[f]
(reify java.util.function.Function
(apply [_ obj] (f obj))))
(defn future-map
[cf f]
(.thenApply cf (->Function f)))
(defn future->ch
([cf]
(future->ch cf (async/chan 1) true))
([cf ch]
(future->ch cf ch false))
([^CompletableFuture cf ch close?]
(.whenComplete cf (reify java.util.function.BiConsumer
(accept [_ obj t]
(when-some [ret (if t
{:cognitect.anomalies/category
:cognitect.anomalies/fault
:throwable t}
obj)]
(async/put! ch ret))
(when close? (async/close! ch)))))
ch))
;; and consumer:
(defn ffprobe
"Returns a channel with results from ffprobe"
[^Path p]
(let [args ["ffprobe"
"-loglevel" "warning"
"-print_format" "json"
"-show_streams"
"-show_format"
(.. p toAbsolutePath toString)]
handle (fn [^Process proc]
(let [err (.getErrorStream proc)
stdout (.getInputStream proc)
exit (.exitValue proc)
ret {:path p
:ffmpeg/error (slurp err)}]
(if (zero? exit)
(merge ret (-> stdout io/reader json/read))
(assoc ret :exit exit :stdout (slurp stdout)))))]
(-> (.. (ProcessBuilder. ^java.util.List args) start onExit)
(util/future-map handle)
(util/future->ch))))
(defn pipeline
"Runs asynchronous function 'af' on each input from channel 'in',
producing results to channel 'out'. af is presumed to return a channel
Input order is *not* preserved.
Runs af with maximum 'max' concurrency. max can be an integer
or a function returning integer (allowing dynamic concurrency
control)
close?, default true, controls whether the output channel is closed
upon completion"
([max af in out]
(pmax max af in out true))
([max af in out close?]
(let [max (if (int? max) (constantly max) max)
reads #(cond-> % (< (count %) (max)) (conj in))
drain (fn [tasks]
(async/go
(loop [tasks tasks]
(when (seq tasks)
(let [[v sc] (async/alts! (vec tasks))]
(when (some? v) (async/>! out v))
(recur (disj tasks sc)))))
(when close? (async/close! out))))]
(async/go
(loop [tasks #{}]
(let [chs (vec (reads tasks))]
(when (pos? (count chs))
(let [[v ch] (async/alts! chs)]
(if (= ch in)
(if v
(recur (conj tasks (af v)))
(async/<! (drain (disj tasks in))))
(do
(when (some? v) (async/>! out v))
(recur (disj tasks ch))))))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment