Forked from ducas/increase-replication-factor.js
Last active
December 18, 2019 15:58
-
-
Save proxium/e809e6141b2e061ae2d213a409a02e47 to your computer and use it in GitHub Desktop.
Increase Replication Factor for Kafka Topics
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * This script reads the existing partition assignments in JSON (exported using kafka-reassign-partitions.sh) and generates
 * the appropriate reassignment JSON file for input to kafka-reassign-partitions.sh.
 *
 * For every partition, brokers are picked at random from the supplied broker list and appended to the
 * partition's existing replica list until it holds "replication factor" entries. Existing replicas are
 * kept, and any "log_dirs" entry is removed from each partition.
 *
 * The script exits with a non-zero status, without writing the output file, when:
 *   - fewer than four positional arguments are given,
 *   - a broker id or the replication factor is not a valid integer,
 *   - the output file exists and "-f" was not passed,
 *   - a partition already has more replicas than the requested replication factor, or
 *   - a partition cannot be filled because no unused broker is left in the broker list.
 */
const usage = `
Usage: node increase-replication-factor.js (brokers) (replication factor) (current assignment file) (output file) [-f]
* brokers: comma separated list of brokers
* replication factor: the new replication factor to use
* current assignment file: the JSON file to read for current replica partition assignments (exported from kafka-reassign-partitions.sh)
* output file: the file to output JSON to (to be used by kafka-reassign-partitions.sh)
* -f: overwrite existing output file
`;
const fs = require('fs');
const args = process.argv;
if (args.length < 6) {
  console.error(usage);
  process.exit(1);
}

// Parses a non-negative decimal integer and returns NaN for anything else, so inputs such as
// "1abc" or an empty entry from a trailing comma are rejected instead of being silently coerced.
const parseNonNegativeInt = (text) => {
  const trimmed = text.trim();
  return /^\d+$/.test(trimmed) ? Number.parseInt(trimmed, 10) : NaN;
};

// Prints the problem followed by the usage text and stops the process with a failure status.
const exitWithUsage = (message) => {
  console.error(message);
  console.error(usage);
  process.exit(1);
};

// De-duplicate the broker list so a broker repeated on the command line is not favoured by the random pick.
const brokers = [...new Set(args[2].split(',').map(parseNonNegativeInt))];
if (brokers.some(Number.isNaN)) {
  exitWithUsage(`Invalid broker list "${args[2]}": expected comma separated integer broker ids.`);
}

const replicationFactor = parseNonNegativeInt(args[3]);
if (Number.isNaN(replicationFactor) || replicationFactor < 1) {
  exitWithUsage(`Invalid replication factor "${args[3]}": expected a positive integer.`);
}

const reassignments = JSON.parse(fs.readFileSync(args[4]));
if (!Array.isArray(reassignments.partitions)) {
  console.error('Current assignment file does not contain a "partitions" array.');
  process.exit(1);
}

if (fs.existsSync(args[5]) && !args.includes('-f')) {
  console.error('Output file exists. Use "-f" to overwrite.');
  process.exit(1);
}

for (const partition of reassignments.partitions) {
  // log_dirs is dropped from every partition entry before the output is written.
  delete partition.log_dirs;

  const replicas = partition.replicas;
  const label = `${partition.topic}-${partition.partition}`;

  // This script only ever adds replicas; a partition that already exceeds the target cannot be handled.
  if (replicas.length > replicationFactor) {
    throw new Error(
      `length != replicationFactor - ${replicas.length} != ${replicationFactor} (partition ${label})`
    );
  }

  while (replicas.length < replicationFactor) {
    const available = brokers.filter((broker) => !replicas.includes(broker));
    // Without this guard an empty candidate list would push `undefined`, which is serialized as `null`.
    if (available.length === 0) {
      throw new Error(
        `Not enough brokers to reach replication factor ${replicationFactor} for partition ${label}: ` +
          `replicas [${replicas.join(', ')}], brokers [${brokers.join(', ')}]`
      );
    }
    const nextBroker = available[Math.floor(Math.random() * available.length)];
    replicas.push(nextBroker);
  }
}

fs.writeFileSync(args[5], JSON.stringify(reassignments, null, 2));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Edit: removed the log_dirs element from the JSON because it is an optional parameter.