Skip to content

Instantly share code, notes, and snippets.

@burbma
Last active December 22, 2020 21:27
Show Gist options
  • Save burbma/7faf3ed43744d7382d9d3b26b1bdb139 to your computer and use it in GitHub Desktop.
Save burbma/7faf3ed43744d7382d9d3b26b1bdb139 to your computer and use it in GitHub Desktop.
Clojure pmap with variable number of threads. Optionally operate on results real time via async/channel.
(defn npmap
"Like pmap but also takes n to specify the number of threads. Also differs
in that it can return the results on a channel. If you don't specify a
channel you will get all the results back in a vector after they are all
done. If you do pass in a channel results will be put on that channel
(by async/pipeline) as they are available. When all computation is done
async/pipeline closes said results channel."
([f n coll]
(->> coll
(npmap f n (async/chan))
(async/into [])
async/<!!))
([f n out-chan coll]
(async/pipeline n out-chan (map f) (async/to-chan coll))
out-chan))
;; Example usage that operates on results in real time.
(let [c (npmap identity ; Replace identity with your function.
64
(async/chan)
(range 1000) ; Replace with your seq of data.
)]
;; This way my writes to the file are serial and not clobbering each other.
(async/go-loop
[]
(let [result (async/<! c)]
(if (nil? result) ; Taking from a channel returns nil when it closes
nil ; so this exits the loop.
(do
(spit "output.txt" (format "%s\n" result) ; Replace with whatever you want to do with results.
:append true)
(recur))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment