Last active
May 28, 2016 05:58
-
-
Save jeroenvandijk/67d064e0bb08b900e656 to your computer and use it in GitHub Desktop.
Server Side Events with Aleph
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(comment | |
;; Deps | |
[cheshire "5.4.0"] | |
[manifold "0.1.1"] | |
[aleph "0.4.1-beta2"] | |
[org.clojure/clojure "1.7.0"] | |
[org.clojure/core.async "0.2.371"] | |
) | |
(require '[clojure.core.async :as a] | |
'[manifold.stream :as s] | |
'[manifold.stream :as s]) | |
;; With the code below streams and channels are not closed after a client closes the connection | |
(defn hystrix-stream [subscription req] | |
(let [subscription-with-ping (a/chan) | |
ping-result (Object.) | |
;; Return a ping every two seconds when channels are not being filled | |
_ (a/go-loop [] | |
(let [timeout-ch (a/timeout 2000) | |
[v ch] (a/alts! [subscription timeout-ch]) | |
res (if (= ch timeout-ch) | |
ping-result | |
v)] | |
(when res | |
(a/>! subscription-with-ping res) | |
(recur)))) | |
s (s/->source subscription-with-ping) | |
strm (->> s | |
(s/mapcat (fn [x] | |
(if (= x ping-result) | |
["ping"] | |
(metrics->hystrix-data x)))) | |
(s/map (fn [msg] | |
(str "\ndata:" (json/encode msg) "\n"))))] | |
{:status 200 :body strm :headers {"Content-Type" "text/event-stream;charset=UTF-8" | |
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate" | |
"Pragma" "no-cache"}})) | |
;; Alternative below should stop when channel is closed, but this doesn't happen | |
(defn hystrix-stream-alt [subscription req] | |
(let [ping-result (Object.) | |
;; Return a ping every two seconds when channels are not being filled | |
_ (a/go-loop [] | |
(a/<! (a/timeout 2000)) | |
(if (a/>! subscription ping-result) | |
(recur) | |
(println "I was closed!!"))) | |
s (s/->source subscription) | |
strm (->> s | |
(s/mapcat (fn [x] | |
(if (= x ping-result) | |
["ping"] | |
(metrics->hystrix-data x)))) | |
(s/map (fn [msg] | |
(str "\ndata:" (json/encode msg) "\n"))))] | |
{:status 200 :body strm :headers {"Content-Type" "text/event-stream;charset=UTF-8" | |
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate" | |
"Pragma" "no-cache"}})) | |
;; When we return a source channel as response the closing does happen properly :) | |
(defn hystrix-stream-correct [subscription req] | |
(let [ping-result (Object.) | |
;; Return a ping every two seconds when channels are not being filled | |
xform (comp (mapcat (fn [x] | |
(if (= x ping-result) | |
["ping"] | |
(metrics->hystrix-data x)))) | |
(map (fn [msg] | |
(str "\ndata:" (json/encode msg) "\n")))) | |
formatted-channel (a/chan 1 xform) | |
_ (a/go-loop [] | |
(a/<! (a/timeout 2000)) | |
(if (a/>! formatted-channel ping-result) | |
(recur) | |
(println "I was closed!!"))) | |
;; Let core.async do the transformation so the closing happens correctly | |
_ (a/pipe subscription formatted-channel)] | |
{:status 200 | |
:body (s/->source formatted-channel) | |
:headers {"Content-Type" "text/event-stream;charset=UTF-8" | |
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate" | |
"Pragma" "no-cache"}})) | |
;; Using manifold's transform | |
(defn hystrix-stream-success2 [subscription req] | |
(let [ping-result (Object.) | |
xform (comp (mapcat (fn [x] | |
(if (= x ping-result) | |
["ping"] | |
(metrics->hystrix-data x)))) | |
(map (fn [msg] | |
(str "\ndata:" (json/encode msg) "\n")))) | |
;; Return a ping every two seconds when channels are not being filled | |
_ (a/go-loop [] | |
(a/<! (a/timeout 2000)) | |
(when (a/>! subscription ping-result) | |
(recur)))] | |
{:status 200 | |
:body (s/transform xform (s/->source subscription)) | |
:headers {"Content-Type" "text/event-stream;charset=UTF-8" | |
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate" | |
"Pragma" "no-cache"}})) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment