-
-
Save yayitswei/9362dd0562c69877a798aafb9f778e7e to your computer and use it in GitHub Desktop.
Clojure client for Server-Sent Events (SSE)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(require '[clj-http.client :as http]) | |
(require '[clojure.core.async :as a]) | |
(require '[clojure.string :as string]) | |
(require '[clojure.java.io :as io]) | |
(import '[java.io InputStream]) | |
;; Matches one complete raw event: the shortest run of characters (newlines
;; included, via (?s)) followed by a blank line. The event-stream format allows
;; CRLF, LF or CR as the line terminator, so all three blank-line forms are
;; accepted. The alternatives are spelled out explicitly (rather than
;; "(\r\n|\r|\n){2}") so that a single CRLF is never mistaken for two
;; terminators.
(def event-mask #"(?s).+?(?:\r\n\r\n|\n\n|\r\r)")
(defn- parse-event
  "Parses one raw event block (the text up to and including its terminating
  blank line) into a map of keyword field name -> string value.

  Follows the event-stream line rules:
  - lines are separated by CRLF, LF or CR;
  - empty lines and comment lines (starting with ':') are ignored;
  - the field name is everything before the FIRST colon, so values may
    themselves contain ': ' (e.g. JSON payloads);
  - a single leading space after the colon is stripped; a line with no colon
    is a field with an empty value;
  - repeated `data` fields are joined with a newline; for any other repeated
    field the last value wins.

  Returns an empty map when the block contains no fields."
  [raw-event]
  (let [fields (for [^String line (string/split raw-event #"\r\n|\r|\n")
                     :when (and (seq line) (not (.startsWith line ":")))]
                 ;; limit 2 => split on the first colon only
                 (let [[field value] (string/split line #":" 2)
                       ^String value (or value "")]
                   [field (if (.startsWith value " ") (subs value 1) value)]))]
    (reduce (fn [acc [field value]]
              (if (= field "data")
                ;; multi-line data: accumulate with newline separators
                (update-in acc [:data] #(if % (str % "\n" value) value))
                (assoc acc (keyword field) value)))
            {}
            fields)))
(defn connect
  "Opens a streaming GET request to `url` and returns a core.async channel of
  parsed events.

  `params` is an optional clj-http request map; it is merged with
  {:as :stream}, which always takes precedence so the body is an InputStream.

  A background thread reads the response body, decodes it as UTF-8, cuts it
  into raw events with `event-mask`, and puts each one on the returned
  channel, whose transducer runs `parse-event`. The channel uses a sliding
  buffer of 10, so when the consumer falls behind the oldest events are
  dropped rather than blocking the reader.

  The read loop stops when the server closes the stream, when reading fails
  with an IOException, or when the caller closes the returned channel. In
  every case the HTTP stream is closed and the channel is closed, so a
  consumer sees nil instead of blocking forever."
  [url & [params]]
  (let [event-stream ^InputStream (:body (http/get url (merge params {:as :stream})))
        events (a/chan (a/sliding-buffer 10) (map parse-event))]
    (a/thread
      ;; Decode through a Reader so that a multi-byte UTF-8 character split
      ;; across two network reads is reassembled instead of being corrupted,
      ;; and so that only the characters actually read are appended.
      (let [rdr (java.io.InputStreamReader. event-stream "UTF-8")
            buf (char-array 4096)]
        (try
          (loop [pending ""]
            (let [n (.read rdr buf)]
              (if (neg? n)
                (println "Input stream closed, exiting read-loop")
                (let [data (str pending (String. buf 0 n))
                      raw-events (re-seq event-mask data)]
                  ;; >!! returns false once the channel is closed by the
                  ;; consumer. With no complete event yet, every? is true
                  ;; and the replace is a no-op, so the partial data is
                  ;; simply carried into the next iteration.
                  (if (every? true? (map #(a/>!! events %) raw-events))
                    (recur (string/replace data event-mask ""))
                    (println "Output stream closed, exiting read-loop"))))))
          (catch java.io.IOException e
            (println "Error reading event stream, exiting read-loop:" (.getMessage e)))
          (finally
            ;; Close the channel first so consumers are released even if
            ;; closing the underlying stream throws.
            (a/close! events)
            (.close rdr)))))
    events))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment