@oubiwann
Last active June 13, 2024 13:24
Clojure core.async Pub/Sub Example (with callbacks)
(ns pubsub-with-callbacks
  "Adapted from Timothy Baldridge's 2013 core.async examples:
  * https://raw.githubusercontent.com/halgari/clojure-conj-2013-core.async-examples/master/src/clojure_conj_talk/core.clj"
  (:require [clojure.core.async :as async]))

;; Messages are put on pub-channel; publisher routes each one by its :tag key.
(def pub-channel (async/chan 1))
(def publisher (async/pub pub-channel :tag))
;; Subscriber output is funneled through print-channel so printing happens in one place.
(def print-channel (async/chan 1))

(defn run-print-channel
  []
  (async/go-loop []
    (when-let [value (async/<! print-channel)]
      (println value)
      (recur))))

(defn close-channels
  []
  (async/close! pub-channel)
  (async/close! print-channel))
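
(defn subscribe
  "Subscribes a fresh channel to every tag in `tags`, then starts a go-loop
  that passes each received message to `callback` and puts the result on
  print-channel. The loop ends when the subscription channel is closed."
  [publisher subscriber tags callback]
  (let [channel (async/chan 1)]
    (doseq [tag tags]
      (async/sub publisher tag channel))
    (async/go-loop []
      (when-let [value (async/<! channel)]
        (async/>! print-channel (callback subscriber tags value))
        (recur)))))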
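
(defn callback
  "Formats a received message for printing. `tags` is accepted so that all
  callbacks share one signature, but it is not used here."
  [subscriber tags value]
  (pr-str
    (format "%s Got message: %s"
            subscriber value)))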
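
(defn send-with-tags
  "Puts one copy of the message on `channel` per tag in (:tags msg), blocking
  on each put. Each copy carries a single :tag so the publisher can route it."
  [channel msg]
  (doseq [tag (:tags msg)]
    (println "sending... " tag)
    (async/>!! channel {:tag tag
                        :msg (:msg msg)})))
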
;; Run in REPL:
;;
;; (run-print-channel)
;;
;; (subscribe publisher "I care about DOGS!" [:dogs] callback)
;; (subscribe publisher "I care about *cats*." [:cats] callback)
;; (subscribe publisher "I'm all about cats and dogs." [:cats :dogs] callback)
;;
;; (send-with-tags pub-channel {:msg "New Dog Story" :tags [:dogs]})
;; (send-with-tags pub-channel {:msg "New Cat Story" :tags [:cats]})
;; (send-with-tags pub-channel {:msg "New Pet Story" :tags [:cats :dogs]})
;;
;; (close-channels)