Skip to content

Instantly share code, notes, and snippets.

@mkrcah
Last active February 21, 2025 19:55
Show Gist options
  • Save mkrcah/1fdc0bee690a7a75b35f4429785f2ba6 to your computer and use it in GitHub Desktop.
Save mkrcah/1fdc0bee690a7a75b35f4429785f2ba6 to your computer and use it in GitHub Desktop.
(ns fiddle.core-async-sse
(:require [clojure.core.async :as async]
[clojure.java.io :as io]
[reitit.ring :as ring]
[ring.adapter.jetty :as jetty]
[ring.core.protocols :refer [StreamableResponseBody]])
(:import (clojure.core.async.impl.channels ManyToManyChannel)))
; copy every message taken from a channel into an output stream
(defn channel->output-stream
  "Blocks the calling thread, taking messages from `channel` one at a time and
  writing each to `output-stream`, flushing after every message. Stops when a
  take yields nil/false (e.g. the channel was closed). Both the writer and the
  underlying stream are closed on exit, including on an exception."
  [channel output-stream]
  (with-open [out output-stream
              writer (io/writer out)]
    (loop [msg (async/<!! channel)]
      (when msg
        ;; Hint keeps the String overload of Writer.write selected.
        (let [^String text msg]
          (.write writer text)
          ;; Flush per message so each event leaves the buffer immediately.
          (.flush writer))
        (recur (async/<!! channel))))))
; teaching Ring to convert a channel to a response stream
;; Ring writes a response :body through the StreamableResponseBody protocol.
;; Extending it to the channel class lets a handler return a channel as :body;
;; the channel's messages are then written to the response output stream.
(extend-protocol StreamableResponseBody
  ManyToManyChannel
  (write-body-to-stream [body-chan _response out-stream]
    (channel->output-stream body-chan out-stream)))
; sync Ring handler
(defn sse-handler
  "Returns a synchronous Ring handler. The handler ignores the request, prints
  a connection notice, and answers 200 with `text/event-stream` headers and
  `sse-chan` as the response body."
  [sse-chan]
  (let [headers {"Content-Type" "text/event-stream"
                 "Cache-Control" "no-cache, no-store"}]
    (fn [_request]
      (println "Connected to SSE endpoint")
      {:status 200
       :headers headers
       :body sse-chan})))
(defn app
  "Builds the Ring handler for the demo: GET / serves the SSE stream backed by
  `sse-chanel`, GET /status always answers with a bare 200."
  [sse-chanel]
  (let [routes [["/" {:get {:handler (sse-handler sse-chanel)}}]
                ["/status" {:get {:handler (constantly {:status 200})}}]]]
    (-> routes
        ring/router
        ring/ring-handler)))
(defn some-sse-message
  "Formats one server-sent event named `hello` whose data line reports `t` as
  the current time. The result ends with the empty line that terminates an
  event."
  [t]
  (let [template "event: hello\ndata: current time is %s\n\n"]
    (format template t)))
(defn produce-random-sse-events!
  "Starts a background thread (via `async/thread`) that, every 2 seconds, puts
  a `hello` event carrying the current epoch milliseconds onto `ch` and prints
  it once the put succeeds.

  The loop ends as soon as a put is rejected, which is how `>!!` reports that
  `ch` has been closed. Closing the channel therefore also stops the producer
  thread instead of leaving it sleeping and retrying forever.

  Returns the channel created by `async/thread`; it closes when the producer
  stops."
  [ch]
  (async/thread
    (loop []
      (Thread/sleep 2000)
      (let [msg (some-sse-message (System/currentTimeMillis))]
        ;; >!! blocks while the buffer is full and returns false only when the
        ;; channel is closed, so continue only after a successful put.
        (when (async/>!! ch msg)
          (println msg)
          (recur))))))
