Last active
January 8, 2019 20:25
-
-
Save tristanstraub/e5f50698d04aa3cc3bac6baf71943cae to your computer and use it in GitHub Desktop.
kafka
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns watermarks) | |
(defn partition-of | |
[partitions key] | |
(mod (.hashCode "test") partitions)) | |
(def partitions 12) | |
(defn produce! | |
[broker topic key message] | |
(swap! broker update-in | |
[:topics topic :data (partition-of partitions | |
key)] | |
(fnil conj []) message)) | |
(defn consume! | |
[broker consumer topic partition] | |
(let [offset (get-in @broker [:consumers consumer topic partition] 0) | |
message (get-in @broker [:topics topic :data partition offset])] | |
(when message | |
(swap! broker update-in | |
[:consumers consumer topic partition] (fnil inc 0))) | |
message)) | |
(comment | |
:coordinator/received-all-partitions-of-frame "To each active key of inbound" | |
:simulator/key-sent-all-outbound "To all active key of outbound" | |
) | |
(defn simulate! | |
[broker simulator] | |
(->> (doall (map #(consume! broker "simulator" "objects" %) (range partitions))) | |
(group-by :timestamp) | |
)) | |
(defn coordinate! | |
[broker coordinator] | |
) | |
(let [broker (atom nil) | |
simulator (atom {:topic "objects"}) | |
coordinator (atom nil)] | |
(produce! broker "test" "one" "two") | |
(produce! broker "test" "one" "three") | |
(consume! broker "c" "test" 10) | |
(consume! broker "c" "test" 10)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment