Skip to content

Instantly share code, notes, and snippets.

@joinr
Last active October 14, 2022 16:50
Show Gist options
  • Save joinr/2e14f13cfdde96c547a9eb01222d3223 to your computer and use it in GitHub Desktop.
Save joinr/2e14f13cfdde96c547a9eb01222d3223 to your computer and use it in GitHub Desktop.
(ns util
(:refer-clojure :exclude [pmap])
(:require [clojure.core.async :as async]))
(defn chan? [x]
(instance? clojure.core.async.impl.channels.ManyToManyChannel x))
(defn guess-physical-cores
"Hueristic used to account for likely prevalent
hyperthreading influencing the supposed available
processor count. For some runs, we would like to
stick close to the logical core count. A useful
heuristic is to just divide by 2."
[]
(let [n (.availableProcessors (Runtime/getRuntime))]
(case n
1 1
(quot n 2))))
(defn as-chan [xs]
(cond (chan? xs) xs
(coll? xs) (async/to-chan xs)
:else (throw (ex-info "unknown channel type" {:in xs}))))
(defn producer->consumer!! [n out f jobs]
(let [done? (atom 0)
res (async/chan n)
workers (dotimes [i n]
(async/thread
(loop []
(if-let [nxt (async/<!! jobs)]
(let [res (f nxt)
_ (async/>!! out res)]
(recur))
(let [ndone (swap! done? inc)]
(when (= ndone n)
(do (async/close! out)
(async/>!! res true))))))))]
res))
(defn unordered-pmap>
([n f xs]
(let [out (async/chan (* n 2))
in (as-chan xs)
pipe (producer->consumer!!
n
out
f
in)]
out))
([f xs] (unordered-pmap> (guess-physical-cores) f xs)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment