-
-
Save anandnalya/a69fc036fff249a61418a8de6b1a0b46 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import json | |
import random | |
import sys | |
def get_topic( args, data ): | |
topic = data['partitions'][0]['topic'] | |
for partition in data['partitions']: | |
if topic != partition['topic']: | |
raise ValueError( 'Rebalance only works on one topic at a time' ) | |
return topic | |
def get_brokers( args, data ): | |
brokers = set() | |
if args.brokers: | |
for broker in args.brokers.split( ',' ): | |
brokers.add( int( broker ) ) | |
else: | |
for partition in data['partitions']: | |
for broker in partition['replicas']: | |
brokers.add( broker ) | |
return brokers | |
def get_target_replicas( args, data ): | |
if args.replicas: | |
replicas = args.replicas | |
else: | |
replicas = get_current_replicas( args, data ) | |
return replicas | |
def get_current_replicas( args, data ): | |
num_replicas = set() | |
for partition in data['partitions']: | |
num_replicas.add( len( partition['replicas'] ) ) | |
return max( num_replicas ) | |
def get_broker_list( data, replicas, brokers ): | |
broker_list = [] | |
per_broker = len( data['partitions'] ) * replicas / len( brokers ) | |
for broker in brokers: | |
broker_list += [ broker ] * per_broker | |
return broker_list | |
def rebalance( data, replicas, broker_list, verbose ): | |
random.shuffle( broker_list ) | |
new_partitions = [] | |
for partition in data['partitions']: | |
i = 0 | |
new_partition = partition.copy() | |
new_partition['replicas'] = [] | |
while True: | |
# Ran out of brokers that can be assigned | |
if len( broker_list ) == i: | |
return None | |
# Don't use if matches any existing broker | |
if broker_list[i] in partition['replicas']: | |
i += 1 | |
continue | |
# Don't use if matches already selected new broker | |
if broker_list[i] in new_partition['replicas']: | |
i += 1 | |
continue | |
new_partition['replicas'] += [ broker_list[i] ] | |
broker_list = broker_list[0:i] + broker_list[1+i:] | |
i=0 | |
# Found all brokers for this partition | |
if len( new_partition['replicas'] ) == replicas: | |
break | |
if verbose: | |
print "Partition " + str( partition['partition'] ) + "\t" + str( partition['replicas'] ) + " -> " + str( new_partition['replicas'] ) | |
new_partitions += [ new_partition ] | |
return new_partitions | |
# CLI args | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"assignment", type=str, | |
help="current partition replica assignment (JSON)" | |
) | |
parser.add_argument( | |
"--brokers", type=str, | |
help="override brokers to use" | |
) | |
parser.add_argument( | |
"--replicas", type=int, | |
help="override number of replicas after rebalance" | |
) | |
parser.add_argument( | |
"-v", "--verbose", | |
help="increase output verbosity", action="store_true" | |
) | |
args = parser.parse_args() | |
data = json.loads( args.assignment ) | |
# Check we have enough brokers to handle rebalance | |
if get_target_replicas( args, data ) + get_current_replicas( args, data ) > len( get_brokers( args, data ) ): | |
raise ValueError( 'Cannot rebalance, not enough brokers' ) | |
topic = get_topic( args, data ) | |
replicas = get_target_replicas( args, data ) | |
brokers = get_brokers( args, data ) | |
broker_list = get_broker_list( data, replicas, brokers ) | |
# Try to assign 100 times then giveup | |
print "Attempting to rebalance " + topic | |
for i in range( 0, 1800 ): | |
if args.verbose: | |
print "Try " + str( i ) | |
else: | |
sys.stdout.write('.') | |
if 74 == i%75: | |
print "" | |
new_partitions = rebalance( data, replicas, broker_list, args.verbose ) | |
if None != new_partitions: | |
break | |
if None == new_partitions: | |
raise RuntimeError( 'Could not rebalance partitions' ) | |
print "\nDone!\n" | |
print json.dumps( | |
{ "version" : 1, "partitions" : new_partitions }, | |
separators=( ',', ':' ) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment