Last active
December 21, 2015 12:59
-
-
Save bkirkbri/6309498 to your computer and use it in GitHub Desktop.
Interfacing synchronous systems with core.async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; `require` is an ordinary function, so its libspec must be quoted;
;; an unquoted vector would be evaluated and fail to resolve its symbols.
(require '[clojure.core.async
           :refer [chan dropping-buffer thread <!! >!! >!]])
(defmacro try!
  "Evaluates `body` inside a `try` and returns its value.
  If a Throwable is thrown and `ch` is non-nil, the Throwable is put
  onto `ch` with `>!`; when `ch` is nil the Throwable is discarded.
  `ch` is evaluated exactly once.

  The expansion uses the parking put `>!`, so this macro is meant to be
  used inside a `go` block; see `try!!` for the blocking variant."
  [ch & body]
  `(let [ch# ~ch]
     (try
       ~@body
       (catch Throwable t#
         (when ch# (>! ch# t#))))))
(defmacro try!!
  "Evaluates `body` inside a `try` and returns its value.
  If a Throwable is thrown and `ch` is non-nil, the Throwable is put
  onto `ch` with the blocking put `>!!`; when `ch` is nil the Throwable
  is discarded. `ch` is evaluated exactly once.

  Blocking counterpart of `try!`, for use on ordinary threads."
  [ch & body]
  `(let [ch# ~ch]
     (try
       ~@body
       (catch Throwable t#
         (when ch# (>!! ch# t#))))))
(defn pool
  "Creates a pool of threads that listen for messages of the form
  `[ch f]`, run `(f)` and put the result onto `ch`.
  Returns the channel on which the pool listens for requests.

  Takes optional keyword args:
  * `:in`, the channel on which to listen for messages
    (default: a fresh `(chan)`)
  * `:err`, a channel to which encountered exceptions are sent
    (default: none, exceptions are discarded)
  * `:size`, the number of threads in the pool (default: 1)

  Closing `in` ends processing and shuts the pool down."
  [& opts]
  (let [{:keys [in err size]
         :or {in (chan) size 1}} opts]
    (dotimes [_ size]
      (thread
        ;; Keep serving until `in` is closed: the take then yields nil,
        ;; `when-let` returns nil and `while` stops.
        (while (try
                 (when-let [[out thunk] (<!! in)]
                   ;; A failure in the thunk, or in the put onto `out`,
                   ;; goes to `err` (or is dropped when `err` is nil).
                   (try!! err (>!! out (thunk)))
                   true)
                 ;; Anything else that throws (e.g. a malformed message)
                 ;; must not kill the worker: swallow it and keep going.
                 (catch Throwable _ true)))))
    in))
(defn async*
  "Function behind the `async` macro.
  Submits the zero-argument function `thunk` to `pool` (the request
  channel returned by `pool`) as a `[out thunk]` message and returns
  `out`.

  * `out`, the channel that receives the result; when nil a new
    `(chan 1)` is created
  * `err`, when non-nil, `thunk` is wrapped so that anything it throws
    is put onto `err`

  The put onto `pool` is blocking."
  [pool out err thunk]
  (let [out (or out (chan 1))
        ;; NOTE(review): when the wrapped thunk throws, `try!!` evaluates
        ;; to the result of the put onto `err`, and the pool then tries
        ;; to put that value onto `out` - confirm this is intended.
        thunk (if err #(try!! err (thunk)) thunk)]
    (>!! pool [out thunk])
    out))
(defmacro async
  "Performs `body` asynchronously using `pool`, returning the result
  via a channel. The return channel may be specified with `:out ch`
  and an error channel can be provided using `:err ch`; these options
  must come before the body forms.
  Returns the return channel (whether provided or not).

  For example: `(<!! (async pool :err err-ch (Thread/sleep 100) 42))`"
  [pool & body]
  ;; Peel leading `:out x` / `:err x` pairs off the front of `body`;
  ;; whatever remains becomes the body of the submitted thunk.
  (loop [opts {}
         [k v & more :as body] body]
    (if (#{:out :err} k)
      (recur (assoc opts k v) more)
      (let [{:keys [out err]} opts]
        `(async* ~pool ~out ~err (fn [] ~@body))))))
(defn single-error-channel
  "Returns a channel which will accept a single error and drop
  subsequent puts. It is backed by `(dropping-buffer 1)`."
  []
  (chan (dropping-buffer 1)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment