Created
April 11, 2018 10:01
-
-
Save igrishaev/274c1a4fc6986b1485f6554b1a63d0b2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns ___.kafka | |
"Common Kafka module." | |
(:require [cheshire.core :as json] | |
[clojure.tools.logging :as log]) | |
(:import [org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecord] | |
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord] | |
[org.apache.kafka.common.errors InterruptException] | |
[org.apache.kafka.common.serialization ByteArrayDeserializer ByteArraySerializer])) | |
;; | |
;; Helpers | |
;; | |
(defn as-cfg
  "Converts `cfg-map` into a map whose keys are plain strings.

  Every key is passed through `name`, so a keyword key such as
  `:bootstrap.servers` becomes \"bootstrap.servers\"; values are kept
  untouched."
  [cfg-map]
  (into {}
        (map (fn [[option value]]
               [(name option) value]))
        cfg-map))
(def timeout-poll
  "Fallback timeout handed to the consumer's `poll` call when the caller
  does not supply one (presumably milliseconds -- confirm against the
  Kafka client version in use)."
  1000)
;; | |
;; Consumer | |
;; | |
(def cfg-consumer
  "Default `KafkaConsumer` settings."
  ;; Keys are keywords here; `as-cfg` turns them into the string keys the
  ;; Kafka client expects before the map reaches the constructor.
  {:bootstrap.servers    "localhost:9092"
   :group.id             "default"
   :max.poll.records     "10"
   :max.poll.interval.ms "10000"
   :auto.offset.reset    "earliest"
   :enable.auto.commit   true
   :key.deserializer     ByteArrayDeserializer
   :value.deserializer   ByteArrayDeserializer})
(defn ^KafkaConsumer consumer-create
  "Builds a new `KafkaConsumer`.

  The optional `cfg` map is merged over `cfg-consumer`, so keys supplied by
  the caller win over the defaults. The merged map is converted with
  `as-cfg` before it is handed to the constructor."
  [& [cfg]]
  (let [settings (as-cfg (merge cfg-consumer cfg))]
    (KafkaConsumer. settings)))
(defn consumer-subscribe
  "Subscribes a consumer to a set of topics.

  `topics` is normally a collection of topic names. A single topic name
  given as a string is also accepted and is wrapped into a one-element
  collection, because the underlying `subscribe` call has no overload that
  takes a bare string.

  Any extra arguments (`more`) are accepted for backward compatibility and
  are ignored."
  [^KafkaConsumer consumer topics & more]
  (let [topics (if (string? topics) [topics] topics)]
    (.subscribe consumer topics)))
(defn consumer-close
  "Closes the given `KafkaConsumer` by calling its `close` method."
  [^KafkaConsumer consumer]
  (. consumer close))
(defn record->map
  "Turns a Kafka record into a Clojure map.

  Returns a map with:
    :value - the record value parsed as JSON with keyword keys, or nil when
             the record carries no value;
    :id    - the record offset;
    :topic - the record topic.

  Byte-array values are decoded as UTF-8 explicitly rather than with the
  platform default charset, so decoding does not depend on the JVM's
  locale settings. A value that is already a string is parsed as is."
  [^ConsumerRecord rec]
  (let [payload (.value rec)
        text    (cond
                  ;; A record without a value must not blow up decoding.
                  (nil? payload)    nil
                  (string? payload) payload
                  :else             (String. ^bytes payload "UTF-8"))]
    {:value (when text (json/parse-string text true))
     :id    (.offset rec)
     :topic (.topic rec)}))
(defn consumer-poll
  "Polls `consumer` once and returns the fetched records as a lazy sequence
  of maps produced by `record->map`.

  `timeout` is optional; when it is missing or nil, `timeout-poll` is used."
  [consumer & [timeout]]
  (->> (or timeout timeout-poll)
       (.poll consumer)
       (map record->map)))
