Last active
February 11, 2016 13:48
-
-
Save thheller/5973825 to your computer and use it in GitHub Desktop.
clojure.core.async pub/sub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(def my-topic (pubsub/topic 100)) | |
(pubsub/subscribe-go | |
[subscription my-topic (sliding-buffer 100)] | |
(loop [] | |
(when-let [ev (<! subscription)] | |
(prn [:sub-got ev]) | |
(recur)))) | |
;; without go | |
(let [sub (pubsub/subscribe my-topic (sliding-buffer 100))] | |
(prn [:msg (<!! sub)]) | |
(close! sub)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns thheller.async.pubsub-test | |
(:use clojure.test | |
clojure.pprint | |
clojure.core.async) | |
(:require [thheller.async.pubsub :as pubsub])) | |
(deftest ^:wip subscribe-go-test | |
(let [topic (pubsub/topic 100) | |
s-result (pubsub/subscribe-go | |
[sub topic (sliding-buffer 10)] | |
;; wait for one msg and stop subscription | |
(<! sub))] | |
(>!! topic :hello-world) | |
;; not the best test cause if anything goes wrong it will block forever | |
(is (= :hello-world (<!! s-result))) | |
(is (empty? @(:subscriptions topic))) | |
)) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns thheller.async.pubsub | |
(:use clojure.core.async) | |
(:require [clojure.core.async.impl.protocols :as imp])) | |
;; should be a real write-only channel at one point | |
(defrecord Topic [in subscriptions] | |
imp/WritePort | |
(put! [topic val fn0-handler] | |
(imp/put! in val fn0-handler)) | |
imp/Channel | |
(close! [topic] | |
(imp/close! in))) | |
(defn- unsubscribe! | |
"given a subscription (result of subscribe) remove it from the topic" | |
[{:keys [topic in] :as sub}] | |
(swap! (:subscriptions topic) disj in) | |
topic) | |
;; should be a real read-only channel at one point | |
(defrecord Subscription [topic in] | |
imp/ReadPort | |
(take! [sub fn1-handler] | |
(imp/take! in fn1-handler)) | |
imp/Channel | |
(close! [sub] | |
(unsubscribe! sub) | |
(imp/close! in) | |
)) | |
(defn subscribe | |
"subscribe to the given topic, buf should be sliding/dropping in case the subscriber | |
cannot keep up. This is not enforced but a slow subscriber will slow down everyone | |
else. I decided to let the subscriber choose instead letting the topic drop messages. | |
close!ing a subscription will unsubscribe it from the topic" | |
[{:keys [subscriptions] :as topic} buf] | |
(let [sub-chan (chan buf)] | |
(swap! subscriptions conj sub-chan) | |
(Subscription. topic sub-chan) | |
)) | |
(defn topic | |
"takes one arg, same as chan, returns a Topic (write-only channel) which | |
broadcasts all received messages to its subscribers." | |
[buf] | |
(let [c (chan buf) | |
subscriptions (atom #{})] | |
(go (loop [] | |
(when-let [ev (<! c)] | |
(doseq [sub @subscriptions] | |
(>! sub ev)) | |
(recur))) | |
;; close all subscriptions when topic is closed | |
(doseq [sub @subscriptions] | |
(close! sub))) | |
(->Topic c subscriptions) | |
)) | |
(defmacro subscribe-go | |
"buf should either be sliding or dropping | |
the body will be inside a go block, so same rules apply | |
Example: | |
(subscribe-go | |
[sub topic (sliding-buffer 100)] | |
(loop [] | |
(when-let [ev (<! sub)] | |
(prn [:sub ev]) | |
(recur)))) | |
" | |
[sub-defs & body] | |
(let [sub-defs (partition 3 sub-defs) | |
sub-lets (vec (mapcat (fn [[sub-name topic buf]] | |
`(~sub-name (subscribe ~topic ~buf))) | |
sub-defs))] | |
`(let ~sub-lets | |
(go (let [result# (do ~@body)] | |
~@(map (fn [[sub-name _ _]] | |
`(close! ~sub-name)) | |
sub-defs) | |
result#))))) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment