Generate Kafka topic offsets JSON for a given timestamp
#!/usr/bin/env bash
# This script generates a JSON structure that represents
# per-partition topic offsets at a given moment in time.
# The structure is directly usable as the "startingOffsets"
# parameter of Spark Structured Streaming's Kafka source.
shopt -s expand_aliases
#### YOU NEED TO SET UP THIS PART ####
# kafkacat and jq must be installed
alias mykafkacat=kafkacat # ... set up your brokers/security here first
TOPIC_NAME=${TOPIC_NAME:-mytopic}
# the timestamp must be in milliseconds; replace with your own
TOPIC_TS=${TOPIC_TS:-$(date -d "23 hours 55 minutes ago" +%s%3N)}
# could be done without a temp file, but I'm lazy
TOPIC_FILE=${TOPIC_FILE:-"/tmp/${TOPIC_NAME}_offsets.json"}
#######################################
echo '{"'${TOPIC_NAME}'":{' >$TOPIC_FILE | |
# -J is for JSON output | |
# go over partitions of a topic and print offsets corresponding to selected timestamp | |
# join results with comma | |
mykafkacat -L -J -t $TOPIC_NAME | jq -c ".topics[0].partitions[] | .partition" | while read -r p; do mykafkacat -Q -J -t $TOPIC_NAME:$p:$TOPIC_TS | jq -c --arg key $p '."'$TOPIC_NAME'"."'$p'" | {($key): .offset}'| tr -d '{}'; done | paste -sd "," - >>$TOPIC_FILE | |
# for p in {0..2}; do mykafkacat -Q -J -t $TOPIC_NAME:$p:$TOPIC_TS | jq -c --arg key $p '."'$TOPIC_NAME'"."'$p'" | {($key): .offset}'; done | paste -sd "," - | |
echo '}}' >>"$TOPIC_FILE"
jq '.' "$TOPIC_FILE" # or use "jq -c '.'" to minimize the output
rm "$TOPIC_FILE"
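
The script's comments say the output is meant for Spark Structured Streaming's startingOffsets option. Below is a minimal pyspark sketch of how that JSON might be consumed; the broker address, topic name, and offset values are made-up placeholders, and it assumes the spark-sql-kafka package is available. Since the script deletes its temp file at the end, you would capture its stdout (the jq -c form) yourself.

#!/usr/bin/env python3
# Minimal sketch: plug the generated offsets JSON into the Kafka source.
# Broker address and offsets string are placeholders, not real values.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("offsets-example").getOrCreate()

# example of what the script prints, compacted with jq -c
starting_offsets = '{"mytopic":{"0":1234,"1":5678,"2":9012}}'

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder broker
    .option("subscribe", "mytopic")
    .option("startingOffsets", starting_offsets)
    .load()
)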