Created
November 28, 2014 17:06
-
-
Save favila/8e7ad6ea5b01bd7466ff to your computer and use it in GitHub Desktop.
Some missing pieces of core.async. as-transducer: Make a transducer function easily without Clojure 1.7. go-pipe: async/pipe, but returns the go-loop. fast-pipeline-blocking: faster than async/pipeline-blocking, but unordered results. blocking-consumer: consume a channel with multiple worker threads.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns favila.async-util | |
"Some missing pieces of core.async. | |
as-transducer: Make a transducer function easily without Clojure 1.7. | |
go-pipe: async/pipe, but returns the go-loop. | |
fast-pipeline-blocking: faster than async/pipeline-blocking, but unordered results. | |
blocking-consumer: consume a channel with multiple worker threads." | |
(:require [clojure.core.async :as async | |
:refer [go go-loop <! >! <!! >!! close! take! put! chan]])) | |
(defn as-transducer
  "Convert function f into a transducer function.

  This is merely a more convenient way to write an efficient reducing-step
  function without the transducer signature.

  The function should have the signature `(f appender opaque-collection value)`
  and apply `appender` to `opaque-collection` to add values to the final
  reduced value. It should return the \"modified\" opaque collection like a
  normal transducer. It can also box it with `reduced`.

  Returns a function of one argument (the downstream reducing function `xf`)
  which yields a reducing function with three arities:
    []    - delegates to `(xf)`
    [r]   - delegates to `(xf r)`
    [r v] - calls `(f xf r v)`"
  ;; The docstring must come BEFORE the parameter vector; placed after it, the
  ;; string is just a discarded body expression and the var gets no :doc.
  [f]
  (fn [xf]
    (fn
      ([] (xf))
      ([r] (xf r))
      ([r v] (f xf r v)))))
(defn go-pipe
  "Copy every value taken from channel `from` onto channel `to`, like
  async/pipe, but return the channel of the go block doing the copying rather
  than `to`, so the piping \"process\" itself can be monitored.

  The copy loop ends when `from` yields nil (it is closed and drained), in which
  case `to` is closed if `close?` is truthy, or when a put onto `to` returns a
  falsey value (presumably because `to` has been closed). The go block always
  evaluates to nil, so the returned channel closes without delivering a value."
  [from to close?]
  (go
    (loop []
      (let [item (<! from)]
        (cond
          ;; Source exhausted: optionally propagate the close downstream.
          (nil? item)  (when close? (close! to))
          ;; Put accepted: keep copying.
          (>! to item) (recur)
          ;; Put refused: stop quietly.
          :else        nil)))))
(defn fast-pipeline-blocking
  "Like `pipeline-blocking`, except there is no guaranteed relative order
  between inputs and outputs, and the return value is a channel which closes
  when all workers are finished.

  The advantage of this over `pipeline-blocking` is increased throughput;
  the disadvantage is that results come in no defined order.

  Spawns n worker threads (n must be positive). Each worker owns a private
  channel built as `(chan 1 xf ex-handler)`, pipes `from` into it and pipes it
  into `to`, then blocks until both pipes have finished.

  If `close?` is true (default true), `to` will be closed when `from` is closed
  and *after* flushing all workers (i.e. no items already created by workers
  for `to` will be lost). If `close?` is false, `to` is left open and the
  merged worker channel is returned directly.

  `ex-handler` is passed straight to `chan`; the default forwards the exception
  to the current thread's uncaught-exception handler and returns nil.

  NOTE(review): the transducer appears to execute on whichever thread performs
  the put/take on the intermediate channel, which here is a go block rather
  than the spawned worker thread. Confirm before relying on this for an `xf`
  that blocks."
  ([n to xf from]
   (fast-pipeline-blocking n to xf from true))
  ([n to xf from close?]
   (let [report-uncaught (fn [ex]
                           (let [^Thread t (Thread/currentThread)]
                             (.uncaughtException (.getUncaughtExceptionHandler t) t ex))
                           nil)]
     (fast-pipeline-blocking n to xf from close? report-uncaught)))
  ([n to xf from close? ex-handler]
   (assert (pos? n))
   (letfn [(spawn-worker []
             (async/thread
               (let [stage (chan 1 xf ex-handler)
                     ;; Feed first, then drain: same creation order as before.
                     feed  (go-pipe from stage true)
                     drain (go-pipe stage to false)]
                 ;; Blocks until `from` and `stage` are both closed and drained.
                 (<!! (async/merge [feed drain])))))]
     (let [all-done (async/merge (repeatedly n spawn-worker))]
       (if-not close?
         all-done
         (go
           ;; Wait for every worker to finish before closing the output.
           (<! all-done)
           (close! to)
           nil))))))
(defn blocking-consumer
  "Spawn `n` threads, each of which repeatedly takes a value from channel `ch`
  and calls `f` with that value as its only argument. Presumably, `f` will
  cause a side-effect with the value; its return value is ignored.

  If `f` does something long-running, it should block to exert back-pressure
  on `ch`.

  Returns a channel which closes when all workers have finished. Each worker
  finishes when it takes nil from `ch`, i.e. once `ch` is closed and drained."
  [n ch f]
  (let [spawn-worker (fn []
                       (async/thread
                         (loop []
                           (let [item (<!! ch)]
                             ;; nil means `ch` is closed and drained.
                             (when-not (nil? item)
                               (f item)
                               (recur))))))]
    (async/merge (repeatedly n spawn-worker))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment