Last active
February 21, 2025 19:55
-
-
Save mkrcah/1fdc0bee690a7a75b35f4429785f2ba6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns fiddle.core-async-sse | |
(:require [clojure.core.async :as async] | |
[clojure.java.io :as io] | |
[reitit.ring :as ring] | |
[ring.adapter.jetty :as jetty] | |
[ring.core.protocols :refer [StreamableResponseBody]]) | |
(:import (clojure.core.async.impl.channels ManyToManyChannel))) | |
; convert a channel to an output stream | |
(defn channel->output-stream [channel output-stream] | |
(with-open [out output-stream | |
writer (io/writer out)] | |
(loop [] | |
(when-let [^String msg (async/<!! channel)] | |
(doto writer (.write msg) (.flush)) | |
(recur))))) | |
; teaching Ring to convert a channel to a response stream | |
(extend-type ManyToManyChannel | |
StreamableResponseBody | |
(write-body-to-stream [ch _response output-stream] | |
(channel->output-stream ch output-stream))) | |
; sync Ring handler | |
(defn sse-handler [sse-chan] | |
(fn [_request] | |
(println "Connected to SSE endpoint") | |
{:status 200, | |
:headers {"Content-Type" "text/event-stream" | |
"Cache-Control" "no-cache, no-store"}, | |
:body sse-chan})) | |
(defn app [sse-chanel] | |
(ring/ring-handler | |
(ring/router | |
[["/" {:get {:handler (sse-handler sse-chanel)}}] | |
["/status" {:get {:handler (constantly {:status 200})}}]]))) | |
(defn some-sse-message [t] | |
(format "event: hello\ndata: current time is %s\n\n" t)) | |
(defn produce-random-sse-events! [ch] | |
(async/thread | |
(loop [] | |
(Thread/sleep 2000) | |
(let [msg (some-sse-message (System/currentTimeMillis))] | |
(when (async/>!! ch msg) | |
(println msg))) | |
(recur)))) | |
(comment | |
(do | |
(def my-sse-channel (async/chan 10)) | |
(def server (jetty/run-jetty (app my-sse-channel) {:port 3001 :join? false})) | |
(produce-random-sse-events! my-sse-channel)) | |
(when server (.stop server)) | |
(async/close! my-sse-channel)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns fiddle.core-async-sse | |
(:require [clojure.core.async :as async] | |
[clojure.java.io :as io] | |
[reitit.ring :as ring] | |
[ring.adapter.jetty :as jetty] | |
[ring.core.protocols :refer [StreamableResponseBody]]) | |
(:import (clojure.core.async.impl.channels ManyToManyChannel) | |
(java.io IOException))) | |
;; ------------------------------------------- | |
;; Clients | |
;; ------------------------------------------ | |
(defonce sse-connections (atom #{})) | |
(defn add-connection! [conn] | |
(let [chan (async/chan 10) | |
client' (assoc conn :channel chan)] | |
(println "SSE connection added") | |
(swap! sse-connections conj client') | |
client')) | |
(defn remove-connection! [conn] | |
(println "SSE connection removed") | |
(swap! sse-connections disj conn)) | |
(defn list-connections [] | |
@sse-connections) | |
;; ------------------------------------------- | |
;; Ring handler | |
;; ------------------------------------------ | |
; convert a channel to an output stream | |
(defn channel->output-stream [channel output-stream] | |
(with-open [out output-stream | |
writer (io/writer out)] | |
(try | |
(loop [] | |
(println "Blocking on channel" channel) | |
(when-let [msg (async/<!! channel)] | |
(doto writer (.write msg) (.flush)) | |
(recur))) | |
(catch IOException _) ;; Ring/Jetty throws here on disconnected client | |
(finally | |
(async/close! channel))))) | |
; teaching Ring to convert a channel to a response stream | |
(extend-type ManyToManyChannel | |
StreamableResponseBody | |
(write-body-to-stream [ch _response output-stream] | |
(channel->output-stream ch output-stream))) | |
; sync Ring handler | |
(defn sse-handler [] | |
(fn [ring-request] | |
(let [conn (add-connection! {:request ring-request}) | |
channel (:channel conn)] | |
{:status 200, | |
:headers {"Content-Type" "text/event-stream" | |
"Cache-Control" "no-cache, no-store"}, | |
:body channel}))) | |
(defn app [] | |
(ring/ring-handler | |
(ring/router | |
[["/" {:get {:handler (sse-handler)}}] | |
["/status" {:get {:handler (constantly {:status 200})}}]]))) | |
;; ------------------------------------------- | |
;; Broadcasting | |
;; ------------------------------------------ | |
(defn some-sse-message [t] | |
(format "event: hello\ndata: current time is %s\n\n" t)) | |
(defn broadcast [msg] | |
(doseq [client @sse-connections] | |
(let [sent? (async/>!! (:channel client) msg)] | |
(if sent? | |
(println "message sent") | |
(remove-connection! client))))) | |
(comment | |
(when server (.stop server)) | |
(def server (jetty/run-jetty (app) {:port 3001 :join? false})) | |
(count (list-connections)) | |
(broadcast (some-sse-message (System/currentTimeMillis)))) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment