Skip to content

Instantly share code, notes, and snippets.

@x
Last active August 29, 2015 14:09
Show Gist options
  • Save x/b89172e7304e40bb4484 to your computer and use it in GitHub Desktop.
Save x/b89172e7304e40bb4484 to your computer and use it in GitHub Desktop.
clojure kafka consumer
(ns test-kafka.core
(:gen-class)
(:require [clj-kafka.consumer.zk :refer :all]
[clj-kafka.core :refer :all]
[msgpack.core :refer :all]))
(def config {"zookeeper.connect" "zk04/cb/kafka/pingqueue"
"group.id" "clj-kafka.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
(def pings-topic "pings_msgpack_test")
;(def initial-clicks-topic "init-clicks")
(defn -main
"Grab messages from the pingqueue and "
[& args]
(doseq [msg (messages (consumer config) pings-topic)]
(let [topic-parition (get msg :partition)
offset (get msg :offset)
ping (unpack (get msg :value))]
(println topic-parition offset ping))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment