Created
October 2, 2013 06:56
-
-
Save halfelf/6789945 to your computer and use it in GitHub Desktop.
Simple Kafka Consumer in Clojure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns oceanus.anduin.clj.consumer | |
(:gen-class) | |
(:import [kafka.consumer ConsumerConfig Consumer KafkaStream] | |
[kafka.javaapi.consumer ConsumerConnector] | |
[java.util Properties])) | |
(defn make-props
  "Convert a Clojure map into a java.util.Properties object.

  Keys are normalised to strings: keywords and symbols contribute their
  name (so :group.id becomes \"group.id\"), anything else goes through `str`.
  Values are always stored as their `str` representation. String keys are
  required for `Properties.getProperty` lookups to find the entries.

  Returns a new Properties instance; an empty or nil map yields an empty one."
  [m]
  (let [props (Properties.)]
    (doseq [[k v] m]
      (.setProperty props
                    ;; `name` drops the leading colon of a keyword; plain
                    ;; strings are not Named and pass through `str` unchanged.
                    (if (instance? clojure.lang.Named k) (name k) (str k))
                    (str v)))
    props))
(defn get-streams-map
  "Create a consumer connector from the settings map `conf` and request
  message streams for `topics`.

  `conf` is converted with `make-props` and wrapped in a ConsumerConfig;
  `topics` is passed unchanged to `.createMessageStreams`, whose result is
  returned. The connector itself is not returned to the caller."
  [conf topics]
  (let [consumer-config (ConsumerConfig. (make-props conf))
        connector       (Consumer/createJavaConsumerConnector consumer-config)]
    (.createMessageStreams connector topics)))
(defn get-streams
  "Return a sequence of `total-partitions` streams for `topic`.

  Each element comes from its own call to `get-streams-map` (so one connector
  per stream), requesting a single stream for the topic and taking the first
  one returned.

  The sequence is fully realised before returning: creating a connector is a
  side effect, so it is done eagerly here rather than at some later point
  when a lazy sequence happens to be consumed."
  [props topic total-partitions]
  (doall
    (repeatedly total-partitions
                (fn []
                  (-> (get-streams-map props {topic (int 1)})
                      (.get topic)
                      first)))))
(defn get-one-stream
  "Request a single stream for `topic` using the consumer settings `props`
  and return it.

  Calls `get-streams-map` with a topic-count map of {topic 1}, looks the topic
  up in the result and returns the first stream listed for it."
  [props topic]
  (let [streams-by-topic (get-streams-map props {topic (int 1)})
        topic-streams    (.get streams-by-topic topic)]
    (first topic-streams)))
(defn count-events
  "Example of usage: count the messages seen on `topic` during a fixed window.

  Obtains `total-partitions` streams via `get-streams`, starts one thread per
  stream that increments a shared counter for every message it iterates over,
  sleeps for `wait-ms` milliseconds (default 15000) and then returns the
  counter's value at that moment.

  The consumer threads are not stopped or joined here, so they may keep
  running after this function returns."
  ([props topic total-partitions]
   (count-events props topic total-partitions 15000))
  ([props topic total-partitions wait-ms]
   (let [counter (atom 0)]
     (doseq [stream (get-streams props topic total-partitions)]
       (.start
         (Thread.
           (fn []
             ;; The message itself is ignored; only its arrival is counted.
             (doseq [_ stream]
               (swap! counter inc))))))
     (Thread/sleep (long wait-ms))
     @counter)))
(comment
  ;; A property example: a settings map of the kind passed as `props`/`conf`
  ;; to the functions in this namespace.
  ;; NOTE(review): consumer property names differ between Kafka releases --
  ;; confirm each key against the ConsumerConfig of the version in use.
  {"zookeeper.connect"           "localhost:2181",
   "zk.connectiontimeout.ms"     1000000,
   "group.id"                    "group1",
   "fetch.size"                  2097152,
   "socket.receive.buffer.bytes" 65536,
   "auto.commit.interval.ms"     60000,
   "queued.max.messages"         10})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment