Last active
March 23, 2016 04:13
-
-
Save gardnervickers/248a833d8819f190a6c6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(def msg-seq
  "Demo input: five ordinary {:msg n} maps with a single {:type :barrier}
  marker placed after the third message."
  [{:msg 1}
   {:msg 2}
   {:msg 3}
   {:type :barrier}
   {:msg 4}
   {:msg 5}])
(defn start-peer
  "Simulates a peer lifecycle: every message taken from the peer's input
  channel is appended under the peer's slot in a shared db atom.

  id         - key of this peer's slot in the map held by `a`
  a          - atom holding a map; the message is conj'd onto (get m id)
  sleepiness - milliseconds the peer stays busy before recording a message

  Returns the peer's (unbuffered) input channel. The loop ends when a take
  yields nil (channel closed) or any other falsy value."
  [id a sleepiness]
  (let [ch (chan)]
    (go-loop []
      (when-let [msg (<! ch)]
        ;; Park on a timeout channel rather than calling Thread/sleep: sleeping
        ;; inside a go block pins one of the few threads in the shared go
        ;; dispatch pool, which can stall unrelated go blocks. While parked the
        ;; peer still does not take from `ch`, so it looks just as busy to
        ;; upstream writers.
        (<! (clojure.core.async/timeout sleepiness))
        (swap! a update id conj msg)
        (recur)))
    ch))
(defn start-alts
  "Routes each incoming segment to the first peer that is ready to accept it.

  For every message taken from the returned channel, one put operation per
  channel in `chans` is offered to alts!, which completes exactly one of
  them. The loop ends when a take yields nil (channel closed) or any other
  falsy value.

  Returns the (unbuffered) input channel."
  [& chans]
  (let [in (chan)]
    (go-loop []
      (when-let [msg (<! in)]
        ;; Build one [port value] put spec per downstream channel.
        (alts! (vec (for [out chans] [out msg])))
        (recur)))
    in))
(defn start-router
  "Routes each incoming message to one of two channels.

  pred - predicate applied to every message
  t    - channel that receives messages for which pred is truthy
  f    - channel that receives all other messages

  Every message is printed before it is forwarded. The loop ends when a take
  yields nil (channel closed) or any other falsy value.

  Returns the (unbuffered) input channel."
  [pred t f]
  (let [in (chan)]
    (go-loop []
      (when-let [msg (<! in)]
        (println msg)
        ;; Pick the destination first, then perform a single parking put.
        (>! (if (pred msg) t f) msg)
        (recur)))
    in))
;; Demo wiring: three peers of differing speed share one db atom. Barrier
;; messages are broadcast synchronously to every peer; normal messages go
;; through the backpressure-aware alts! router, which picks the first
;; non-busy peer.
;; NOTE(review): core.async ships its experimental namespace as
;; clojure.core.async.lab -- confirm that `core.async.labs` resolves here
;; (e.g. through an alias declared outside this snippet).
(let [db          (atom {:p1 [] :p2 [] :p3 []})
      fast-peer   (start-peer :p1 db 1)
      medium-peer (start-peer :p2 db 500)
      slow-peer   (start-peer :p3 db 1000)
      to-all      (core.async.labs/broadcast fast-peer medium-peer slow-peer)
      to-first    (start-alts fast-peer medium-peer slow-peer)
      barrier?    (fn [m] (= :barrier (:type m)))
      router-chan (start-router barrier? to-all to-first)
      source      (core.async.labs/spool msg-seq)]
  ;; Pipe the spooled seq into the router to get everything started.
  (pipe source router-chan)
  ;; Give the peers time to drain their work before inspecting the result.
  (Thread/sleep 5000)
  (clojure.pprint/pprint @db))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment