Last active
April 12, 2017 11:32
-
-
Save kapilreddy/a25c3efe37c04283e22b to your computer and use it in GitHub Desktop.
Kafka message offset finder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; project.clj | |
;; [clj-time "0.6.0"] | |
;; [org.clojure/data.json "0.2.4"] | |
;; [clj-kafka "0.2.8-0.8.1.1"] | |
;; Utility to find, for each partition of a given Kafka topic, the offset
;; that corresponds to a given cursor/point in time. The code assumes that
;; every message carries a monotonically increasing number (e.g. a Unix
;; timestamp).
(ns find-offset | |
(:require [clj-kafka.consumer.simple :refer [consumer | |
messages | |
topic-offset | |
topic-meta-data]] | |
[clj-time.core :as ct] | |
[clojure.data.json :refer [read-str]]) | |
(:import [org.joda.time DateTime])) | |
(defn- search-by-halving
  "Walks offsets starting at `earliest-offset` with step `step`, halving
  the step after every fetch, and returns the message fetched once the
  step has reached zero.

  `fetch-msg` maps an offset to a message, or nil when nothing was
  fetched. After each fetch the walk moves forward by the current step
  when the message exists and satisfies `compare-pred`. Otherwise it
  moves backward. Backward moves are clamped to `earliest-offset`, so no
  offset below the start of the partition is ever requested."
  [fetch-msg compare-pred earliest-offset step]
  (loop [offset earliest-offset
         incr-by step]
    (let [msg (fetch-msg offset)
          next-incr (quot incr-by 2)]
      (cond
        (zero? incr-by) msg
        (and msg (compare-pred msg)) (recur (+ offset incr-by) next-incr)
        :else (recur (max earliest-offset (- offset incr-by)) next-incr)))))

(defn find-msg-at-offset
  "Searches partition `partition-id` of `topic` on the broker at
  `{:host :port}` for the point where `compare-pred` stops returning
  truthy, and returns the message fetched there (a clj-kafka message
  map), or nil if the final fetch returned nothing.

  `compare-pred` receives a message map. It should be truthy for messages
  before the desired point and falsy after it.

  Options:
    :max-jumps  - default 10. The search throws an Exception before any
                  message is fetched when ceil(log2(offset-diff)) exceeds
                  this value. offset-diff is half of the span between the
                  partition's earliest and latest offsets.
    :fetch-size - default 10000. Passed to clj-kafka `messages`.

  The consumer opened here is closed before the function returns."
  [compare-pred topic partition-id {:keys [host port]} & {:keys [max-jumps fetch-size]
                                                         :or {max-jumps 10
                                                              fetch-size 10000}}]
  (with-open [c (consumer host port "offset-finder")]
    (let [earliest-offset (topic-offset c topic partition-id :earliest)
          latest-offset (topic-offset c topic partition-id :latest)
          ;; quot keeps the arithmetic in longs. An int cast throws once the
          ;; span exceeds the int range.
          offset-diff (quot (- latest-offset earliest-offset) 2)
          ;; Guard against log(0): an empty or single-message partition
          ;; needs no jumps.
          required-jumps (if (pos? offset-diff)
                           (long (Math/ceil (/ (Math/log offset-diff)
                                               (Math/log 2))))
                           0)]
      (when (> required-jumps max-jumps)
        (throw (Exception. (format "Finding offset will take more than %d jumps. %d jumps needed to find the offset."
                                   max-jumps
                                   required-jumps))))
      (search-by-halving
       ;; NOTE(review): this inspects the LAST message of the batch fetched
       ;; at `offset`, not the message at `offset` itself. The returned
       ;; message may therefore sit up to one fetch past the boundary.
       ;; Confirm this is intended.
       (fn [offset]
         (last (messages c "offset-finder" topic partition-id offset fetch-size)))
       compare-pred
       earliest-offset
       offset-diff))))
(defn find-offsets
  "Returns, for every partition of `topic`, a map
  `{:topic :partition-id :offset}`. :offset is the offset of the message
  that `find-msg-at-offset` locates with `pred` on that partition's
  leader, or nil when no message was found.

  `{:host :port}` is the broker used to fetch topic metadata. The
  per-partition searches connect to each partition's leader. :max-jumps
  and :fetch-size are forwarded to `find-msg-at-offset` (defaults 10
  and 10000).

  The result is realized eagerly, so network errors surface at the call
  site rather than when the sequence is consumed."
  [topic {:keys [host port]} pred & {:keys [max-jumps
                                            fetch-size]
                                     :or {max-jumps 10
                                          fetch-size 10000}}]
  (let [partitions (with-open [c (consumer host port "offset-finder")]
                     ;; Realize the metadata before the consumer is closed.
                     (doall (:partition-metadata (first (topic-meta-data c [topic])))))]
    (doall
     (map (fn [{:keys [leader partition-id]}]
            ;; NOTE(review): assumes `leader` is a map with :host/:port.
            ;; A leaderless partition would yield nil here. Confirm how
            ;; that case should be handled.
            (let [{:keys [offset]} (find-msg-at-offset pred
                                                       topic
                                                       partition-id
                                                       leader
                                                       :max-jumps max-jumps
                                                       :fetch-size fetch-size)]
              {:topic topic
               :partition-id partition-id
               :offset offset}))
          partitions))))
(comment
  ;; Example: find, per partition, roughly where messages cross
  ;; 2015-04-10 00:00. clj-time's date-time builds this instant in UTC.
  (let [cutoff (ct/date-time 2015 4 10)
        ;; Truthy while the message's kafka_meta.msg_ts is at or before the
        ;; cutoff (in epoch millis), falsy once it is past the cutoff.
        before-cutoff? (fn [msg]
                         (let [payload (read-str (String. (:value msg))
                                                 :key-fn keyword)]
                           (<= (get-in payload [:kafka_meta :msg_ts])
                               (.getMillis cutoff))))]
    (find-offsets "kafka-topic"
                  {:host "kafka-broker"
                   :port 9092}
                  before-cutoff?
                  :max-jumps 20
                  :fetch-size 10000)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment