Last active
November 25, 2024 12:05
Clojure telemere handler for buffering signals and periodically draining to a sink
(defn handler:buffered-sink-handler
  "Buffers signals up to `:buffer/size` and invokes
  `:buffer/sink` when the buffer is full or
  `:buffer/drain-interval-millis` has elapsed.
  `:buffer/signal-dropped` is a 1-arity function that receives
  the signal that was dropped because the buffer was full.
  `:buffer/sink` is a required 1-arity function that receives
  a vector of telemere signals.
  Calling the returned handler with no args flushes the buffer
  and interrupts the drain thread."
  ([] (handler:buffered-sink-handler nil))
  ([{:keys [buffer/drain-interval-millis
            buffer/size
            buffer/sink
            ^String buffer/thread-name
            buffer/signal-dropped]
     :or {size 1024
          drain-interval-millis 5000
          thread-name (str (gensym "buffered-sink-handler-thread-"))
          signal-dropped (fn [_] (println "buffered-sink-handler: The buffer is full. signal was dropped."))}
     :as construct-opts}]
   (when-not sink
     ;; `ex-info` rejects nil data, so fall back to an empty map
     (throw (ex-info "`:buffer/sink` is required" (or construct-opts {}))))
   (let [;; bounded, fair queue holding the buffered signals
         q (java.util.concurrent.ArrayBlockingQueue/new size true)
         ;; drains everything currently queued and hands it to the sink as one batch
         flush! (fn []
                  (let [signals (java.util.ArrayList/new ^int size)]
                    (.drainTo q signals)
                    (when-not (.isEmpty signals)
                      (try
                        (sink (vec signals))
                        (catch Throwable t
                          ;; print the failure so a throwing sink is not silent
                          (println "Uncaught exception in buffered-sink-handler:" (pr-str t)))))))
         ;; periodic drain loop, runs until the thread is interrupted
         thread-fn (fn []
                     (while (->> (Thread/currentThread) .isInterrupted not)
                       (try
                         (flush!)
                         (Thread/sleep ^long drain-interval-millis)
                         (catch InterruptedException _
                           (println "[" thread-name "] thread interrupted")
                           ;; `Thread/sleep` clears the interrupt flag when it throws;
                           ;; restore it so the `while` condition ends the loop
                           (.interrupt (Thread/currentThread))))))
         ^Thread thread (-> (Thread/ofVirtual)
                            (.name thread-name)
                            (.start thread-fn))]
     (fn buffered-handler
       ;; 0-arity: shutdown, flush what is buffered and stop the drain thread
       ([]
        (println "buffered-sink-handler: initiate shutdown")
        (flush!)
        (.interrupt thread)
        (println "buffered-sink-handler: flushed signals buffer"))
       ;; 1-arity: enqueue a signal
       ([signal]
        ;; try twice and then drop if buffer is full
        (or (.offer q signal)
            (do
              (flush!)
              (when-not (.offer q signal)
                (signal-dropped signal)))))))))
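
A minimal usage sketch, assuming the function above is already loaded, Telemere is on the classpath, and the JVM supports virtual threads (JDK 21+). The handler id `:example/buffered-sink`, the `batches` atom, and the option values are hypothetical and only for illustration. It also assumes Telemere's `remove-handler!` invokes the handler's 0-arity to stop it; if that does not hold in your version, call `(buffered-handler)` yourself at shutdown.

```clojure
(require '[taoensso.telemere :as t])

;; Hypothetical sink target: each drained batch is appended as one vector.
(def batches (atom []))

(def buffered-handler
  (handler:buffered-sink-handler
   {:buffer/sink                  (fn [signals] (swap! batches conj signals))
    :buffer/size                  256   ; illustrative value, default is 1024
    :buffer/drain-interval-millis 1000  ; illustrative value, default is 5000
    :buffer/signal-dropped        (fn [signal]
                                    (println "dropped signal with id:" (:id signal)))}))

;; Register the handler under a hypothetical id.
(t/add-handler! :example/buffered-sink buffered-handler)

;; Signals created from here on are buffered and reach the sink in batches.
(t/log! :info "hello from the buffered handler")

;; At shutdown: deregister the handler (assumed to trigger the 0-arity flush).
(t/remove-handler! :example/buffered-sink)
```

Because the sink receives a whole vector of signals per call, it suits destinations that accept batched writes; a sink that throws only loses the batch it was handed, since the handler catches and prints the exception.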