@hadronzoo
Last active November 4, 2016 15:48
Emit watermark changes, useful for keeping track of which Kafka partition log offsets have been fully processed.
(require '[xf.watermark :refer [partitioned-watermark]])

;; Operations are grouped by [topic partition]. A :fetch marks an offset as
;; pending; an :ack marks it as completed and may raise the watermark.
(let [partition  (juxt :topic :partition)
      raise?     #(case (:operation %)
                    :fetch false
                    :ack true
                    (throw (ex-info "unknown operation" %)))
      level      :offset
      operations [{:topic "test" :partition 0 :operation :fetch :offset 0}
                  {:topic "test" :partition 0 :operation :fetch :offset 1}
                  {:topic "test" :partition 0 :operation :ack :offset 1}
                  {:topic "test" :partition 1 :operation :fetch :offset 0}
                  {:topic "test" :partition 0 :operation :fetch :offset 2}
                  {:topic "test" :partition 0 :operation :ack :offset 2}
                  {:topic "test" :partition 0 :operation :ack :offset 0}
                  {:topic "test" :partition 0 :operation :fetch :offset 4}
                  {:topic "test" :partition 1 :operation :ack :offset 0}
                  {:topic "test" :partition 0 :operation :fetch :offset 3}
                  {:topic "test" :partition 0 :operation :fetch :offset 6}
                  {:topic "test" :partition 0 :operation :fetch :offset 5}
                  {:topic "test" :partition 0 :operation :ack :offset 5}
                  {:topic "test" :partition 0 :operation :ack :offset 4}
                  {:topic "test" :partition 0 :operation :ack :offset 3}
                  {:topic "test" :partition 0 :operation :ack :offset 6}]]
  (partitioned-watermark partition raise? level operations))
;=> ([["test" 0] 2] [["test" 1] 0] [["test" 0] 5] [["test" 0] 6])
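A Kafka consumer conventionally commits the offset of the next record it will read, i.e. the last processed offset plus one. Assuming your consumer follows that convention, the emitted watermark changes can be folded into a map holding the latest offset to commit per partition. The helper `commit-offsets` is hypothetical and not part of the gist; its input here is the example output above.

```clojure
(defn commit-offsets
  "Hypothetical helper: folds [[topic partition] watermark] pairs into a map of
  the latest offset to commit per partition. Assumes Kafka's convention that
  the committed offset is the next offset to read (watermark + 1)."
  [watermark-changes]
  (reduce (fn [offsets [tp wm]] (assoc offsets tp (inc wm)))
          {}
          watermark-changes))

(commit-offsets [[["test" 0] 2] [["test" 1] 0] [["test" 0] 5] [["test" 0] 6]])
;=> {["test" 0] 7, ["test" 1] 1}
```

Later changes for the same partition overwrite earlier ones, so only the most recent watermark per partition survives.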

(ns xf.watermark
  (:require [clojure.data.int-map :as i]
            [net.cgrand.xforms :as x]))

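(defn watermark
  "Tracks pending and completed levels and returns a lazy sequence containing
  a watermark each time an item raises it. The watermark is the highest
  completed level below the lowest level still pending, i.e. the point up to
  which every tracked level has completed. Takes a raise? function that
  returns false if the item is pending or true if it has been completed and
  may raise the watermark, and a level function that returns a long
  representing the item's level. Returns a stateful transducer when no
  collection is provided."
  ([raise? level]
   (fn [xf]
     ;; Creating the state inside (fn [xf] ...) gives every application of the
     ;; transducer, including each key's sub-transducer under x/by-key, its
     ;; own pending and completed sets.
     (let [pending   (volatile! (i/int-set))
           completed (volatile! (i/int-set))]
       (fn
         ([] (xf))
         ([result] (xf result))
         ([result item]
          (let [l (level item)]
            (if (raise? item)
              ;; The lowest still-pending level bounds how far the watermark
              ;; may advance; with nothing pending it is unbounded.
              (let [lowest-pending (or (first (vswap! pending disj l))
                                       Long/MAX_VALUE)
                    ;; Completed levels below that bound have no pending level
                    ;; beneath them, so they can all be flushed; the largest
                    ;; of them is the new watermark.
                    flushed (into (i/int-set) (filter #(< % lowest-pending))
                                  (vswap! completed conj l))
                    new-watermark (last flushed)]
                (vswap! completed #(i/difference % flushed))
                (if new-watermark
                  (xf result new-watermark)
                  result))
              (do (vswap! pending conj l)
                  result))))))))
  ([raise? level coll]
   (sequence (watermark raise? level) coll)))
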
(defn partitioned-watermark
  "Like watermark, but takes an additional partition function, calculates the
  watermark for each partition and emits [partition watermark] pairs. Returns
  a stateful transducer when no collection is provided."
  ([partition raise? level]
   (x/by-key partition (watermark raise? level)))
  ([partition raise? level coll]
   (sequence (partitioned-watermark partition raise? level) coll)))
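Without the clojure.data.int-map and net.cgrand.xforms dependencies, the same algorithm can be sketched with clojure.core alone. This is a hypothetical variant rather than the gist's code: `sorted-set` stands in for `i/int-set`, and a map of per-partition state stands in for `x/by-key`. The names `empty-state`, `advance` and `partitioned-watermark-sketch` are illustrative.

```clojure
;; Hypothetical dependency-free sketch: sorted-set replaces i/int-set and a
;; map keyed by partition replaces x/by-key.

(def ^:private empty-state
  {:pending (sorted-set) :completed (sorted-set)})

(defn- advance
  "Applies one item's level to a partition's state. Returns
  [new-state new-watermark], where new-watermark is nil when the watermark
  did not move."
  [{:keys [pending completed]} completed? l]
  (if completed?
    (let [pending        (disj pending l)
          lowest-pending (or (first pending) Long/MAX_VALUE)
          completed      (conj completed l)
          ;; sorted-set seqs ascend, so take-while yields every completed
          ;; level below the lowest pending one
          flushed        (take-while #(< % lowest-pending) completed)]
      [{:pending pending :completed (apply disj completed flushed)}
       (last flushed)])
    [{:pending (conj pending l) :completed completed} nil]))

(defn partitioned-watermark-sketch
  "Stateful transducer emitting [partition watermark] pairs. Each partition
  keeps its own state, created when the transducer is applied."
  [partition-fn raise? level]
  (fn [rf]
    (let [states (volatile! {})]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result item]
         (let [k (partition-fn item)
               [state wm] (advance (get @states k empty-state)
                                   (raise? item)
                                   (level item))]
           (vswap! states assoc k state)
           (if wm
             (rf result [k wm])
             result)))))))

;; Usage: partition 1 advances even though partition 0 still has offset 0
;; pending.
(into []
      (partitioned-watermark-sketch (juxt :topic :partition)
                                    #(= :ack (:operation %))
                                    :offset)
      [{:topic "t" :partition 0 :operation :fetch :offset 0}
       {:topic "t" :partition 1 :operation :fetch :offset 5}
       {:topic "t" :partition 1 :operation :ack :offset 5}
       {:topic "t" :partition 0 :operation :ack :offset 0}])
;=> [[["t" 1] 5] [["t" 0] 0]]
```

Keeping the per-partition step as a pure `advance` function makes it testable on its own; the transducer only owns the map of states, which it creates inside `(fn [rf] ...)` so that each use starts fresh.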