Skip to content

Instantly share code, notes, and snippets.

@igrishaev
Created April 11, 2018 10:01
Show Gist options
  • Save igrishaev/274c1a4fc6986b1485f6554b1a63d0b2 to your computer and use it in GitHub Desktop.
Save igrishaev/274c1a4fc6986b1485f6554b1a63d0b2 to your computer and use it in GitHub Desktop.
(ns ___.kafka
"Common Kafka module."
(:require [cheshire.core :as json]
[clojure.tools.logging :as log])
(:import [org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecord]
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord]
[org.apache.kafka.common.errors InterruptException]
[org.apache.kafka.common.serialization ByteArrayDeserializer ByteArraySerializer]))
;;
;; Helpers
;;
(defn as-cfg
"Turns a keyword-keyed map into a string-keyed one."
[cfg-map]
(into {} (for [[k v] cfg-map] [(name k) v])))
(def timeout-poll 1000)
;;
;; Consumer
;;
(def ^{:doc "Default `KafkaConsumer` settings."}
cfg-consumer
{:bootstrap.servers "localhost:9092",
:group.id "default"
:max.poll.records "10"
:max.poll.interval.ms "10000"
:auto.offset.reset "earliest",
:enable.auto.commit true,
:key.deserializer ByteArrayDeserializer
:value.deserializer ByteArrayDeserializer})
(defn ^KafkaConsumer consumer-create
[& [cfg]]
(KafkaConsumer. (-> cfg-consumer (merge cfg) as-cfg)))
(defn consumer-subscribe
"Subscribes a consumer to a set of topics."
[consumer topics & more]
(.subscribe consumer topics))
(defn consumer-close
"Closes a consumer."
[^KafkaConsumer consumer]
(.close consumer))
(defn record->map
"Turns a Kafka record into a Clojure map."
[^ConsumerRecord rec]
{:value (-> rec .value String. (json/parse-string true))
:id (.offset rec)
:topic (.topic rec)})
(defn consumer-poll
[consumer & [timeout]]
(let [recs (.poll consumer (or timeout timeout-poll))]
(map record->map recs)))
(defmacro with-consumer
[bind cfg & body]
`(let [~bind (consumer-create ~cfg)]
(try
~@body
(finally
(consumer-close ~bind)))))
;;
;; Producer
;;
(def cfg-producer
{:bootstrap.servers "localhost:9092"
:value.serializer ByteArraySerializer
:key.serializer ByteArraySerializer})
(defn ^KafkaProducer producer-create
[& [cfg]]
(KafkaProducer. (-> cfg-producer (merge cfg) as-cfg)))
(defn producer-close
[producer]
(.close producer))
(defmacro with-producer
[bind cfg & body]
`(let [~bind (producer-create ~cfg)]
(try
~@body
(finally
(producer-close ~bind)))))
(defn producer-send-many
[producer topic payloads]
(doseq [p payloads]
(let [data (-> p json/generate-string .getBytes)]
(.send producer (ProducerRecord. topic data)))))
(defn producer-send
[producer topic payload & payloads]
(producer-send-many producer topic (cons payload payloads)))
;;
;; Workers
;;
(defn worker-start
[cfg topics func]
(future
(let [consumer (consumer-create cfg)]
(consumer-subscribe consumer topics)
(with-local-vars [continue? true]
(while (var-get continue?)
(try
(when-let [msgs (not-empty (consumer-poll consumer))]
(doseq [msg msgs]
(future (func msg))))
(catch InterruptException e
(var-set continue? false)
(log/info "Worker interrupted, stopping."))
(catch Exception e
(log/errorf e "Worker exception, topics: %s" topics))))))))
(defn worker-stop
[worker]
(future-cancel worker)
(future-cancelled? worker))
(defn worker-status
[worker]
(and
(not (future-cancelled? worker))
(not (realized? worker))))
;;
;; Usage
;;
(comment
;; producer
(def p (producer-create {:group.id 42}))
(producer-send p "bar" {:any [:clojure #{:data}]})
(producer-close p)
;; consumer
(def c (consumer-create))
(consumer-subscribe c ["bar"])
(def msgs (consumer-poll c))
(consumer-close c)
;; auto-close macroses
(with-producer p cfg-producer (producer-send p "foo" {:test [1 2 999]}))
(with-consumer c cfg-default (consumer-subscribe c ["foo"]) (consumer-poll c))
;; workers
(def w (worker-start cfg-consumer "foo" println))
(worker-stop w))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment