Created
May 2, 2017 18:20
-
-
Save favila/26ca5965a4a94b5f34483c0bb521fa88 to your computer and use it in GitHub Desktop.
Code to fiddle with Kafka behaviors.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns kafka-workbench | |
(:require [franzy.clients.consumer.protocols :as c] | |
[franzy.clients.producer.protocols :as p] | |
[franzy.serialization.serializers :as serializers] | |
[franzy.serialization.deserializers :as deserializers] | |
[franzy.clients.consumer.client :as consumer] | |
[franzy.clients.producer.client :as producer])) | |
(def kafka-brokers
  "Broker addresses used as :bootstrap.servers by the producer and consumer
  configs in this namespace. Empty here; fill in before using the helpers."
  [])
(def producer-config
  "Producer settings handed to `producer/make-producer`.
  Keys are the Kafka producer property names, written as keywords."
  {:bootstrap.servers kafka-brokers
   :acks              "all"
   :retries           0
   :linger.ms         10
   ;; Sizes are in bytes: 16 KiB per batch, 32 MiB of total buffer.
   :batch.size        (* 16 1024)
   :buffer.memory     (* 32 1024 1024)})
(def consumer-config
  "Consumer settings handed to `consumer/make-consumer`.
  Auto-commit is off and no offset-reset policy is selected (:none)."
  {:group.id           "test-retries"
   :bootstrap.servers  kafka-brokers
   :auto.offset.reset  :none
   :enable.auto.commit false})
(defn kafka-put
  "Opens a string/string producer configured by `pc`, synchronously sends
  `value` to partition 0 of `topic` under the fixed key \"key\", and closes
  the producer again. Returns whatever `p/send-sync!` returns."
  [pc topic value]
  (let [key-ser   (serializers/string-serializer)
        value-ser (serializers/string-serializer)]
    (with-open [prod (producer/make-producer pc key-ser value-ser)]
      (p/send-sync! prod topic 0 "key" value {}))))
(defn kafka-get-via-assign
  "Opens a string/string consumer configured by `cc`, manually assigns it
  partition 0 of `topic`, polls once, and prints one line per record in the
  form \"topic partition offset value\". The consumer is closed on exit.
  Returns nil."
  [cc topic]
  ;; A consumer reads bytes off the wire, so it must be given deserializers;
  ;; the serializers used previously are only valid on the producer side.
  (with-open [c (consumer/make-consumer cc
                                        (deserializers/string-deserializer)
                                        (deserializers/string-deserializer))]
    (c/assign-partitions! c [{:topic topic :partition 0}])
    (run! (fn [{:keys [topic partition offset value]}]
            (println (format "%s %s %s %s"
                             topic partition offset value)))
          (c/poll! c))))
(defn kafka-get-via-subscribe
  "Opens a string/string consumer configured by `cc`, subscribes it to
  `topic`, polls once, and prints one line per record in the form
  \"topic partition offset value\". The consumer is closed on exit.
  Returns nil."
  [cc topic]
  (with-open [c (consumer/make-consumer cc
                                        (deserializers/string-deserializer)
                                        (deserializers/string-deserializer))]
    ;; Subscription is by topic name; partitions are handed out by the group
    ;; coordinator. Topic-partition maps belong to `assign-partitions!` only.
    (c/subscribe-to-partitions! c [topic])
    (run! (fn [{:keys [topic partition offset value]}]
            (println (format "%s %s %s %s"
                             topic partition offset value)))
          (c/poll! c))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment