Lazy seq as event subscription mechanism
;; Here is a spike of a lightweight in-process pubsub mechanism that allows pure
;; functional consumers, both blocking and asynchronous.

;; This defines the event stream, in this case just a series of numbers,
;; a new one produced each second:
(defn timer []
  (lazy-seq
    (Thread/sleep 1000)
    (cons (System/nanoTime) (timer))))

;; We can see some events (this takes a couple of seconds to complete):
(take 3 (timer))
;=> (3383024932037 3384025272769 3385025571742)

;; We can have two consumers, each in its own future, but they're actually
;; consuming two different streams, each having started its own,
;; so this isn't quite what we want:
[@(future (take 3 (timer))) @(future (take 3 (timer)))]
;=> [(3445765729693 3446765925828 3447766208228)
;    (3448766569987 3449766774030 3450767042063)]
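
;; A caveat on the example above: `take` is lazy, and the vector derefs the
;; first future before the second one is created, so the two streams end up
;; being realized one after the other, on the thread that prints the result.
;; A minimal sketch of a variant that starts both consumers first and forces
;; realization inside each future. `consume-concurrently` is a hypothetical
;; helper, not part of the original spike:
(defn consume-concurrently
  "Starts two futures, each realizing n items from its own (make-stream),
  and waits for both."
  [make-stream n]
  (let [a (future (doall (take n (make-stream))))
        b (future (doall (take n (make-stream))))]
    [@a @b]))
;; (consume-concurrently timer 3) should return after roughly three seconds,
;; still with two independent streams.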

;; So we want to set up a single producer that can be observed in a stable way:
(def most-recent (atom (timer)))

(defn advance-most-recent []
  (swap! most-recent rest)
  (recur))

;; There, now this kicks off a single producer:
(future (advance-most-recent))

;; Now we can have two consumers look at the same stream of events. Note the
;; event numbers are identical for each consumer:
[@(future (take 3 @most-recent)) @(future (take 3 @most-recent))]
;=> [(3735764451405 3736764670106 3737764892260)
;    (3735764451405 3736764670106 3737764892260)]

;; If one consumer starts late, it may miss some events, but will still see
;; a consistent and sequential view:
[@(future (take 3 @most-recent))
 (do (Thread/sleep 1500) @(future (take 3 @most-recent)))]
;=> [(3756768628228 3757768831178 3758769051789)
;    (3757768831178 3758769051789 3759769275472)]

;; Consumers could choose to use a watcher instead, for a callback or async-style
;; mechanism. This starts printing a new number every second:
(add-watch most-recent :async
  (fn callback [_ _ _ stream]
    (prn (first stream))))
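
;; A watcher keeps firing until it is removed by its key with `remove-watch`.
;; A minimal sketch of a subscribe/unsubscribe pair built on that;
;; `watch-events!` is a hypothetical helper, not part of the original spike:
(defn watch-events!
  "Calls (f event) with the head of the stream each time stream-atom advances.
  Returns a zero-argument function that removes the watcher again."
  [stream-atom key f]
  (add-watch stream-atom key
    (fn [_ _ _ stream]
      (f (first stream))))
  (fn unsubscribe []
    (remove-watch stream-atom key)))
;; For example: (def stop (watch-events! most-recent :async prn))
;; and later:   (stop)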

;;==========================

;; If the producer wants to asynchronously drop items into the event stream,
;; we have to set it up a bit differently. First, a queue that the producer
;; can put events in:
(def producer-queue (java.util.concurrent.LinkedBlockingQueue.))

;; Then the atom for consumers to follow:
(def most-recent
  (atom ((fn more [] (lazy-seq (cons (.take producer-queue) (more)))))))
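
;; The anonymous `more` function above turns a blocking queue into a lazy seq:
;; each step blocks in `.take` until the producer has published an item.
;; The same idea as a named function, as a sketch; `queue-seq` is a
;; hypothetical name, not part of the original spike:
(defn queue-seq
  "Returns a lazy seq of the items taken from q. Realizing an element blocks
  until one is available."
  [^java.util.concurrent.BlockingQueue q]
  (lazy-seq (cons (.take q) (queue-seq q))))
;; With it, the atom could be written as (atom (queue-seq producer-queue)).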

;; The thread for advancing most-recent is just as above:
(defn advance-most-recent []
  (swap! most-recent rest)
  (recur))

(future (advance-most-recent))

;; Now the producer can be written like this:
(future
  (loop [i 0]
    (Thread/sleep 1000)
    (.put producer-queue i) ; Publish the next event
    (recur (inc i))))

;; The consumers use exactly the same API as earlier; both async watchers
;; and blocking reads work:
[@(future (take 3 @most-recent)) @(future (take 3 @most-recent))]
;=> [(45 46 47) (45 46 47)]
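
;; Because a consumer only ever sees an ordinary lazy seq, it can be written
;; as a pure function over that seq and tested without any producer running.
;; A small sketch; `squares-of-evens` is a hypothetical example consumer,
;; not part of the original spike:
(defn squares-of-evens
  "Returns the squares of the first n even numbers found in events."
  [events n]
  (->> events
       (filter even?)
       (map #(* % %))
       (take n)
       doall))
;; For example, (squares-of-evens @most-recent 3) blocks until three even
;; events have arrived on the integer stream above.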

;; Thanks to zenoli in #clojure IRC for the inspiration.