Last active
January 17, 2017 04:54
-
-
Save noxecane/8c966f35a0ecf7429e0c2c26dbe01630 to your computer and use it in GitHub Desktop.
Using core.async for pub-sub where the subscriber defines which events it wants
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns tools.events | |
(:require [clojure.core.async :as a])) | |
(defn new-emitter
  "Creates a map holding the configuration of an emitter.

  buff is the buffer size of the emitter's input channel; it defaults
  to 10 when omitted.

  Returns a map with two keys:
    :in   - the channel events are put onto
    :sock - a mult over :in that subscriber channels tap into"
  ([]
   (new-emitter 10))
  ([buff]
   ;; Honour the requested buffer size instead of a hard-coded 10.
   (let [in   (a/chan buff)
         sock (a/mult in)]
     {:in in :sock sock})))
(defn publish
  "Sends event to every channel connected to the emitter by putting
  it onto the emitter's :in channel with a/put!. Returns whatever
  a/put! returns."
  [{:keys [in]} event]
  (a/put! in event))
(defn subscribe
  "Returns a new channel tapped onto the emitter's :sock mult that only
  carries events for which (mine? event) is truthy.

  buff is the buffer size of the returned channel and defaults to 10."
  ([emitter mine?]
   (subscribe emitter mine? 10))
  ([{:keys [sock]} mine? buff]
   (let [only-mine (filter mine?)            ; transducer applied on the channel
         listener  (a/chan buff only-mine)]
     (a/tap sock listener)
     listener)))
(defn unsubscribe
  "Detaches the event channel ech from the emitter's :sock mult and
  then closes ech. Returns the result of a/close!."
  [{:keys [sock]} ech]
  (do
    (a/untap sock ech)
    (a/close! ech)))
(defn key-is?
  "Builds a one-argument predicate that applies key to its argument and
  reports whether the result equals val. Handy as the mine? filter of
  subscribe, e.g. (key-is? :type :click)."
  [key val]
  (fn [event]
    (= (key event) val)))
(defmacro doevent
  "Sugar to loop over an event channel asynchronously.

  binding is a vector [name channel]. The channel expression is
  evaluated once; each value taken from it is bound to name and body
  is run with that binding. The loop ends when the take yields nil.

  Expands to an a/go-loop form and returns its value."
  [binding & body]
  (let [[name channel] binding]
    ;; Bind the channel expression once so it is not re-evaluated on
    ;; every iteration (it may be a call such as (subscribe ...)).
    `(let [ch# ~channel]
       (a/go-loop [~name (a/<! ch#)]
         (when-not (nil? ~name)
           ~@body
           ;; Take the NEXT event from the channel, not from the event
           ;; that was just handled.
           (recur (a/<! ch#)))))))
(defn on
  "Function-handler flavour of doevent: calls (handler event) for each
  event that doevent takes from the event channel ech. Returns what
  the doevent form returns."
  [ech handler]
  (doevent [incoming ech]
    (handler incoming)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment