@bepcyc
Last active December 11, 2020 14:50
generate kafka topic offsets JSON for a given timestamp
#!/usr/bin/env bash
# This script generates a JSON structure that maps each partition
# of a topic to its offset at a given point in time.
# The structure is useful as the "startingOffsets" option
# in Spark Structured Streaming.
shopt -s expand_aliases
#### YOU NEED TO SET UP THIS PART ####
# kafkacat and jq should be installed
alias mykafkacat=kafkacat #... setup your brokers/security here first
TOPIC_NAME=${TOPIC_NAME:-mytopic}
# timestamp must be in milliseconds since the epoch; replace the example below
# with your own value (date -d and %3N require GNU date)
TOPIC_TS=${TOPIC_TS:-$(date -d "23 hours 55 minutes ago" +%s%3N)}
# can be done without it, but I'm lazy
TOPIC_FILE=${TOPIC_FILE:-"/tmp/${TOPIC_NAME}_offsets.json"}
#######################################
echo '{"'"${TOPIC_NAME}"'":{' >"$TOPIC_FILE"
# -J is for JSON output
# go over partitions of a topic and print offsets corresponding to selected timestamp
# join results with comma
mykafkacat -L -J -t "$TOPIC_NAME" \
  | jq -c '.topics[0].partitions[] | .partition' \
  | while read -r p; do
      mykafkacat -Q -J -t "$TOPIC_NAME:$p:$TOPIC_TS" \
        | jq -c --arg topic "$TOPIC_NAME" --arg key "$p" '.[$topic][$key] | {($key): .offset}' \
        | tr -d '{}'
    done \
  | paste -sd "," - >>"$TOPIC_FILE"
# for p in {0..2}; do mykafkacat -Q -J -t $TOPIC_NAME:$p:$TOPIC_TS | jq -c --arg key $p '."'$TOPIC_NAME'"."'$p'" | {($key): .offset}'; done | paste -sd "," -
echo '}}' >>"$TOPIC_FILE"
jq '.' "$TOPIC_FILE" # or use "jq -c '.'" to minimize output
rm "$TOPIC_FILE"
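
The script notes that the temp file is not strictly needed. A minimal sketch of assembling the same `{"topic":{"partition":offset,...}}` structure in memory might look like the following. The helper name `build_offsets_json` and its input format (one `partition offset` pair per line on stdin) are assumptions made for illustration and are not part of the original script; the offsets in the example are made up.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not in the original script): reads "partition offset"
# pairs, one per line, from stdin and prints the JSON structure that
# "startingOffsets" expects, without going through a temp file.
build_offsets_json() {
  local topic=$1
  local body
  # Turn each pair into "partition":offset and join the results with commas.
  body=$(while read -r partition offset; do
    printf '"%s":%s\n' "$partition" "$offset"
  done | paste -sd "," -)
  printf '{"%s":{%s}}\n' "$topic" "$body"
}

# Example with made-up offsets for three partitions.
printf '%s\n' "0 1500" "1 1498" "2 1502" | build_offsets_json mytopic
```

In the script above, the pairs would have to come from the per-partition query loop instead of the hard-coded example lines. The result can still be piped through `jq '.'` for pretty-printing.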