; REPL scratchpad for the single-channel demo. Forms inside `comment` are not
; evaluated when the file loads; evaluate them individually from the editor.
(comment
; Start: create a channel with a buffer of 10, serve it on port 3001
; (with :join? false passed to run-jetty), and start the event producer.
(do
(def my-sse-channel (async/chan 10))
(def server (jetty/run-jetty (app my-sse-channel) {:port 3001 :join? false}))
(produce-random-sse-events! my-sse-channel))
; Stop: shut the server down, then close the channel.
(when server (.stop server))
(async/close! my-sse-channel))
(ns fiddle.core-async-sse
(:require [clojure.core.async :as async]
[clojure.java.io :as io]
[reitit.ring :as ring]
[ring.adapter.jetty :as jetty]
[ring.core.protocols :refer [StreamableResponseBody]])
(:import (clojure.core.async.impl.channels ManyToManyChannel)
(java.io IOException)))
;; -------------------------------------------
;; Clients
;; ------------------------------------------
; Registry of SSE connections: an atom holding a set of connection maps.
; defonce keeps the existing atom when the namespace is re-evaluated.
(defonce sse-connections (atom #{}))
(defn add-connection!
  "Registers a new SSE connection. Creates a fresh channel with a buffer of 10,
  attaches it to `conn` under :channel, stores the resulting map in
  `sse-connections`, and returns that map."
  [conn]
  (let [registered (assoc conn :channel (async/chan 10))]
    (println "SSE connection added")
    (swap! sse-connections conj registered)
    registered))
(defn remove-connection!
  "Drops `conn` from `sse-connections` after printing a notice. Returns the
  updated set of connections."
  [conn]
  (println "SSE connection removed")
  (swap! sse-connections (fn [conns] (disj conns conn))))
(defn list-connections
  "Returns the current set of registered SSE connections."
  []
  (deref sse-connections))
;; -------------------------------------------
;; Ring handler
;; ------------------------------------------
; copy every message taken from a channel into an output stream
(defn channel->output-stream
  "Blocks the calling thread, taking messages from `channel` and writing each
  one to `output-stream`, flushing after every message. Stops when a take
  yields nil/false (e.g. the channel was closed) or when the client is gone.

  An IOException while writing is treated as a client disconnect and is
  swallowed. The try wraps `with-open` on purpose: closing the writer flushes
  it, which can throw the same IOException again on a dead connection, and
  that must not escape either.

  `channel` is always closed on exit so that later puts to it are rejected.
  Messages are expected to be strings. Returns nil."
  [channel output-stream]
  (try
    (with-open [out output-stream
                writer (io/writer out)]
      (loop []
        (println "Blocking on channel" channel)
        ;; ^String selects Writer.write(String) at compile time (no reflection).
        (when-let [^String msg (async/<!! channel)]
          (doto writer (.write msg) (.flush))
          (recur))))
    ;; Ring/Jetty throws here on a disconnected client, either from the
    ;; write/flush above or from with-open closing the writer.
    (catch IOException _
      nil)
    (finally
      (async/close! channel))))
; teaching Ring to convert a channel to a response stream
;; Lets a Ring handler return a channel as its :body. Ring calls
;; write-body-to-stream with the response's output stream, and the channel's
;; messages are written to it by channel->output-stream.
(extend-protocol StreamableResponseBody
  ManyToManyChannel
  (write-body-to-stream [body-chan _response out-stream]
    (channel->output-stream body-chan out-stream)))
; sync Ring handler
(defn sse-handler
  "Returns a synchronous Ring handler. Each request is registered as a new
  connection (keeping the request under :request), and the connection's own
  channel is returned as the body of a 200 `text/event-stream` response."
  []
  (fn [ring-request]
    (let [{:keys [channel]} (add-connection! {:request ring-request})]
      {:status 200
       :headers {"Content-Type" "text/event-stream"
                 "Cache-Control" "no-cache, no-store"}
       :body channel})))
(defn app
  "Builds the Ring handler for the demo: GET / opens an SSE stream with its own
  connection, GET /status always answers with a bare 200."
  []
  (let [routes [["/" {:get {:handler (sse-handler)}}]
                ["/status" {:get {:handler (constantly {:status 200})}}]]]
    (-> routes
        ring/router
        ring/ring-handler)))
;; -------------------------------------------
;; Broadcasting
;; ------------------------------------------
(defn some-sse-message
  "Returns the text of one server-sent event: event name `hello`, a data line
  stating `t` as the current time, and the terminating empty line."
  [t]
  (-> "event: hello\ndata: current time is %s\n\n"
      (format t)))
(defn broadcast
  "Puts `msg` on the channel of every registered connection, using a blocking
  put. A connection whose channel rejects the put is removed from the
  registry; a successful put is logged. Returns nil."
  [msg]
  (doseq [{:keys [channel] :as client} @sse-connections]
    (if (async/>!! channel msg)
      (println "message sent")
      (remove-connection! client))))
; REPL scratchpad for the multi-connection demo. Forms inside `comment` are
; not evaluated when the file loads; evaluate them individually.
(comment
; Stop a previously started server.
; NOTE(review): refers to `server`, which is only defined by the next form.
(when server (.stop server))
; Start the server on port 3001 (with :join? false passed to run-jetty).
(def server (jetty/run-jetty (app) {:port 3001 :join? false}))
; Number of currently registered connections.
(count (list-connections))
; Send one hello event carrying the current epoch milliseconds to every connection.
(broadcast (some-sse-message (System/currentTimeMillis))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment