Skip to content

Instantly share code, notes, and snippets.

@jeroenvandijk
Last active May 28, 2016 05:58
Show Gist options
  • Save jeroenvandijk/67d064e0bb08b900e656 to your computer and use it in GitHub Desktop.
Save jeroenvandijk/67d064e0bb08b900e656 to your computer and use it in GitHub Desktop.
Server Side Events with Aleph
(comment
;; Deps
[cheshire "5.4.0"]
[manifold "0.1.1"]
[aleph "0.4.1-beta2"]
[org.clojure/clojure "1.7.0"]
[org.clojure/core.async "0.2.371"]
)
(require '[clojure.core.async :as a]
'[manifold.stream :as s]
'[manifold.stream :as s])
;; With the code below streams and channels are not closed after a client closes the connection
(defn hystrix-stream [subscription req]
(let [subscription-with-ping (a/chan)
ping-result (Object.)
;; Return a ping every two seconds when channels are not being filled
_ (a/go-loop []
(let [timeout-ch (a/timeout 2000)
[v ch] (a/alts! [subscription timeout-ch])
res (if (= ch timeout-ch)
ping-result
v)]
(when res
(a/>! subscription-with-ping res)
(recur))))
s (s/->source subscription-with-ping)
strm (->> s
(s/mapcat (fn [x]
(if (= x ping-result)
["ping"]
(metrics->hystrix-data x))))
(s/map (fn [msg]
(str "\ndata:" (json/encode msg) "\n"))))]
{:status 200 :body strm :headers {"Content-Type" "text/event-stream;charset=UTF-8"
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
"Pragma" "no-cache"}}))
;; Alternative below should stop when channel is closed, but this doesn't happen
(defn hystrix-stream-alt [subscription req]
(let [ping-result (Object.)
;; Return a ping every two seconds when channels are not being filled
_ (a/go-loop []
(a/<! (a/timeout 2000))
(if (a/>! subscription ping-result)
(recur)
(println "I was closed!!")))
s (s/->source subscription)
strm (->> s
(s/mapcat (fn [x]
(if (= x ping-result)
["ping"]
(metrics->hystrix-data x))))
(s/map (fn [msg]
(str "\ndata:" (json/encode msg) "\n"))))]
{:status 200 :body strm :headers {"Content-Type" "text/event-stream;charset=UTF-8"
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
"Pragma" "no-cache"}}))
;; When we return a source channel as response the closing does happen properly :)
(defn hystrix-stream-correct [subscription req]
(let [ping-result (Object.)
;; Return a ping every two seconds when channels are not being filled
xform (comp (mapcat (fn [x]
(if (= x ping-result)
["ping"]
(metrics->hystrix-data x))))
(map (fn [msg]
(str "\ndata:" (json/encode msg) "\n"))))
formatted-channel (a/chan 1 xform)
_ (a/go-loop []
(a/<! (a/timeout 2000))
(if (a/>! formatted-channel ping-result)
(recur)
(println "I was closed!!")))
;; Let core.async do the transformation so the closing happens correctly
_ (a/pipe subscription formatted-channel)]
{:status 200
:body (s/->source formatted-channel)
:headers {"Content-Type" "text/event-stream;charset=UTF-8"
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
"Pragma" "no-cache"}}))
;; Using manifold's transform
(defn hystrix-stream-success2 [subscription req]
(let [ping-result (Object.)
xform (comp (mapcat (fn [x]
(if (= x ping-result)
["ping"]
(metrics->hystrix-data x))))
(map (fn [msg]
(str "\ndata:" (json/encode msg) "\n"))))
;; Return a ping every two seconds when channels are not being filled
_ (a/go-loop []
(a/<! (a/timeout 2000))
(when (a/>! subscription ping-result)
(recur)))]
{:status 200
:body (s/transform xform (s/->source subscription))
:headers {"Content-Type" "text/event-stream;charset=UTF-8"
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
"Pragma" "no-cache"}}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment