Last active
December 2, 2015 04:44
-
-
Save teaforthecat/da1c28913b9fceb9305e to your computer and use it in GitHub Desktop.
trying to fetch data from kafka, not being able to use "take"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| (defn consumer-config [] {"zookeeper.connect" "localhost:2182" | |
| "group.id" (str (java.util.UUID/randomUUID)) | |
| "auto.offset.reset" "smallest" | |
| "auto.commit.enable" "false"}) | |
| (defn fetch-first [topic] | |
| (with-resource [c (zk/consumer (consumer-config))] | |
| zk/shutdown | |
| (first (zk/messages c topic)))) | |
| (defn always-nil [topic batch-size] | |
| (with-resource [c (zk/consumer (consumer-config))] | |
| zk/shutdown | |
| (take batch-size (zk/messages c topic)))) | |
| (fetch-first "test") ;=> #clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0xcacb0b3 "[B@cacb0b3"]} | |
| (always-nil "test" 1);=> () |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Notice how every call gets a new "group.id" due to
consumer-configbeing a function (for testing).(firstalways returns offset: 0 with a message. Shouldn't(take 1also return the first message ?