Skip to content

Instantly share code, notes, and snippets.

@angeloskaltsikis
Forked from miguno/kafka-move-leadership.sh
Created September 27, 2021 13:32
Show Gist options
  • Save angeloskaltsikis/6daba0d4d13ee30182055396d4fae23e to your computer and use it in GitHub Desktop.
Save angeloskaltsikis/6daba0d4d13ee30182055396d4fae23e to your computer and use it in GitHub Desktop.
A simple ops helper script for Apache Kafka that generates a partition reassignment JSON snippet for moving partition leadership away from a given Kafka broker: in every partition currently led by that broker, the broker is replaced in the replica list by a randomly selected other broker. Use cases include: 1) safely restarting a broker while minimizing the risk of data loss, 2) replacing a broker, and 3) preparing a broker for maintenance.
#!/usr/bin/env bash
#
# File: kafka-move-leadership.sh
#
# Description
# ===========
#
# Generates a Kafka partition reassignment JSON snippet to STDOUT to move the leadership
# of any replicas away from the provided "source" broker to different, randomly selected
# "target" brokers. Run this script with `-h` to show detailed usage instructions.
#
#
# Requirements
# ============
# - Kafka 0.8.1.1 and later.
#
#
# Usage
# =====
#
# To show usage instructions run this script with `-h` or `--help`.
#
#
# Full workflow
# =============
#
# High-level overview
# -------------------
#
# 1. Use this script to generate a partition reassignment JSON file.
# 2. Start the actual reassignment operation via Kafka's `kafka-reassign-partitions.sh` script and this JSON file.
# 3. Monitor the progress of the reassignment operation with Kafka's `kafka-reassign-partitions.sh` script.
#
# Example
# -------
#
# NOTE: If you have installed the Confluent package of Kafka, then the CLI tool
# `kafka-reassign-partitions.sh` is called `kafka-reassign-partitions`.
#
# Step 1 (generate reassignment JSON; this script):
#
# $ kafka-move-leadership.sh --broker-id 4 --first-broker-id 0 --last-broker-id 8 --zookeeper zookeeper1:2181 > partitions-to-move.json
#
# Step 2 (start reassignment process; Kafka built-in script):
#
# $ kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file partitions-to-move.json --execute
#
# Step 3 (monitor progress of reassignment process; Kafka built-in script):
#
# $ kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file partitions-to-move.json --verify
# Name of this script as invoked, used in the usage text. "$0" is quoted so a
# path containing spaces reaches basename as ONE argument (unquoted, basename
# would treat the second word as a suffix to strip).
MYSELF="$(basename -- "$0")"
declare -r MYSELF
# Prints detailed usage instructions to STDOUT. Error paths can redirect the
# output with `print_usage >&2`.
print_usage() {
  cat <<EOF
$MYSELF - generates a Kafka partition reassignment JSON snippet to move partition leadership away from a broker (details below)

Usage: $MYSELF [OPTION]...

 --broker-id Move leadership of all replicas, if any, from this broker
 to different, randomly selected brokers. Example: 4
 --first-broker-id First (= lowest) Kafka broker ID in the cluster. Used as
 the start index for the range of broker IDs from which
 replacement brokers will be randomly selected. Example: 0
 --last-broker-id Last (= highest) Kafka broker ID in the cluster. Used as
 the end index for the range of broker IDs from which
 replacement brokers will be randomly selected. Example: 8
 --zookeeper Comma-separated list of ZK servers with which the brokers
 are registered. Example: zookeeper1:2181,zookeeper2:2181
 -h, --help Print this help message and exit.

Example
-------

The following example prints a partition reassignment JSON snippet to STDOUT that moves leadership
from the broker with ID 4 to brokers randomly selected from the ID range 0,1,2,3,4,5,6,7,8
(though 4 itself will be excluded from the range automatically):

 $ $MYSELF --broker-id 4 --first-broker-id 0 --last-broker-id 8 --zookeeper zookeeper1:2181

Use cases include:
------------------
 1. Safely restarting a broker while minimizing risk of data loss.
 2. Replacing a broker.
 3. Preparing a broker for maintenance.

Detailed description
--------------------
Generates a Kafka partition reassignment JSON snippet to STDOUT
to move the leadership of any replicas from the provided broker ID to
different, randomly selected broker IDs.

This JSON snippet can be saved to a file and then be used as an argument for:

 $ kafka-reassign-partitions.sh --reassignment-json-file my.json

Further information
-------------------
- http://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
- https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
EOF
}
# With no arguments at all, show usage and fail. This is an error path, so the
# usage text goes to STDERR and never lands in a redirected JSON file.
if [[ $# -eq 0 ]]; then
  print_usage >&2
  exit 97
fi
# Parse options. Each value-taking option consumes its own name plus the
# following argument; every value is stored in a read-only global.
while [[ $# -gt 0 ]]; do
  case "$1" in
    --broker-id)
      shift
      declare -r BROKER="$1"
      shift
      ;;
    --zookeeper)
      shift
      declare -r ZOOKEEPER_CONNECT="$1"
      shift
      ;;
    --first-broker-id)
      shift
      declare -r KAFKA_FIRST_BROKER_ID="$1"
      shift
      ;;
    --last-broker-id)
      shift
      declare -r KAFKA_LAST_BROKER_ID="$1"
      shift
      ;;
    -h|--help)
      # Explicitly requested help is regular output: STDOUT.
      print_usage
      exit 98
      ;;
    *)
      # Diagnostics go to STDERR so they are visible even when STDOUT is
      # redirected into the reassignment JSON file.
      echo "ERROR: Unexpected option ${1}" >&2
      echo >&2
      print_usage >&2
      exit 99
      ;;
  esac
done
# Input validation. Errors go to STDERR so they never end up inside the JSON
# file when STDOUT is redirected (e.g. `... > partitions-to-move.json`).
if [ -z "$BROKER" ]; then
  echo "ERROR: You must set the parameter --broker-id" >&2
  exit 80
fi
if [ -z "$ZOOKEEPER_CONNECT" ]; then
  echo "ERROR: You must set the parameter --zookeeper" >&2
  exit 81
fi
if [ -z "$KAFKA_FIRST_BROKER_ID" ]; then
  echo "ERROR: You must set the parameter --first-broker-id" >&2
  exit 82
fi
if [ -z "$KAFKA_LAST_BROKER_ID" ]; then
  echo "ERROR: You must set the parameter --last-broker-id" >&2
  exit 83
fi
# The broker IDs are used as numeric ranges by the broker-selection helpers
# (shuf -i / seq), so they must be plain non-negative integers.
for broker_id in "$BROKER" "$KAFKA_FIRST_BROKER_ID" "$KAFKA_LAST_BROKER_ID"; do
  if ! [[ "$broker_id" =~ ^[0-9]+$ ]]; then
    echo "ERROR: Broker IDs must be non-negative integers, got '$broker_id'" >&2
    exit 84
  fi
done
# 10# forces base-10 so IDs with leading zeros (e.g. "08") are not parsed as octal.
if (( 10#$KAFKA_FIRST_BROKER_ID > 10#$KAFKA_LAST_BROKER_ID )); then
  echo "ERROR: --first-broker-id must not be greater than --last-broker-id" >&2
  exit 85
fi
###############################################################################
### DEPENDENCIES
###############################################################################
declare -r KAFKA_TOPICS_SCRIPT_NAME_APACHE="kafka-topics.sh"
declare -r KAFKA_TOPICS_SCRIPT_NAME_CONFLUENT="kafka-topics"
declare -r FALLBACK_PATH="/opt/kafka/bin"
declare -r FALLBACK_BIN="$FALLBACK_PATH/$KAFKA_TOPICS_SCRIPT_NAME_APACHE"
# Resolve the kafka-topics CLI, in order of preference:
#   1. the Confluent name on PATH,
#   2. the Apache name on PATH,
#   3. the Apache script under FALLBACK_PATH.
# `command -v` is the shell builtin for PATH lookups; `which` is an external
# tool that is not guaranteed to be installed. The fallback is an absolute
# path, so it only needs an "is an executable file" test.
if command -v "$KAFKA_TOPICS_SCRIPT_NAME_CONFLUENT" &>/dev/null; then
  declare -r KAFKA_TOPICS_BIN="$KAFKA_TOPICS_SCRIPT_NAME_CONFLUENT"
elif command -v "$KAFKA_TOPICS_SCRIPT_NAME_APACHE" &>/dev/null; then
  declare -r KAFKA_TOPICS_BIN="$KAFKA_TOPICS_SCRIPT_NAME_APACHE"
elif [[ -f "$FALLBACK_BIN" && -x "$FALLBACK_BIN" ]]; then
  declare -r KAFKA_TOPICS_BIN="$FALLBACK_BIN"
else
  echo "ERROR: kafka-topics CLI tool (ships with Kafka) not found in PATH or in $FALLBACK_PATH." >&2
  exit 70
fi
###############################################################################
### MISC CONFIGURATION - DO NOT TOUCH UNLESS YOU KNOW WHAT YOU ARE DOING
###############################################################################
# Saved at startup; the CLEANUP section at the end of the script restores IFS
# from this value.
declare -r OLD_IFS="$IFS"
###############################################################################
### UTILITY FUNCTIONS
###############################################################################
# Checks whether an array (first param, passed by NAME) contains an element
# (second param). Elements are compared as literal strings: the sought value
# is quoted in the comparison, so characters such as '*' or '?' are not
# interpreted as a glob pattern.
# Returns 0 if the array contains the element, and 1 if it does not.
#
# Usage: array_contains myArray myElement
function array_contains {
  # Indirect expansion of "name[@]" yields the caller's array elements.
  local array="$1[@]"
  local seeking=$2
  local element
  for element in "${!array}"; do
    if [[ "$element" == "$seeking" ]]; then
      return 0
    fi
  done
  return 1
}
# Randomly selects a broker ID in the range specified by
# KAFKA_FIRST_BROKER_ID (including) and KAFKA_LAST_BROKER_ID (including).
# The "first-last" range argument is quoted so it reaches shuf as a single
# word, independent of IFS and free of globbing.
#
# Usage: random_broker => may return e.g. "6"
function random_broker {
  shuf -n 1 -i "${KAFKA_FIRST_BROKER_ID}-${KAFKA_LAST_BROKER_ID}"
}
# Randomly selects, from the list of available brokers (range specified by
# KAFKA_FIRST_BROKER_ID and KAFKA_LAST_BROKER_ID, both inclusive), a broker ID
# that is not already listed in the provided brokers (first param).
# Prints an empty string if every broker in the range is already listed.
#
# The candidates (range minus listed brokers) are computed explicitly and one
# is picked with shuf. Comparing string lengths as a proxy for "all brokers
# are used" misfires when the list contains IDs outside the range, and
# retry-until-unused random sampling has no bound on its iterations.
#
# Usage: other_broker "1,4,6" => may return e.g. "2"
#
# Note: Do NOT put spaces in the string. "1,2" is ok, "1, 2" is not.
function other_broker {
  local brokers_string=$1
  local -a listed=()
  local -a candidates=()
  local id listed_id is_listed
  IFS=',' read -r -a listed <<< "$brokers_string"
  while read -r id; do
    is_listed=0
    for listed_id in "${listed[@]}"; do
      if [[ "$listed_id" == "$id" ]]; then
        is_listed=1
        break
      fi
    done
    if (( ! is_listed )); then
      candidates+=("$id")
    fi
  done < <(seq "${KAFKA_FIRST_BROKER_ID}" "${KAFKA_LAST_BROKER_ID}")
  if (( ${#candidates[@]} == 0 )); then
    echo ""
  else
    shuf -n 1 -e "${candidates[@]}"
  fi
}
# Returns a list of broker IDs by removing the provided broker ID (second param)
# from the provided list of original broker IDs (first param). If the original
# broker list does not contain the provided broker, the list is returned as is.
#
# The list of broker IDs must be a comma-separated list of numbers, e.g. "1,2".
# All working variables are local, so the caller's variables (e.g. an array
# named `brokers`) are left untouched.
#
# Usage: all_but_broker "1,2,3" "3" => returns "1,2"
#
# Note: Do NOT put spaces in the string. "1,2" is ok, "1, 2" is not.
function all_but_broker {
  local brokers_string=$1
  local broker=$2
  local -a brokers=()
  local -a kept=()
  local curr_broker
  IFS=',' read -r -a brokers <<< "$brokers_string"
  for curr_broker in "${brokers[@]}"; do
    if [[ "$curr_broker" != "$broker" ]]; then
      kept+=("$curr_broker")
    fi
  done
  # "${kept[*]}" joins the elements with the first character of IFS; IFS is
  # local to this function, so the change does not leak to the caller.
  local IFS=','
  echo "${kept[*]}"
}
# Returns a list of broker IDs based on a provided list of broker IDs (first
# param), where the provided broker ID (second param) is replaced by a
# randomly selected broker ID that is not already in the original list.
# The replacement is appended at the end of the list.
#
# If no replacement broker is available, prints an empty string and returns 1.
# Returning the remaining brokers alone would silently shrink the replica set
# (e.g. "0,4" minus "4" would become just "0").
#
# Usage: replace_broker "1,2,3" "2" => may return e.g. "1,3,4"
#
# Note: Do NOT put spaces in the string. "1,2" is ok, "1, 2" is not.
function replace_broker {
  local brokers_string=$1
  local broker=$2
  local remaining_brokers replacement_broker
  remaining_brokers=$(all_but_broker "$brokers_string" "$broker")
  replacement_broker=$(other_broker "$brokers_string")
  if [[ -z "$replacement_broker" ]]; then
    echo ""
    return 1
  fi
  if [[ -n "$remaining_brokers" ]]; then
    echo "${remaining_brokers},${replacement_broker}"
  else
    echo "$replacement_broker"
  fi
}
###############################################################################
### MAIN
###############################################################################
# Build a Kafka partition reassignment JSON document covering every partition
# whose current leader is $BROKER, replacing $BROKER in that partition's
# replica list with another randomly selected broker (see replace_broker).

# Fetch the topic descriptions up front so a failing kafka-topics call (e.g.
# an unreachable ZooKeeper) aborts the script instead of silently emitting an
# empty reassignment.
if ! topics_description=$("$KAFKA_TOPICS_BIN" --zookeeper "$ZOOKEEPER_CONNECT" --describe); then
  echo "ERROR: Could not describe topics with $KAFKA_TOPICS_BIN (--zookeeper $ZOOKEEPER_CONNECT)" >&2
  exit 61
fi

# Actual partition reassignments, one JSON object per partition.
partition_entries=()
# awk scans each line for the "Topic:", "Partition:", "Leader:" and "Replicas:"
# labels and takes the field that follows each label. It keeps lines whose
# leader ID equals $BROKER *exactly* and prints "<topic>#<partition>#<replicas>".
# An exact numeric comparison is required: a substring match on
# "Leader: $BROKER" would also select partitions led by brokers 10..19 when
# BROKER is 1. Lines without a "Partition:" label (topic summary lines) and
# non-numeric leaders are skipped.
# NOTE(review): assumes the "Label: value" layout of `kafka-topics --describe`
# partition lines; confirm against the Kafka version in use.
# Note: We use '#' as field separator because it is not a valid character for
# a Kafka topic name.
while IFS='#' read -r topic partition replicas; do
  # e.g. topic="zerg.hydra", partition="4", replicas="0,8"
  new_replicas=$(replace_broker "$replicas" "$BROKER")
  if [ -z "$new_replicas" ]; then
    echo "ERROR: Cannot find any replacement broker. Maybe you have only a single broker in your cluster?" >&2
    exit 60
  fi
  partition_entries+=(" {\"topic\": \"${topic}\", \"partition\": ${partition}, \"replicas\": [${new_replicas}] }")
done < <(awk -v broker="$BROKER" '
  {
    topic = partition = leader = replicas = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Topic:") topic = $(i + 1)
      else if ($i == "Partition:") partition = $(i + 1)
      else if ($i == "Leader:") leader = $(i + 1)
      else if ($i == "Replicas:") replicas = $(i + 1)
    }
    if (partition != "" && leader ~ /^[0-9]+$/ && leader + 0 == broker + 0)
      print topic "#" partition "#" replicas
  }' <<< "$topics_description")

# Join the entries with ",\n" (no trailing comma): the command substitution
# drops the final newline, after which the last comma is stripped.
partitions_json=""
if (( ${#partition_entries[@]} > 0 )); then
  partitions_json=$(printf '%s,\n' "${partition_entries[@]}")
  partitions_json=${partitions_json%,}
fi

# Print JSON ("header", partition entries, "footer") to STDOUT. The document
# is passed to printf as a quoted argument, so replica lists such as "[0,8]"
# are never glob-expanded against file names in the current directory.
printf '{\n "partitions": [\n%s\n ],\n "version": 1\n}\n' "$partitions_json"
###############################################################################
### CLEANUP
###############################################################################
IFS="$OLD_IFS"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment