Skip to content

Instantly share code, notes, and snippets.

@thheller
Last active February 11, 2016 13:48
Show Gist options
  • Save thheller/5973825 to your computer and use it in GitHub Desktop.
Save thheller/5973825 to your computer and use it in GitHub Desktop.
clojure.core.async pub/sub
(def my-topic (pubsub/topic 100))
(pubsub/subscribe-go
[subscription my-topic (sliding-buffer 100)]
(loop []
(when-let [ev (<! subscription)]
(prn [:sub-got ev])
(recur))))
;; without go
(let [sub (pubsub/subscribe my-topic (sliding-buffer 100))]
(prn [:msg (<!! sub)])
(close! sub))
(ns thheller.async.pubsub-test
(:use clojure.test
clojure.pprint
clojure.core.async)
(:require [thheller.async.pubsub :as pubsub]))
(deftest ^:wip subscribe-go-test
(let [topic (pubsub/topic 100)
s-result (pubsub/subscribe-go
[sub topic (sliding-buffer 10)]
;; wait for one msg and stop subscription
(<! sub))]
(>!! topic :hello-world)
;; not the best test cause if anything goes wrong it will block forever
(is (= :hello-world (<!! s-result)))
(is (empty? @(:subscriptions topic)))
))
(ns thheller.async.pubsub
(:use clojure.core.async)
(:require [clojure.core.async.impl.protocols :as imp]))
;; should be a real write-only channel at one point
(defrecord Topic [in subscriptions]
imp/WritePort
(put! [topic val fn0-handler]
(imp/put! in val fn0-handler))
imp/Channel
(close! [topic]
(imp/close! in)))
(defn- unsubscribe!
"given a subscription (result of subscribe) remove it from the topic"
[{:keys [topic in] :as sub}]
(swap! (:subscriptions topic) disj in)
topic)
;; should be a real read-only channel at one point
(defrecord Subscription [topic in]
imp/ReadPort
(take! [sub fn1-handler]
(imp/take! in fn1-handler))
imp/Channel
(close! [sub]
(unsubscribe! sub)
(imp/close! in)
))
(defn subscribe
"subscribe to the given topic, buf should be sliding/dropping in case the subscriber
cannot keep up. This is not enforced but a slow subscriber will slow down everyone
else. I decided to let the subscriber choose instead letting the topic drop messages.
close!ing a subscription will unsubscribe it from the topic"
[{:keys [subscriptions] :as topic} buf]
(let [sub-chan (chan buf)]
(swap! subscriptions conj sub-chan)
(Subscription. topic sub-chan)
))
(defn topic
"takes one arg, same as chan, returns a Topic (write-only channel) which
broadcasts all received messages to its subscribers."
[buf]
(let [c (chan buf)
subscriptions (atom #{})]
(go (loop []
(when-let [ev (<! c)]
(doseq [sub @subscriptions]
(>! sub ev))
(recur)))
;; close all subscriptions when topic is closed
(doseq [sub @subscriptions]
(close! sub)))
(->Topic c subscriptions)
))
(defmacro subscribe-go
"buf should either be sliding or dropping
the body will be inside a go block, so same rules apply
Example:
(subscribe-go
[sub topic (sliding-buffer 100)]
(loop []
(when-let [ev (<! sub)]
(prn [:sub ev])
(recur))))
"
[sub-defs & body]
(let [sub-defs (partition 3 sub-defs)
sub-lets (vec (mapcat (fn [[sub-name topic buf]]
`(~sub-name (subscribe ~topic ~buf)))
sub-defs))]
`(let ~sub-lets
(go (let [result# (do ~@body)]
~@(map (fn [[sub-name _ _]]
`(close! ~sub-name))
sub-defs)
result#)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment