Skip to content

Instantly share code, notes, and snippets.

@gardnervickers
Last active March 23, 2016 04:13
Show Gist options
  • Select an option

  • Save gardnervickers/248a833d8819f190a6c6 to your computer and use it in GitHub Desktop.

Select an option

Save gardnervickers/248a833d8819f190a6c6 to your computer and use it in GitHub Desktop.
(def msg-seq [{:msg 1} {:msg 2} {:msg 3} {:type :barrier} {:msg 4} {:msg 5}])
(defn start-peer [id a sleepiness] ;; Simulates a peer lifecycle, just writing the message it got under it's slot in a db atom
(let [ch (chan)
peer (go-loop [msg (<! ch)]
(when msg
(Thread/sleep sleepiness)
(swap! a update id conj msg)
(recur (<! ch))))]
ch))
(defn start-alts [& chans] ;; Route segment to the first peer that's ready (unblocked). Will NOT lock the channel ch
(let [ch (chan)
v (go-loop [msg (<! ch)]
(when msg
(alts!
(mapv (fn [c] [c msg]) chans))
(recur (<! ch))))]
ch))
(defn start-router [pred t f] ;; routes to t-chan or f-chan based on pred.
(let [ch (chan)
v (go-loop [msg (<! ch)]
(when msg
(println msg)
(if (pred msg)
(>! t msg)
(>! f msg))
(recur (<! ch))))]
ch))
(let [
db (atom {:p1 [] :p2 [] :p3 []})
p1 (start-peer :p1 db 1)
p2 (start-peer :p2 db 500)
p3 (start-peer :p3 db 1000)
barrier-chan (core.async.labs/broadcast p1 p2 p3)
alts-chan (start-alts p1 p2 p3)
router-chan (start-router (fn [m] (= (:type m) :barrier)) ;; If :barrier for :type in message, broadcast to all channels
barrier-chan ;; synchonously using broadcast. If not (normal message) use the
alts-chan)] ;; backpressure aware alts! to pick first non-busy task.
(pipe (core.async.labs/spool msg-seq) router-chan) ;; Pipe spooled sq into router to get everything started
(Thread/sleep 5000)
(clojure.pprint/pprint @db))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment