Created
January 30, 2014 05:42
-
-
Save daveray/8703191 to your computer and use it in GitHub Desktop.
merge a channel of channels in core.async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn merge+ | |
"Takes a *channel* of source channels and returns a channel which | |
contains all values taken from them. The returned channel will be | |
unbuffered by default, or a buf-or-n can be supplied. The channel | |
will close after all the source channels have closed." | |
([in-ch] (merge+ in-ch nil)) | |
([in-ch buf-or-n] | |
(let [out-ch (async/chan buf-or-n)] | |
(async/go-loop [cs [in-ch]] | |
(if-not (empty? cs) | |
(let [[v c] (async/alts! cs)] | |
(cond | |
(nil? v) | |
(recur (filterv #(not= c %) cs)) | |
(= c in-ch) | |
(recur (conj cs v)) | |
:else | |
(do | |
(async/>! out-ch v) | |
(recur cs)))) | |
(async/close! out-ch))) | |
out-ch))) | |
(comment | |
(->> (async/to-chan [(async/to-chan [1 2 3]) | |
(async/to-chan [8 9 10 11]) | |
(async/to-chan [4 5 6 7]) | |
(async/to-chan [12]) | |
(async/to-chan [13 14 15 16 17 18]) ]) | |
(merge+) | |
(async/into []) | |
(async/<!!)) | |
;=> | |
[1 2 8 12 3 4 13 5 6 9 14 10 11 15 16 17 7 18]) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you for the gist. I've spent like a day trying to implement flatmap function and after some time I've found your merge+ function which does the exactly this.