Last active
April 8, 2023 17:36
-
-
Save tolitius/cc968a2adcc9dccc24cf15386fc44345 to your computer and use it in GitHub Desktop.
event listener with multiple kafka consumer threads
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(deftype BlahBlahThreadFactory [name ^AtomicInteger thread-counter] | |
ThreadFactory | |
(newThread [_ r] | |
(doto | |
(Thread. r) | |
(.setName (format "%s-%d" name (.getAndIncrement thread-counter))) | |
(.setDaemon true) | |
(.setUncaughtExceptionHandler | |
(reify Thread$UncaughtExceptionHandler | |
(uncaughtException [_ thread ex] | |
(error (format "Error in thread id: %s name: %s" (.getId thread) (.getName thread)) ex))))))) | |
(defn new-executor | |
([] (new-executor 1)) | |
([num-threads] | |
(Executors/newFixedThreadPool num-threads | |
(BlahBlahThreadFactory. "blah-blah-runner" | |
(AtomicInteger. 0))))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn consume [consumer process running? ms n] | |
(info "starting" (inc n) "consumer") | |
(while @running? | |
(try | |
(let [consumer-records (gregor/poll consumer ms)] | |
(process consumer consumer-records) | |
(gregor/commit-offsets! consumer)) | |
(catch Throwable t | |
(error "kafka: could not consume a message" t)))) | |
(gregor/close consumer)) | |
(defn consumer [conf] | |
(->> (edn-to-consumer conf) | |
(apply gregor/consumer))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn run-consumers [process {:keys [threads poll-ms] :as conf}] | |
(let [running? (atom true) | |
pool (new-executor (if (number? threads) | |
threads | |
42))] | |
(dotimes [t threads] | |
(let [c (consumer (dissoc conf :threads :poll-ms))] | |
(info "subscribing to:" (gregor/subscription c)) | |
(.submit pool #(consume c process running? poll-ms t)))) | |
(info "started" threads "consumers ->" conf) | |
{:pool pool :running? running?})) | |
(defn stop-consumers [{:keys [pool running? consumers]}] | |
(reset! running? false) | |
(.shutdownNow pool)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn process [c batch] | |
(let [bsize (count batch)] | |
(when (pos? bsize) | |
(info "received" bsize "events") | |
(doseq [event batch] | |
(->> (:value event) | |
;; your logic here <<<<<< | |
))))) | |
(defstate event-listener :start (run-consumers process | |
{... your config}) | |
:stop (stop-consumers event-listener)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I am trying to run kafka single threaded consumer. Having issues with it. I have tried the following