(defmacro with-consumer
  "Creates a consumer from `cfg`, binds it to `bind`, evaluates `body` and
  always closes the consumer afterwards, even when `body` throws.

  Returns the value of the last form in `body`.

  The consumer is held in a generated local, so the close call never
  depends on the shape of `bind` (a destructuring form works as well as a
  plain symbol)."
  [bind cfg & body]
  `(let [consumer# (consumer-create ~cfg)
         ~bind     consumer#]
     (try
       ~@body
       (finally
         (consumer-close consumer#)))))
;; | |
;; Producer | |
;; | |
(def cfg-producer
  "Default `KafkaProducer` settings."
  ;; Both keys and values are sent as raw byte arrays.
  {:bootstrap.servers "localhost:9092"
   :key.serializer    ByteArraySerializer
   :value.serializer  ByteArraySerializer})
(defn ^KafkaProducer producer-create
  "Builds a new `KafkaProducer`.

  The optional `cfg` map is merged over `cfg-producer`, so keys supplied by
  the caller win over the defaults. The merged map is converted with
  `as-cfg` before it is handed to the constructor."
  [& [cfg]]
  (let [settings (as-cfg (merge cfg-producer cfg))]
    (KafkaProducer. settings)))
(defn producer-close
  "Closes the given producer by calling its `close` method."
  [producer]
  (. producer close))
(defmacro with-producer
  "Creates a producer from `cfg`, binds it to `bind`, evaluates `body` and
  always closes the producer afterwards, even when `body` throws.

  Returns the value of the last form in `body`.

  The producer is held in a generated local, so the close call never
  depends on the shape of `bind` (a destructuring form works as well as a
  plain symbol)."
  [bind cfg & body]
  `(let [producer# (producer-create ~cfg)
         ~bind     producer#]
     (try
       ~@body
       (finally
         (producer-close producer#)))))
(defn producer-send-many
  "Sends every item of `payloads` to `topic` as a separate record.

  Each payload is serialized to a JSON string and encoded as UTF-8 bytes.
  The encoding is explicit so the bytes on the wire do not depend on the
  JVM's default charset.

  Returns nil. The value returned by each `send` call is discarded here."
  [producer topic payloads]
  (doseq [payload payloads]
    (let [^String text (json/generate-string payload)
          data         (.getBytes text "UTF-8")]
      (.send producer (ProducerRecord. topic data)))))
(defn producer-send
  "Sends one or more payloads to `topic`.

  Variadic convenience wrapper: gathers `payload` and any further
  `payloads` into one sequence and delegates to `producer-send-many`."
  [producer topic payload & payloads]
  (let [batch (list* payload payloads)]
    (producer-send-many producer topic batch)))
;; | |
;; Workers | |
;; | |
(defn worker-start
  "Starts a background worker and returns the future that runs it.

  The worker creates a consumer from `cfg`, subscribes it to `topics`
  (a collection of topic names, or a single topic name as a string) and
  polls in a loop. Every received message map is handed to `func` in its
  own future.

  The loop ends when polling raises Kafka's `InterruptException`, which is
  how `worker-stop` (via `future-cancel`) terminates it. Any other
  exception raised while polling is logged and the loop keeps running.

  The consumer is always closed when the loop ends. Exceptions thrown by
  `func` are logged instead of being lost inside the discarded future."
  [cfg topics func]
  (future
    (let [consumer (consumer-create cfg)
          topics   (if (string? topics) [topics] topics)]
      (try
        (consumer-subscribe consumer topics)
        (loop []
          ;; The inner `try` yields true to keep polling, false to stop.
          (when (try
                  (doseq [msg (consumer-poll consumer)]
                    (future
                      (try
                        (func msg)
                        (catch Exception e
                          (log/errorf e "Worker handler exception, topics: %s" topics)))))
                  true
                  (catch InterruptException e
                    (log/info "Worker interrupted, stopping.")
                    false)
                  (catch Exception e
                    (log/errorf e "Worker exception, topics: %s" topics)
                    true))
            (recur)))
        (finally
          ;; Closing may itself throw (for instance when the thread is still
          ;; flagged as interrupted); log it rather than mask the outcome.
          (try
            (consumer-close consumer)
            (catch Exception e
              (log/warn e "Failed to close worker consumer cleanly."))))))))
(defn worker-stop
  "Requests cancellation of the `worker` future.

  Returns true when the future is in the cancelled state afterwards and
  false otherwise (for example when it had already completed)."
  [worker]
  (-> worker
      (doto future-cancel)
      future-cancelled?))
(defn worker-status
  "Returns true while `worker` is still running, i.e. it has neither been
  cancelled nor been realized; returns false otherwise."
  [worker]
  (not (or (future-cancelled? worker)
           (realized? worker))))
;; | |
;; Usage | |
;; | |
(comment
  ;; producer
  ;; (the argument overrides or extends `cfg-producer`)
  (def p (producer-create {:client.id "example-producer"}))
  (producer-send p "bar" {:any [:clojure #{:data}]})
  (producer-close p)

  ;; consumer
  (def c (consumer-create))
  (consumer-subscribe c ["bar"])
  (def msgs (consumer-poll c))
  (consumer-close c)

  ;; auto-closing macros
  (with-producer p cfg-producer
    (producer-send p "foo" {:test [1 2 999]}))
  (with-consumer c cfg-consumer
    (consumer-subscribe c ["foo"])
    (consumer-poll c))

  ;; workers
  (def w (worker-start cfg-consumer ["foo"] println))
  (worker-status w)
  (worker-stop w))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment