Skip to content

Instantly share code, notes, and snippets.

@teaforthecat
Last active December 2, 2015 04:44
Show Gist options
  • Select an option

  • Save teaforthecat/da1c28913b9fceb9305e to your computer and use it in GitHub Desktop.

Select an option

Save teaforthecat/da1c28913b9fceb9305e to your computer and use it in GitHub Desktop.
trying to fetch data from kafka, not being able to use "take"
(defn consumer-config [] {"zookeeper.connect" "localhost:2182"
"group.id" (str (java.util.UUID/randomUUID))
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
(defn fetch-first [topic]
(with-resource [c (zk/consumer (consumer-config))]
zk/shutdown
(first (zk/messages c topic))))
(defn always-nil [topic batch-size]
(with-resource [c (zk/consumer (consumer-config))]
zk/shutdown
(take batch-size (zk/messages c topic))))
(fetch-first "test") ;=> #clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0xcacb0b3 "[B@cacb0b3"]}
(always-nil "test" 1);=> ()
@teaforthecat
Copy link
Copy Markdown
Author

Notice how every call gets a new "group.id" due to consumer-config being a function (for testing). (first always returns offset: 0 with a message. Shouldn't (take 1 also return the first message ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment