@onderkalaci
Last active January 2, 2018 19:19
# entry point for the shard rebalancer
def rebalance_table_shards(relation regclass,
                           threshold float4 default 0.1,
                           max_shard_moves int default 1000000,
                           excluded_shard_list bigint[] default '{}'):
    # acquire a lock on the relation to prevent concurrent rebalance operations
    # acquire a lock on each colocated relation id to prevent schema changes
    for colocatedTable in colocatedTableList:
        AcquireShareTableLock(colocatedTable.relationId)

    # get the required shard placement list
    shard_placement_list = ShardPlacementList(relation, excluded_shard_list)

    # get the active worker node list, filtered by worker_node_responsive
    responsive_worker_node_list = master_get_active_worker_nodes()

    # compute the moves needed to rebalance, capped at max_shard_moves
    rebalance_placement_update_list = shard_placement_rebalance_array(responsive_worker_node_list,
                                                                      shard_placement_list,
                                                                      threshold)[:max_shard_moves]

    for placement_update_data in rebalance_placement_update_list:
        PERFORM update_shard_placement(placement_update_data, relation, responsive_worker_node_list)
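##
## For reference, a minimal sketch of the placement update record that
## update_shard_placement below unpacks. The dataclass and the PlacementUpdate
## name are illustrative only; the field names follow the pseudocode.
##
from dataclasses import dataclass

@dataclass
class PlacementUpdate:
    updateType: str        # e.g. "move" or "copy"; only read as updateType below
    shardId: int
    sourceHost: str
    sourcePort: int
    destHost: str
    destPort: int
    do_drop: bool          # when True, the source placement is dropped after the copy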
##
## Connects back to the local node and executes master_copy_shard_placement over a separate connection.
## TODO: Do we really need the relation parameter?
##
def update_shard_placement(placement_update_data, relation, responsive_worker_node_list):
    updateType, shardId = placement_update_data.updateType, placement_update_data.shardId
    sourceHost, sourcePort = placement_update_data.sourceHost, placement_update_data.sourcePort
    destHost, destPort = placement_update_data.destHost, placement_update_data.destPort
    do_drop = placement_update_data.do_drop

    # make sure both ends of the move are still responsive
    if {source or destination} not in responsive_worker_node_list:
        ERROR: "source or destination is not responsive"

    queryToExecute = "SELECT master_copy_shard_placement(%d, %s, %d, %s, %d, %s)" % \
                     (shardId, sourceHost, sourcePort, destHost, destPort, do_drop)
    ExecuteRemoteCommand(LOCAL_HOST_NAME, PostPortNumber, queryToExecute)
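##
## To make the remote call concrete: a minimal sketch of how the coordinator could
## connect back to itself and issue master_copy_shard_placement, assuming the reduced
## six-argument signature proposed below. psycopg2 is used purely for illustration;
## the actual implementation would go through Citus' internal connection machinery.
##
import psycopg2

def run_copy_shard_placement(update, local_host="localhost", local_port=5432, dbname="postgres"):
    # open a separate connection back to the local node, as in update_shard_placement above
    conn = psycopg2.connect(host=local_host, port=local_port, dbname=dbname)
    try:
        with conn.cursor() as cur:
            # parameters are passed separately so the driver handles quoting
            cur.execute(
                "SELECT master_copy_shard_placement(%s, %s, %s, %s, %s, %s)",
                (update.shardId, update.sourceHost, update.sourcePort,
                 update.destHost, update.destPort, update.do_drop))
        conn.commit()
    finally:
        conn.close()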
##
## We already have master_copy_shard_placement(), which is used to repair shards. Here we
## remove the prerequisites that the function has and make it aware of co-located placements.
##
## TODO: this implementation skips the tableId parameter since it is only used to derive
## shardTableName, which we can also generate without the table name.
##
def master_copy_shard_placement(shardId, sourceHost, sourcePort, destHost, destPort, do_drop):
    # get all colocated placements
    colocatedPlacementList = ColocatedPlacementList(shardId, sourceHost, sourcePort)

    # fetch each table without creating it, then apply the constraints
    for colocatedPlacement in colocatedPlacementList:
        # acquire a lock on the relation id to prevent schema changes
        AcquireShareTableLock(colocatedPlacement.relationId)
        AcquireShardMetadataLock(colocatedPlacement.shardId)

        shardTableName = ShardTableName(colocatedPlacement.shardId);
        appendStringInfo(dropTableQuery, DROP_TABLE_COMMAND, shardTableName)

        # fetch the table data without CREATING THE TABLE
        appendStringInfo(tableFetchQuery, TABLE_FETCH_COMMAND, shardTableName, shardLength,
                         sourceHost, sourcePort);

        # drop the table on the destination if it already exists, then fetch a fresh copy
        ExecuteCommandsOnTheHost({dropTableQuery, tableFetchQuery}, destHost, destPort)

        # now update the metadata
        InsertIntoShardPlacement(colocatedPlacement.shardId, destHost, destPort)

        # if drop is specified, dropping the source placement is done via 2PC
        if do_drop:
            DeleteShardPlacement(colocatedPlacement.shardId, sourceHost, sourcePort)
            appendStringInfo(dropTableQuery, DROP_TABLE_COMMAND, shardTableName)

            # we have to run this via 2PC so that the placement is not dropped
            # until the metadata changes are committed
            ExecuteCommandVia2PC(dropTableQuery, sourceHost, sourcePort)
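##
## The 2PC step above is what keeps the source placement alive until the coordinator's
## metadata change commits. A rough illustration of that ordering using standard
## PostgreSQL two-phase commit commands; the function, connection object and
## transaction name are made up for the sketch.
##
def drop_source_placement_via_2pc(source_conn, shard_table_name,
                                  transaction_name="citus_rebalance_drop"):
    # autocommit so we can drive BEGIN / PREPARE TRANSACTION / COMMIT PREPARED ourselves
    source_conn.autocommit = True
    cur = source_conn.cursor()

    # Phase 1: prepare the DROP on the source worker but do not commit it yet;
    # a prepared transaction survives crashes until it is committed or rolled back
    cur.execute("BEGIN")
    cur.execute("DROP TABLE IF EXISTS %s" % shard_table_name)   # identifier quoting omitted
    cur.execute("PREPARE TRANSACTION '%s'" % transaction_name)

    # ... the coordinator commits its own metadata change (DeleteShardPlacement) here ...

    # Phase 2: only once the metadata commit has succeeded, finalize the drop on the worker
    cur.execute("COMMIT PREPARED '%s'" % transaction_name)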
###
### Pseudo-code for shard_placement_rebalance_array.
### Note that some parts (e.g., mapping function parameters to variables, mapping the return
### list to JSON, etc.) are omitted, but it still gives a good overview of what is going on
### while rebalancing.
###
def shard_placement_rebalance_array():
    # get the shard placement list for each of the nodes
    # Format:
    # [nodeIndex: 0] => {S1, S2, ..., SN}
    # [nodeIndex: 1] => {S4, S5, ..., SX}
    # [nodeIndex: 2] => {S2, S4, ..., SY}
    nodeShardPlacementList[] = GenerateNodeShardList(tableName)

    # first get the total placement count
    totalPlacementCount = TotalPlacementCount(nodeShardPlacementList)

    # get the average placement count
    placementCountAverage = totalPlacementCount / workerNodeCount

    # get the lower bound for the placement count on any node
    placementCountLowerbound = (int) ((1.0 - threshold) * placementCountAverage);

    # now iterate over the worker nodes until each worker has at least
    # placementCountLowerbound placements
    while (underUtilizedNodeExists)
        # find the workers to operate on in this iteration
        workerThatHasMaxCountOfPlacement = findWorkerWithMaxPlacementCount(nodeShardPlacementList)
        workerThatHasMinCountOfPlacement = findWorkerWithMinPlacementCount(nodeShardPlacementList)

        # we already hit the expected distribution, do not proceed
        if placementCount(workerThatHasMinCountOfPlacement) >= placementCountLowerbound
            break

        # now find the placement to move
        # (i.e., one the target does not already have)
        for placement in workerThatHasMaxCountOfPlacement
            # skip this placement if it already exists on the min worker
            if placement in workerThatHasMinCountOfPlacement:
                continue

            # now that we found the placement to move
            placementToMove = placement
            break

        # create an event to update placements, moving from the most loaded
        # worker to the least loaded one
        placementUpdateEvent = PlacementUpdateEvent(placementToMove,
                                                    sourceNode: workerThatHasMaxCountOfPlacement,
                                                    targetNode: workerThatHasMinCountOfPlacement)

        # add this placement update event to the return list
        placementUpdateList.append(placementUpdateEvent)

        # update the shard placement list according to the move
        UpdateShardLists(nodeShardPlacementList)

    return placementUpdateList
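##
## The same greedy loop as a small, self-contained Python sketch, which also shows how the
## threshold turns into the placement count lower bound. The function name and the
## dict-of-sets data model are illustrative, not the actual implementation.
##
def rebalance_moves(node_placements, threshold=0.1, max_moves=1000000):
    """node_placements: dict mapping node name -> set of shard ids placed on it."""
    total = sum(len(shards) for shards in node_placements.values())
    average = total / len(node_placements)
    # e.g. 12 placements over 3 nodes with threshold 0.1 gives int(0.9 * 4) = 3
    lower_bound = int((1.0 - threshold) * average)

    moves = []
    while len(moves) < max_moves:
        # pick the most and least loaded workers for this iteration
        max_node = max(node_placements, key=lambda n: len(node_placements[n]))
        min_node = min(node_placements, key=lambda n: len(node_placements[n]))

        # stop once the least loaded worker already meets the lower bound
        if len(node_placements[min_node]) >= lower_bound:
            break

        # find a placement on the most loaded worker that the least loaded one lacks
        candidates = node_placements[max_node] - node_placements[min_node]
        if not candidates:
            break  # nothing movable; avoid looping forever
        shard = min(candidates)  # deterministic pick for the sketch

        # record the move and update the in-memory placement lists accordingly
        moves.append({"shardId": shard, "sourceNode": max_node, "targetNode": min_node})
        node_placements[max_node].remove(shard)
        node_placements[min_node].add(shard)

    return moves


# example: three workers where worker_3 was just added and is still empty
print(rebalance_moves({
    "worker_1": {101, 102, 103, 104},
    "worker_2": {105, 106, 107, 108},
    "worker_3": set(),
}))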