Last active
June 15, 2020 16:17
-
-
Save OneCricketeer/a12b4d9b26f0f2df4bee10cdc9c16d5d to your computer and use it in GitHub Desktop.
Backup Confluent Schema Registry Topic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
###################################### | |
# Backup Schema Registry | |
# | |
# To Restore: | |
# 0. Download if remotely stored. | |
# 1. Extract : `tar -xjvf schemas.tar.bz2` | |
# | |
# 2. Inspect Logs & Errors : `cat schemas-err.txt` | |
# 3. Inspect Schemas : `tail -n50 schemas.txt` | |
# 4. Check how many schemas: `wc -l schemas.txt` | |
# | |
# 5. Load : `kafka-console-producer --broker-list $KAFKA --topic _schemas --property parse.key=true < schemas.txt` | |
####################################### | |
set -eu -o pipefail | |
if [ $# -lt 2 ]; then | |
echo "backup-schemas <bootstrap-servers> <timeout-ms>" | |
exit 1 | |
fi | |
# Confluent version compatibile with our brokers | |
CP_VERSION=3.3.2 | |
SCHEMAS_TOPIC=_schemas | |
STDOUT=schemas.txt | |
STDERR=schemas-err.txt | |
TIMEOUT_MS=$2 | |
NOW=$(date +"%Y%m%d_%H%M") | |
echo "==> [${NOW}] Consuming ${SCHEMAS_TOPIC} with 'timeout-ms'=${TIMEOUT_MS}" | |
# This is a really naïve way to do this... Realistically, it should have a consumer group that can | |
# track offsets and progress continuously rather than always re-read from the beginning, which will get longer | |
# over time. | |
# TODO: Find a way to know how if timeout-ms is too small (compute `wc -l` for each day) | |
docker run --rm -ti \ | |
-v $PWD:/workdir \ | |
confluentinc/cp-kafka:${CP_VERSION} \ | |
bash -c "kafka-console-consumer --from-beginning --property print.key=true \ | |
--bootstrap-server $1 --topic ${SCHEMAS_TOPIC} \ | |
--timeout-ms ${TIMEOUT_MS} \ | |
1>/workdir/${STDOUT} \ | |
2>/workdir/${STDERR}" | |
# For debugging | |
cat ${STDERR} # Should say "Consumed x records. But will also output the TimeoutException" | |
echo "==> ${STDOUT} contains $(wc -l ${STDOUT} | awk '{print $1}') messages" | |
echo "==> Compressing schemas" | |
# Errors file is not removed so we can download and inspect it later | |
tar -cjvf schema-registry-${NOW}.tar.bz2 ${STDOUT} ${STDERR} && rm ${STDOUT} # ${STDERR} | |
# TODO: Upload to S3, for example | |
# aws s3 cp schema-registry-${NOW}.tar.bz2 ${S3_BUCKET}/backup-schema-registry/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment