Created
July 3, 2019 13:22
-
-
Save padilo/6a5101f9e005c975956208cb19ba9f67 to your computer and use it in GitHub Desktop.
Script to get from Kakfa the list of topics consumed by each consumer group
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
if [ $# -ne 2 ]; then | |
>&2 echo "usage: $0 <kafka_binary_folder> <bootstrap.server>" | |
>&2 echo | |
>&2 echo "It outputs the list of topics consumed by consumergroup as csv." | |
>&2 echo "With the following structure:" | |
>&2 echo " <consumer>,<topic>" | |
exit 1 | |
fi | |
kafka_path=$1/bin | |
bootstrap=$2 | |
tmp_dir=$(mktemp -d "${TMPDIR:-/tmp/}$(basename $0).pid.XXXXXXXXXXXX") | |
consumer_group=consumer-reader-$RANDOM | |
pid_file=$tmp_dir/pid | |
function launch_consumer { | |
(${kafka_path}/kafka-console-consumer.sh --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server ${bootstrap} --topic __consumer_offsets --from-beginning --consumer-property group.id=${consumer_group} & echo $! >&3 ) 3>${pid_file} | cut -f1 -d":" | tr -d "[]" | cut -f1,2 -d"," | sort -u & | |
} | |
function stop_consumer { | |
kill $(<$pid_file) | |
>&2 echo "Finishing..." | |
sleep 10 | |
} | |
function get_lag { | |
lag=$( ${kafka_path}/kafka-consumer-groups.sh --bootstrap-server ${bootstrap} --offsets --describe --group ${consumer_group} | tr -s ' ' ',' | cut -f5 -d, | tail -n +3 | awk '{s+=$1} END {print s}') | |
} | |
function print_progress { | |
>&2 echo $(echo "100 - 100 * $lag / $first_lag" | bc)% | |
} | |
launch_consumer | |
>&2 echo "Waiting a bit until consumer starts..." | |
sleep 20 | |
>&2 echo "Reading..." | |
get_lag | |
first_lag=$lag | |
while [ $lag -gt "1000" ]; do | |
sleep 5 | |
get_lag | |
print_progress | |
done | |
>&2 echo "Waiting extra time to consume last offsets..." | |
sleep 10 | |
stop_consumer | |
rm -rf $tmp_dir | |
>&2 echo "Done" |
Thank You. This worked well. Helped a lot. :-)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This script has to read the whole __consumer_offsets topic, so it may take long.
It will output the result to stdout.
Example:
kafka-consumer-by-topic.sh ~/apps/kafka_2.12-2.0.1 localhost:9092 > output.csv