Last active
November 8, 2021 14:42
-
-
Save ducas/0fbdbbaa9936e490e3b19eb8d329c200 to your computer and use it in GitHub Desktop.
Increase Replication Factor for Kafka Topics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* This script reads the existing partition assignments in JSON (exported using kafka-reassign-partitions.sh) and generates | |
* the appropriate reassignment JSON file for input to kafka-reassign-partitions.sh. | |
* | |
*/ | |
var usage = ` | |
Usage: node increase-replication-factory.js (brokers) (replication factor) (current assignment file) (output file) [-f] | |
* brokers: comma separated list of brokers | |
* replication factor: the new replication factor to use | |
* current assignment file: the JSON file to read for current replica partition assignments (exported from kafka-reassign-partitions.sh) | |
* output file: the file to output JSON to (to be used by kafka-reassign-partitions.sh) | |
* -f: overwrite existing output file | |
` | |
var fs = require('fs'); | |
var args = process.argv; | |
if (args.length < 6) { | |
console.error(usage); | |
process.exit(1); | |
} | |
var brokers = args[2].split(',').map(n => parseInt(n)); | |
var replicationFactor = parseInt(args[3]); | |
var reassignments = JSON.parse(fs.readFileSync(args[4])); | |
if (fs.existsSync(args[5]) && args.indexOf('-f') == -1) { | |
console.error('Output file exists. Use "-f" to overwrite.'); | |
process.exit(1); | |
} | |
for (var i = 0; i < reassignments.partitions.length; i++) { | |
var partition = reassignments.partitions[i]; | |
var replicas = partition.replicas; | |
while (replicas.length < replicationFactor) { | |
var available = brokers.filter(b => replicas.indexOf(b) == -1); | |
var nextBroker = available[Math.floor((Math.random() * available.length))]; | |
replicas.push(nextBroker); | |
} | |
if (partition.replicas.length != replicationFactor) { | |
throw 'length != replicationFactor - ' + partition.replicas.length + ' != ' + replicationFactor; | |
} | |
} | |
fs.writeFileSync(args[5], JSON.stringify(reassignments, null, 2)); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment