@xfthhxk
Last active November 25, 2024 12:05
Clojure telemere handler for buffering signals and periodically draining to a sink
(defn handler:buffered-sink-handler
  "Buffers signals up to `:buffer/size` and invokes `:buffer/sink`
  when the buffer is full or `:buffer/drain-interval-millis` has elapsed.

  `:buffer/sink` is a required 1-arity function that receives
  a vector of telemere signals.

  `:buffer/signal-dropped` is a 1-arity function that receives
  the signal that was dropped because the buffer was still full
  after a flush.

  `:buffer/thread-name` names the background drain thread."
  ([] (handler:buffered-sink-handler nil))
  ([{:keys [buffer/drain-interval-millis
            buffer/size
            buffer/sink
            ^String buffer/thread-name
            buffer/signal-dropped]
     :or {size 1024
          drain-interval-millis 5000
          thread-name (str (gensym "buffered-sink-handler-thread-"))
          signal-dropped (fn [_] (println "buffered-sink-handler: The buffer is full. signal was dropped."))}
     :as construct-opts}]
   (when-not sink
     ;; ex-info rejects nil data, so fall back to an empty map
     (throw (ex-info "`:buffer/sink` is required" (or construct-opts {}))))
   (let [q (java.util.concurrent.ArrayBlockingQueue/new size true)
         ;; drains everything currently queued and hands it to the sink as one batch
         flush! (fn []
                  (let [signals (java.util.ArrayList/new ^int size)]
                    (.drainTo q signals)
                    (when-not (.isEmpty signals)
                      (try
                        (sink (vec signals))
                        (catch Throwable t
                          (println "Uncaught exception in buffered-sink-handler: " t))))))
         thread-fn (fn []
                     (while (->> (Thread/currentThread) .isInterrupted not)
                       (try
                         (flush!)
                         (Thread/sleep ^long drain-interval-millis)
                         (catch InterruptedException _
                           (println "[" thread-name "] thread interrupted")
                           ;; Thread/sleep clears the interrupt flag when it throws,
                           ;; so restore it to let the loop condition end the thread
                           (.interrupt (Thread/currentThread))))))
         ^Thread thread (-> (Thread/ofVirtual)
                            (.name thread-name)
                            (.start thread-fn))]
     (fn buffered-handler
       ;; 0-arity: shutdown, flush what is buffered and stop the drain thread
       ([]
        (println "buffered-sink-handler: initiate shutdown")
        (flush!)
        (.interrupt thread)
        (println "buffered-sink-handler: flushed signals buffer"))
       ([signal]
        ;; try twice and then drop if buffer is full
        (or (.offer q signal)
            (do
              (flush!)
              (when-not (.offer q signal)
                (signal-dropped signal)))))))))
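
The offer, flush, retry, drop policy in the 1-arity branch can be tried in isolation. The following is a minimal sketch, not part of the gist: `offer-or-flush!`, `batches`, and `q` are hypothetical names introduced only for illustration, and the background drain thread is left out so the behaviour is deterministic.

```clojure
(import '(java.util ArrayList)
        '(java.util.concurrent ArrayBlockingQueue))

;; Hypothetical helper: same policy as the handler's 1-arity branch.
;; Try to enqueue; if the queue is full, drain it into the sink and retry once;
;; if the retry also fails, report the signal as dropped.
(defn offer-or-flush!
  [^ArrayBlockingQueue q sink on-drop signal]
  (or (.offer q signal)
      (let [batch (ArrayList.)]
        (.drainTo q batch)
        (when-not (.isEmpty batch)
          (sink (vec batch)))
        (when-not (.offer q signal)
          (on-drop signal)))))

;; Hypothetical state for the demo: a tiny queue and an atom recording each batch.
(def batches (atom []))
(def q (ArrayBlockingQueue. 2 true))

(doseq [n (range 5)]
  (offer-or-flush! q #(swap! batches conj %) println n))

@batches
;; => [[0 1] [2 3]]   (4 is still waiting in the queue)
```

With a capacity of 2, every third offer finds the queue full and triggers a flush, so the sink sees batches of two. In the handler itself the sink is additionally invoked by the drain thread every `:buffer/drain-interval-millis`, and once more on shutdown via the 0-arity call.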