Last active
October 10, 2018 17:23
-
-
Save eneroth/17b34363d771ab7ed20929ab828c1875 to your computer and use it in GitHub Desktop.
Parallel application of a function over arguments, constrained by worker count
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn- process-one
  "Starts a single worker and returns its output channel.

  The worker is a go block that repeatedly takes `[index arguments]` vectors
  from `in-ch`, applies `function` to `arguments`, and puts
  `[index result]` onto a fresh unbuffered channel (the return value).
  Once `in-ch` is closed and drained, the returned channel is closed too.

  NOTE(review): `function` runs inside the go block. If it throws, the go
  block exits before reaching `close!`, so the returned channel is never
  closed and anything waiting for it to close will wait forever - confirm
  callers accept this."
  [in-ch function]
  (let [result-ch (chan)]
    (go
      (loop []
        ;; A nil take means in-ch is closed; otherwise destructure the pair.
        (if-let [[index values] (<! in-ch)]
          (do
            (>! result-ch [index (apply function values)])
            (recur))
          (close! result-ch))))
    result-ch))
(defn- p-apply*
  "Applies `function` to every collection of arguments in `coll` using
  `parallel-count` workers (see process-one) that all read from one shared
  input channel. `parallel-count` must be a positive integer.

  Optionally takes `buffer-count`, used as the buffer for the returned
  channel. When nil or omitted, the returned channel is unbuffered.

  Returns a channel of `[index result]` vectors, where `index` is the
  position of the argument collection in `coll`. The returned channel is
  closed once every worker has finished, i.e. after the last result has been
  put onto it. Results still sitting in its buffer can be taken after that.

  Non-deterministic: results may arrive in any order. Sort on index if
  order matters.

  Throws AssertionError (with assertions enabled) when `parallel-count` is
  not positive. With zero workers nobody reads the input channel, so the
  result channel would close empty while the feeding go block stayed parked
  forever."
  ([function coll parallel-count]
   (p-apply* function coll parallel-count nil))
  ([function coll parallel-count buffer-count]
   {:pre [(pos? parallel-count)]}
   (let [in-ch   (chan)
         workers (repeatedly parallel-count #(process-one in-ch function))
         out-ch  (if buffer-count
                   (a/merge workers buffer-count)
                   (a/merge workers))]
     ;; Feed [index args] pairs. With onto-chan's default close? behaviour,
     ;; in-ch is closed when coll is exhausted, which lets each worker close
     ;; its own output channel and, in turn, the merged channel.
     (onto-chan in-ch (map-indexed vector coll))
     out-ch)))
(defn p-apply
  "Applies `function` to every collection of arguments in `coll`, spreading
  the work over `parallel-count` workers (must be greater than zero).

  Optionally takes `buffer-count`. When given, it buffers both the
  intermediate `[index result]` channel and the returned channel. When nil
  or omitted, the intermediate channel is unbuffered and the returned
  channel gets a buffer of 1.

  Returns a channel carrying just the results, with indices stripped. It is
  closed once the last result has been put onto it.

  Non-deterministic: results may come in any order."
  ([function coll parallel-count]
   (p-apply function coll parallel-count nil))
  ([function coll parallel-count buffer-count]
   ;; A channel with a transducer needs a buffer, hence the fallback of 1.
   ;; a/pipe returns its destination channel, so that is our result.
   (a/pipe (p-apply* function coll parallel-count buffer-count)
           (chan (or buffer-count 1) (map second)))))
(defn p-apply-sync
  "Blocking, order-preserving variant of p-apply.

  Takes `function`, `coll` (a collection of argument collections) and
  `parallel-count`, the number of parallel workers to launch (must be
  greater than zero). Blocks the calling thread (via <!!) until every result
  has been produced.

  Returns a vector of results in the same order as `coll`."
  [function coll parallel-count]
  (->> (p-apply* function coll parallel-count nil)
       ;; Gather every [index result] pair into one vector when the channel
       ;; closes. This replaces the previous in-place conj! on a transient,
       ;; which discarded conj!'s return value and so broke the transient
       ;; contract.
       (a/into [])
       (<!!)
       ;; Workers finish in arbitrary order; restore input order by index.
       (sort-by first)
       (mapv second)))
;; Benchmarks: execution time of p-apply and p-apply-sync at several worker counts.
;; ##################################
(comment
  (defn call-p-apply
    "Runs p-apply and blocks until its result channel has been drained and closed."
    [function data p]
    (let [results (p-apply function data p)]
      (loop []
        (when (<!! results)
          (recur)))))

  ;; Measure
  ;; ##################################
  (require '[criterium.core :as qb])

  (def test-data (repeatedly 1000 #(range 1000)))

  ;; p-apply
  (qb/bench (call-p-apply str test-data 1))
  ;; => Average: 63ms, Standard deviation: 5ms
  (qb/bench (call-p-apply str test-data 2))
  ;; => Average: 32ms, Standard deviation: 745µs
  (qb/bench (call-p-apply str test-data 4))
  ;; => Average: 21ms, Standard deviation: 542µs
  (qb/bench (call-p-apply str test-data 8))
  ;; => Average: 15ms, Standard deviation: 354µs

  ;; p-apply-sync
  (qb/bench (p-apply-sync str test-data 1))
  ;; => Average: 62ms, Standard deviation: 901µs
  (qb/bench (p-apply-sync str test-data 2))
  ;; => Average: 34ms, Standard deviation: 906µs
  (qb/bench (p-apply-sync str test-data 4))
  ;; => Average: 23ms, Standard deviation: 1.2ms
  (qb/bench (p-apply-sync str test-data 8))
  ;; => Average: 18ms, Standard deviation: 1ms
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Suweet.