Last active
February 25, 2020 14:02
-
-
Save candera/7553134 to your computer and use it in GitHub Desktop.
dopar
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn dopar | |
"Given a (potentially infinite) sequence `coll`, uses core.async to | |
run `f` for side effects against each value in the collection. | |
Performs at most `concur` operations in parallel, and never enqueues | |
more than `lead` items ahead of the ones being consumed. If any call | |
to `f` throws an exception, it will be rethrown from this function. | |
Otherwise, returns nil. Optional timeout value is number of | |
milliseconds to wait for all operations to complete." | |
([coll f concur lead] (dopar coll f concur lead nil)) | |
([coll f concur lead timeout-ms] | |
(let [req (async/chan lead) | |
resp (async/chan concur) | |
timeout (when timeout-ms (async/timeout timeout-ms)) | |
success (Object.)] | |
;; This loop creates `concur` go blocks, which may run in parallel | |
(dotimes [i concur] | |
(async/go-loop [] | |
(when-let [val (async/<! req)] | |
(try | |
(f val) | |
(async/>! resp success) | |
(catch Throwable t | |
(async/>! resp t) | |
(async/close! req) | |
(async/close! resp) | |
;; Can't return nil, as the value of the body of | |
;; go-loop is delivered on the channel that's | |
;; implicitly created, and one can't put nil on a | |
;; channel. Might as well return the exception. | |
t)) | |
(recur)))) | |
;; This loop puts work onto the request queue, takes work off | |
;; the response queue, and does bookkeeping. | |
(loop [[head & more :as both] coll | |
enqueued 0 | |
successes 0] | |
(log/warn head more) | |
(log/warn enqueued successes) | |
(when (or head (< successes enqueued)) | |
(when-not head (async/close! req)) | |
(let [ports (remove nil? [resp timeout (when head [req head])]) | |
[val port] (async/alts!! ports)] | |
(cond | |
(= port req) (recur more (inc enqueued) successes) | |
(= port resp) (if (= val success) | |
(recur both enqueued (inc successes)) | |
(do (async/close! req) | |
(async/close! resp) | |
(throw (ex-info "Unexpected response" | |
{:reason :unexpected-response | |
:response val})))) | |
(= port timeout) (do (async/close! req) | |
(async/close! resp) | |
(throw (ex-info "Operation timed out" {:reason :timed-out}))) | |
:else (do (async/close! req) | |
(async/close! resp) | |
(throw (ex-info "Unexpected result from alts!!" | |
{:reason :wtf-alts | |
:port port})))))))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment