Skip to content

Instantly share code, notes, and snippets.

@jjttjj
Last active September 6, 2019 21:34
Show Gist options
  • Save jjttjj/854e725c2ae1e0a1f3b5a4b235252df3 to your computer and use it in GitHub Desktop.
Save jjttjj/854e725c2ae1e0a1f3b5a4b235252df3 to your computer and use it in GitHub Desktop.
(require '[clojure.core.async :as a])
(do
(def conn1 {:handlers (atom #{})})
(def on? (atom true))
;;;keep passing random ints to every handler
(def process
(future (while @on?
(doseq [f @(:handlers conn1)]
(f (rand-int 100))
(Thread/sleep 25)))))
(defn add-handler [conn f] (update conn :handlers swap! conj f))
(defn remove-handler [conn f] (update conn :handlers swap! disj f))
(defn pipe-then
"Like core.async/pipe but runs callback `cb` on close of `from`"
[from to cb]
(a/go-loop []
(let [v (a/<! from)]
(if (nil? v)
(do (a/close! to) (cb))
(when (a/>! to v) (recur)))))
to)
(defn put-msgs!
"Adds a handler to `conn` which puts all messages onto channel
`ch`. Removes the handler when `ch` closes."
[conn ch]
(let [out-ch (a/chan 1) ;;should this be a parameter?
h (fn [msg] (a/>!! ch msg))]
(add-handler conn h)
(pipe-then ch out-ch (fn []
(remove-handler conn h)))))
(def result-chan (put-msgs! conn1 (a/chan 1 (comp (filter even?) (take 10)))))
(a/<!! (a/into [] result-chan))
)
(reset! on? false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment