Last active: November 4, 2016 15:48
-
-
Save hadronzoo/2ccd5b6de50dfce73d778e22dfa0145c to your computer and use it in GitHub Desktop.
Emit watermark changes — useful for keeping track of Kafka partition log offsets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Usage example: Kafka-style fetch/ack events on topic "test", partitions 0
;; and 1. A watermark is emitted per partition whenever acks let it advance.
(let [topic-partition (juxt :topic :partition)
      ;; acks may raise the watermark; fetches only mark an offset pending
      completed?      (fn [{op :operation :as item}]
                        (case op
                          :ack   true
                          :fetch false
                          (throw (ex-info "unknown operation" item))))
      offset          :offset
      ;; [partition operation offset] triples, all on topic "test"
      operations      (for [[p op n] [[0 :fetch 0] [0 :fetch 1] [0 :ack 1]
                                       [1 :fetch 0] [0 :fetch 2] [0 :ack 2]
                                       [0 :ack 0]   [0 :fetch 4] [1 :ack 0]
                                       [0 :fetch 3] [0 :fetch 6] [0 :fetch 5]
                                       [0 :ack 5]   [0 :ack 4]   [0 :ack 3]
                                       [0 :ack 6]]]
                        {:topic "test" :partition p :operation op :offset n})]
  (partitioned-watermark topic-partition completed? offset operations))
;=> ([["test" 0] 2] [["test" 1] 0] [["test" 0] 5] [["test" 0] 6])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns xf.watermark | |
(:require [clojure.data.int-map :as i] | |
[net.cgrand.xforms :as x])) | |
(defn watermark
  "Calculates the lowest pending level for each item in the collection and
  returns a lazy sequence of watermarks, one for each item that changes the
  lowest pending level.

  Takes a raise? function that returns false if the item is pending, or true
  if it has been completed and may raise the watermark. Also takes a level
  function that returns a long representing the level of the item.

  When a completed item arrives, every completed level strictly below the
  lowest level still pending is flushed, and the highest flushed level is
  emitted. When nothing is pending, all completed levels are flushed. Nothing
  is emitted if no level is flushed, and pending items never emit.

  Returns a stateful transducer when no collection is provided. Each
  application of the transducer to a reducing function gets its own fresh
  state, so the transducer can be reused, e.g. once per key by
  net.cgrand.xforms/by-key."
  ([raise? level]
   (fn [xf]
     ;; State is allocated per reducing function rather than per transducer.
     ;; Otherwise every application (each by-key partition, each separate
     ;; reduction) would share the same pending/completed levels.
     (let [pending   (volatile! (i/int-set))
           completed (volatile! (i/int-set))]
       (fn
         ([] (xf))
         ([result] (xf result))
         ([result item]
          (let [l (level item)]
            (if (raise? item)
              ;; Relies on int-sets iterating in ascending order: `first` is
              ;; the lowest pending level, `last` the highest flushed level.
              (let [mark     (or (first (vswap! pending disj l))
                                 ;; nothing pending: every completed level may flush
                                 Long/MAX_VALUE)
                    flushed  (into (i/int-set)
                                   (filter #(< % mark))
                                   (vswap! completed conj l))
                    new-mark (last flushed)]
                ;; Forget flushed levels so each is reported at most once.
                (vswap! completed i/difference flushed)
                (if new-mark
                  (xf result new-mark)
                  result))
              (do (vswap! pending conj l)
                  result))))))))
  ([raise? level coll]
   (sequence (watermark raise? level) coll)))
(defn partitioned-watermark
  "Like watermark, but takes an additional partition function and calculates
  the watermark independently for each partition, emitting
  [partition-key watermark] pairs. Returns a stateful transducer when no
  collection is provided."
  ([partition raise? level]
   ;; Wrapping in (fn [rf] ...) builds a brand-new watermark transducer each
   ;; time by-key applies the xform to a reducing function. Partitions
   ;; therefore never share pending/completed state, even if watermark
   ;; allocates its state when the transducer itself is constructed.
   (x/by-key partition (fn [rf] ((watermark raise? level) rf))))
  ([partition raise? level coll]
   (sequence (partitioned-watermark partition raise? level) coll)))
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.