Created
April 11, 2018 18:41
-
-
Save wdberkeley/19e9b1a3a9729f6d099ce5e83a65fe3b to your computer and use it in GitHub Desktop.
Rudimentary balancing sim
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import random | |
# Number of pool slots per TS. | |
POOL_SLOTS_PER_TS=10 | |
# Maximum number of time steps it takes a move to complete. | |
MAX_MOVE_STEPS = 5 | |
# Event types. | |
# Move succeeded. The payload will be (table name, (src TS index, dest TS index)). | |
MOVE_SUCCEED = "ms" | |
# Generate a cluster of n tablet servers and t tables. | |
# Each tablet server will have will have < r replicas | |
# for each table, chosen uniformly at random from [0, r). | |
# A cluster is represented as a list of tables. | |
# a table is represented as a list of replica counts, one per ts. | |
# For example, a possible output of gen_uniform_cluster(5, 3, 10) | |
# is | |
# [[4, 8, 0, 6, 4], | |
# [5, 9, 1, 3, 3], | |
# [3, 7, 2, 0, 8]]/ | |
def gen_uniform_cluster(n, t, r): | |
return [[random.randrange(r) for _ in range(n)] for i in range(t)] | |
def total_replicas_in_cluster(cluster): | |
return reduce(lambda x, y: x + y, map(sum, cluster)) | |
# Return the table skew of 'table'. | |
# Table skew is (# replicas on TS with most replica) - (# replicas on TS with least replica). | |
# This modifies table so it is sorted by # of replicas, increasing. | |
def table_skew(table): | |
table.sort() | |
return table[-1] - table[0] | |
# Return the next move that should be done to balance table, encoded as (i, j) | |
# where i is the index of the TS to move from and j is the index of the TS to | |
# move to. | |
# This function assumes the table isn't balanced, else it may return a useless or pernicious move. | |
# No randomization is added. | |
def pick_move(table): | |
return (len(table) - 1, 0) | |
# Apply the move 'move' to the table 'table'. | |
def apply_move(table, move): | |
src, dest = move | |
table[src] -= 1 | |
table[dest] += 1 | |
# Apply the event to the cluster state. | |
# Nothing to do here right now since moves always succeed, so we apply them to the cluster state | |
# when they are proposed. This makes sense since we would consider them applied while they are in flight. | |
def apply_event(cluster, event): | |
pass | |
def parse_args(): | |
parser = argparse.ArgumentParser(description="A simulation of Kudu rebalancing") | |
parser.add_argument("--ts", type=int, default=10, help="the number of tablet servers") | |
parser.add_argument("--tables", type=int, default=5, help="the number of tables") | |
parser.add_argument("--replicas_per_ts_per_table", type=int, default=100, | |
help="maximum number of replicas per table and tablet server") | |
args = parser.parse_args() | |
return args.ts, args.tables, args.replicas_per_ts_per_table | |
def main(): | |
# Set initial state of cluster | |
n, t, r = parse_args() | |
cluster = gen_uniform_cluster(n, t, r) | |
print "Initial cluster state =", cluster | |
# Set up pool of fixed capacity | |
max_pool_size = POOL_SLOTS_PER_TS * n | |
pool = [] | |
# Set up event queue. | |
# An event will be modeled as a pair (time, type, data needed to apply event to the cluster). | |
events = [] | |
# The total number of replicas shouldn't change. Compute it pre-rebalancing | |
# so we can check that invariant. | |
# TODO: Stop sorting the cluster list so we can track table identity and maintain the invariant per table. | |
total_replicas = total_replicas_in_cluster(cluster) | |
# Advance time in discrete steps. | |
t = -1 | |
while True: | |
# Invariants for each time step. | |
# Total number of replicas is constant. | |
# TODO: Total number of replicas per table is constant. | |
assert(total_replicas == total_replicas_in_cluster(cluster)) | |
# Every event is an ongoing move: true for now. | |
assert(len(pool) == len(events)) | |
# Advance time at the start so we can use continue statements. | |
t += 1 | |
#print ("Cluster state at t = %d:" % t), cluster | |
# Apply events. | |
# This is inefficient. Would be better to keep events sorted by time. | |
# Also I'm cheating and I know I can pop a move off of the front of pool every time an event is triggered. | |
for i, e in enumerate(events): | |
if e[0] == t: | |
apply_event(cluster, e) | |
events.pop(i) | |
pool.pop(0) | |
# Don't act if the pool is full. | |
if len(pool) >= max_pool_size: | |
assert(len(pool) == max_pool_size) | |
#print "pool full: no action for step t =", t | |
continue | |
# Each time step we exhaust our pool capacity. | |
while len(pool) < max_pool_size: | |
# Sort tables by skew, from highest to lowest skew. | |
tables_by_skew = sorted([table for table in cluster], key=table_skew, reverse=True) | |
# If the most skewed table is balanced, we're done. | |
# Well, just because moves always succeed in this toy first version. | |
# Really we should wait until the pool clears. | |
if table_skew(tables_by_skew[0]) <= 1: | |
print 'BALANCED at t =', t | |
print "Cluster =", cluster | |
exit(0) | |
# Iterate by skew. | |
for table in tables_by_skew: | |
if len(pool) >= max_pool_size: | |
assert(len(pool) == max_pool_size) | |
#print "pool filled: not finding more moves for t =", t | |
break | |
# Pick the move. | |
move = pick_move(table) | |
# Cheat a bit and just apply the move. | |
# This avoids queueing it to succeed at some point and then having to apply it each | |
# time step, which involves making copies of the cluster state to which the move can be applied. | |
# Errors could be modeled as "undos" of moves in this paradigm. | |
pool.append(move) | |
apply_move(table, move) | |
# Add a no-op to the event queue for when the move succeeds. | |
events.append((t + random.randrange(1, MAX_MOVE_STEPS), MOVE_SUCCEED, (table, move))) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment