@eneroth
Last active October 10, 2018 17:23
Parallel application of a function over arguments, constrained by worker count
;; Required by the code below (assumes org.clojure/core.async is on the classpath).
(require '[clojure.core.async :as a
           :refer [chan go <! >! <!! close! onto-chan]])
(defn- process-one
  "Takes a channel and a function. Expects vectors of [index arguments]
  on the channel. Takes each vector from the channel and applies the
  function to its arguments part. Returns an output channel on which
  [index result] vectors are put. The output channel is closed once the
  input channel is closed and drained."
  [in-ch function]
  (let [out-ch (chan)]
    (go
      (loop [item (<! in-ch)]
        (if item
          (let [[index values] item]
            (>! out-ch [index (apply function values)])
            (recur (<! in-ch)))
          (close! out-ch))))
    out-ch))
(defn- p-apply*
  "Takes a function, a collection of argument collections, and a count
  (greater than zero) of how many parallel workers to launch. Optionally
  takes a buffer size for the returned channel.
  Returns a channel of vectors of the form [index result], where index is
  the position of the arguments in the original collection.
  The returned channel is closed after the last result, so a take returns
  nil once all results have been consumed.
  Non-deterministic: results may arrive in any order. Sort on index if
  order matters."
  ([function coll parallel-count]
   (p-apply* function coll parallel-count nil))
  ([function coll parallel-count buffer-count]
   (let [in-ch    (chan)
         create   #(process-one in-ch function)
         channels (repeatedly parallel-count create)
         out-ch   (if buffer-count (a/merge channels buffer-count) (a/merge channels))]
     ;; onto-chan closes in-ch once coll is exhausted, which lets the workers finish.
     (onto-chan in-ch (map-indexed vector coll))
     out-ch)))
(defn p-apply
  "Takes a function, a collection of argument collections, and a count
  (greater than zero) of how many parallel workers to launch. Optionally
  takes a buffer size for the returned channel.
  The number of concurrently running workers stays at (or as close as
  possible to) parallel-count.
  Returns a channel of the results of applying the function to each
  collection of arguments. The returned channel is closed after the last
  result, so a take returns nil once all results have been consumed.
  The function should not return nil, since nil cannot be put on a channel.
  Non-deterministic: results may arrive in any order."
  ([function coll parallel-count]
   (p-apply function coll parallel-count nil))
  ([function coll parallel-count buffer-count]
   (let [result-ch (p-apply* function coll parallel-count buffer-count)
         ;; A channel with a transducer needs a buffer, hence the default of 1.
         out-ch    (chan (or buffer-count 1) (map second))]
     (a/pipe result-ch out-ch)
     out-ch)))
(defn p-apply-sync
  "Takes a function, a collection of argument collections, and a count
  (greater than zero) of how many parallel workers to launch.
  The number of concurrently running workers stays at (or as close as
  possible to) parallel-count.
  Blocks the calling thread until all results are in, then returns a
  vector of the results, guaranteed to be in the same order as the input
  collection."
  [function coll parallel-count]
  (let [out-ch (p-apply* function coll parallel-count nil)]
    ;; Thread the transient through the loop: conj! must be used for its
    ;; return value rather than for its side effect.
    (loop [collector (transient [])]
      (if-let [item (<!! out-ch)]
        (recur (conj! collector item))
        (mapv second (sort-by first (persistent! collector)))))))
;; For measuring execution time
;; ##################################
(comment
  (defn call-p-apply [function data p]
    (let [out-ch (p-apply function data p)]
      (loop [item (<!! out-ch)]
        (when item
          (recur (<!! out-ch))))))

  ;; Measure
  ;; ##################################
  (require '[criterium.core :as qb])
  (def test-data (repeatedly 1000 #(range 1000)))

  ;; p-apply
  (qb/bench (call-p-apply str test-data 1))
  ;; => Average: 63ms, Standard deviation: 5ms
  (qb/bench (call-p-apply str test-data 2))
  ;; => Average: 32ms, Standard deviation: 745µs
  (qb/bench (call-p-apply str test-data 4))
  ;; => Average: 21ms, Standard deviation: 542µs
  (qb/bench (call-p-apply str test-data 8))
  ;; => Average: 15ms, Standard deviation: 354µs

  ;; p-apply-sync
  (qb/bench (p-apply-sync str test-data 1))
  ;; => Average: 62ms, Standard deviation: 901µs
  (qb/bench (p-apply-sync str test-data 2))
  ;; => Average: 34ms, Standard deviation: 906µs
  (qb/bench (p-apply-sync str test-data 4))
  ;; => Average: 23ms, Standard deviation: 1.2ms
  (qb/bench (p-apply-sync str test-data 8))
  ;; => Average: 18ms, Standard deviation: 1ms
  #__)
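The index bookkeeping that `p-apply*` and `p-apply-sync` rely on can be illustrated without core.async. The following is a minimal sketch using only `clojure.core`. The names `tagged`, `results`, `shuffled`, and `restored` are invented for this illustration, and `shuffle` merely stands in for the arbitrary order in which worker results may arrive.

```clojure
;; Tag each argument collection with its position, as p-apply* does
;; before putting the items on the input channel.
(def tagged (map-indexed vector [[1 2] [3 4] [5 6]]))

;; Apply the function to the arguments part and keep the index,
;; mirroring what a single worker does with each item.
(def results
  (map (fn [[index values]] [index (apply + values)]) tagged))

;; Stand-in for non-deterministic arrival order (assumption: the real
;; order depends on worker scheduling, not on shuffle).
(def shuffled (shuffle results))

;; Sort on the index and then drop it, as p-apply-sync does.
(def restored (mapv second (sort-by first shuffled)))
;; restored is [3 7 11] regardless of how shuffled was ordered
```

Because every result carries its original position, the final order does not depend on which worker finishes first.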
avocade commented Oct 10, 2018

Suweet.
