Last active
September 19, 2024 14:37
-
-
Save oliyh/2b9b9107e7e7e12d4a60e79a19d056ee to your computer and use it in GitHub Desktop.
Clojure client for Server Sent Events (SSE)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(require '[clj-http.client :as http]) | |
(require '[clojure.core.async :as a]) | |
(require '[clojure.string :as string]) | |
(require '[clojure.java.io :as io]) | |
(import '[java.io InputStream]) | |
;; Matches one complete raw event: the shortest run of characters (line
;; terminators included, via the (?s) flag) that ends with a blank line.
;; The event-stream format allows lines to end with CRLF, LF or CR, so all
;; three blank-line forms are accepted; matching only CRLF CRLF would never
;; yield an event from a server that emits LF-only line endings.
;; The doubled terminators are spelled out rather than written as
;; (?:\r\n|\r|\n){2}, because that form could backtrack and treat a single
;; CRLF as two terminators (CR followed by LF).
(def event-mask #"(?s).+?(?:\r\n\r\n|\n\n|\r\r)")
(defn- parse-event
  "Parses the raw text of one server-sent event into a map of keyword field
  name to string value, e.g. {:event ..., :data ...}.

  raw-event is the text of a single event, including its line terminators.

  Each line is split at its FIRST colon, so values that themselves contain
  a colon followed by a space (JSON payloads, timestamps) stay intact. One
  optional space after the colon is stripped. Lines that start with a colon
  (comments / keep-alives) and lines without a colon are ignored. When a
  field occurs more than once (typically multi-line data) the values are
  joined with a newline, in order of appearance."
  [raw-event]
  ;; [^:\r\n]+ keeps the field name from running past the first colon or
  ;; onto another line; (?m)^ anchors every match to the start of a line so
  ;; a comment line can never be matched part-way through.
  (->> (re-seq #"(?m)^([^:\r\n]+): ?([^\r\n]*)" raw-event)
       (map rest)
       (group-by first)
       (reduce-kv (fn [acc field entries]
                    (assoc acc
                           (keyword field)
                           (string/join "\n" (map second entries))))
                  {})))
(defn connect
  "Opens a Server-Sent Events connection to url and returns a core.async
  channel of parsed events (maps produced by parse-event).

  params is an optional map of clj-http request options; :as is always
  forced to :stream so the response body can be read incrementally.

  The returned channel uses a sliding buffer of 10, so when the consumer
  falls behind the oldest unread events are dropped rather than blocking
  the reader. A background thread reads the response until either the
  server ends the stream or the consumer closes the channel (noticed on the
  next attempted put). In both cases, and also if reading throws, the
  response stream and the channel are closed."
  [url & [params]]
  (let [^InputStream event-stream (:body (http/get url (merge params {:as :stream})))
        events (a/chan (a/sliding-buffer 10) (map parse-event))]
    (a/thread
      ;; Decode through a single Reader so that a multi-byte UTF-8 character
      ;; split across two network reads is reassembled instead of being
      ;; decoded as two broken halves.
      (let [^java.io.Reader reader (io/reader event-stream :encoding "UTF-8")
            ^chars buffer (char-array 4096)]
        (try
          ;; pending holds text received so far that does not yet form a
          ;; complete event.
          (loop [pending ""]
            (let [chars-read (.read reader buffer 0 (alength buffer))]
              (if (neg? chars-read)
                (println "Input stream closed, exiting read-loop")
                ;; Only the first chars-read characters of the buffer are
                ;; valid; the rest is stale or zero-filled.
                (let [data (str pending (String. buffer 0 chars-read))]
                  (if-let [es (not-empty (re-seq event-mask data))]
                    ;; >!! returns false once the channel has been closed.
                    (if (every? true? (map #(a/>!! events %) es))
                      ;; Keep only the trailing, still-incomplete text.
                      (recur (string/replace data event-mask ""))
                      (println "Output stream closed, exiting read-loop"))
                    (recur data))))))
          (finally
            ;; Close the channel first so consumers are released even if
            ;; closing the stream throws. Closing the reader also closes the
            ;; wrapped response stream.
            (a/close! events)
            (.close reader)))))
    events))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment