# entry point for shard rebalancer
def rebalance_table_shards(relation regclass,
                           threshold float4 default 0.1,
                           max_shard_moves int default 1000000,
                           excluded_shard_list bigint[] default '{}'):
    # acquire lock on the relation to prevent concurrent rebalance operations
    # acquire lock on each relation id to prevent schema changes
    for colocatedTable in colocatedTableList:
        AcquireShareTableLock(colocatedTable.relationId)

    # get the required shard placement list
    shard_placement_list = ShardPlacementList(relation, excluded_shard_list)

    # get the active worker node list, filtered by worker_node_responsive
    responsive_worker_node_list = master_get_active_worker_nodes()

    rebalance_placement_update_list = shard_placement_rebalance_array(responsive_worker_node_list,
                                                                      shard_placement_list,
                                                                      threshold)[:max_shard_moves]

    for placement_update_data in rebalance_placement_update_list:
        PERFORM update_shard_placement(placement_update_data, relation, responsive_worker_node_list)
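
##
## A runnable Python sketch of the driver flow above. The helper names
## (is_responsive, plan_moves, apply_move) are made-up stand-ins, not actual
## Citus functions; the sketch only illustrates the control flow: filter
## unresponsive workers, plan against the filtered list, cap the plan at
## max_shard_moves, then apply the moves one at a time.
##
def rebalance_driver_sketch(worker_nodes, is_responsive, plan_moves,
                            apply_move, max_shard_moves=1000000):
    # drop unresponsive workers up front so no move involves a dead node
    responsive_nodes = [node for node in worker_nodes if is_responsive(node)]

    # plan all moves first, then cap the plan at max_shard_moves
    moves = plan_moves(responsive_nodes)[:max_shard_moves]
    for move in moves:
        apply_move(move, responsive_nodes)
    return len(moves)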
##
## Connect to itself and execute master_copy_shard_placement in a separate connection.
## TODO: Do we really need the relation parameter?
##
def update_shard_placement(placement_update_data, relation, responsive_worker_node_list):
    updateType, shardId = placement_update_data.updateType, placement_update_data.shardId
    sourceHost, sourcePort = placement_update_data.sourceHost, placement_update_data.sourcePort
    destHost, destPort = placement_update_data.destHost, placement_update_data.destPort
    do_drop = placement_update_data.do_drop

    # check that both ends of the move are responsive before doing any work
    if {source or destination} not in responsive_worker_node_list:
        ERROR: "source or destination is not responsive"

    queryToExecute = "SELECT master_copy_shard_placement(%d, %s, %d, %s, %d, %s)", shardId, sourceHost, sourcePort, destHost, destPort, do_drop
    ExecuteRemoteCommand(LOCAL_HOST_NAME, PostPortNumber, queryToExecute)
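
##
## The "connect to itself" step could look like the sketch below in Python with
## psycopg2 (the real implementation issues the command over libpq from C, and
## the localhost/dbname connection details here are assumptions). Running the
## copy through a loopback connection keeps the long-running copy out of the
## caller's own transaction.
##
import psycopg2

def run_copy_via_loopback(shard_id, source_host, source_port,
                          dest_host, dest_port, do_drop, local_port=5432):
    conn = psycopg2.connect(host="localhost", port=local_port, dbname="postgres")
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT master_copy_shard_placement(%s, %s, %s, %s, %s, %s)",
                (shard_id, source_host, source_port, dest_host, dest_port, do_drop))
        conn.commit()
    finally:
        conn.close()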
##
## We already have master_copy_shard_placement(), which is used to repair shards. So, we
## remove the prerequisites the function has. Also, we make it so that it becomes aware
## of co-located placements.
##
## TODO: this implementation skips the tableId parameter since it is only used to get the
## shardTableName, which we can generate without the table name as well
def master_copy_shard_placement(shardId, sourceHost, sourcePort, destHost, destPort, do_drop):
    # get all co-located placements
    colocatedPlacementList = ColocatedPlacementList(shardId, sourceHost, sourcePort)

    # now fetch each table without creating it, and then apply the constraints
    for colocatedPlacement in colocatedPlacementList:
        # acquire lock on the relation id to prevent schema changes
        AcquireShareTableLock(colocatedPlacement.relationId)
        AcquireShardMetadataLock(colocatedPlacement.shardId)

        shardTableName = ShardTableName(colocatedPlacement.shardId);
        appendStringInfo(dropTableQuery, DROP_TABLE_COMMAND, shardTableName)

        # fetch the table without CREATING THE TABLE
        appendStringInfo(tableFetchQuery, TABLE_FETCH_COMMAND, shardTableName, shardLength,
                         sourceHost, sourcePort);

        # drop the table if it already exists, then fetch the fresh table
        ExecuteCommandsOnTheHost({dropTableQuery, tableFetchQuery}, destHost, destPort)

        # now update the metadata
        InsertIntoShardPlacement(colocatedPlacement.shardId, destHost, destPort)

        # if drop is specified, dropping the source placement is done via 2PC
        if do_drop:
            DeleteShardPlacement(colocatedPlacement.shardId, sourceHost, sourcePort)
            appendStringInfo(dropTableQuery, DROP_TABLE_COMMAND, shardTableName)

            # we have to run this via 2PC so that the placement is not dropped
            # until the metadata changes are committed
            ExecuteCommandVia2PC(dropTableQuery, sourceHost, sourcePort)
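
##
## The 2PC step above maps onto PostgreSQL's PREPARE TRANSACTION / COMMIT
## PREPARED machinery, which psycopg2 exposes directly. A minimal sketch
## (connection details and the transaction id are illustrative): the drop is
## prepared on the source node first, and only committed after the
## coordinator's metadata change has committed.
##
import psycopg2

def drop_via_2pc_sketch(source_host, source_port, drop_command):
    conn = psycopg2.connect(host=source_host, port=source_port, dbname="postgres")
    xid = conn.xid(0, "citus_shard_drop", "rebalancer")  # illustrative id
    conn.tpc_begin(xid)
    with conn.cursor() as cursor:
        cursor.execute(drop_command)  # e.g. "DROP TABLE IF EXISTS lineitem_102008"
    conn.tpc_prepare()  # phase one: the drop is durable but not yet committed

    # ... commit the coordinator's metadata change here ...

    conn.tpc_commit()   # phase two: COMMIT PREPARED, the shard is finally dropped
    conn.close()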
###
### Pseudo-code for shard_placement_rebalance_array.
### Note that some parts (i.e., mapping function parameters to the variables, mapping
### the return list to JSON etc.) are omitted, but it still gives a good overview of
### what is going on while rebalancing.
###
def shard_placement_rebalance_array():
    # get the shard placement list for each of the nodes
    # Format:
    #   [nodeIndex: 0] => {S1, S2, ..., SN}
    #   [nodeIndex: 1] => {S4, S5, ..., SX}
    #   [nodeIndex: 2] => {S2, S4, ..., SY}
    nodeShardPlacementList[] = GenerateNodeShardList(tableName)

    # first get the total placement count
    totalPlacementCount = TotalPlacementCount(nodeShardPlacementList)

    # get the average placement count
    placementCountAverage = totalPlacementCount / workerNodeCount

    # get the lower bound for any node's placement count
    placementCountLowerbound = (int) ((1.0 - threshold) * placementCountAverage);

    # now iterate over the worker nodes to get each worker to have at least
    # placementCountLowerbound placements
    while (underUtilizedNodeExists):
        # find the workers to operate on in this iteration
        workerThatHasMaxCountOfPlacement = findWorkerWithMaxPlacementCount(nodeShardPlacementList)
        workerThatHasMinCountOfPlacement = findWorkerWithMinPlacementCount(nodeShardPlacementList)

        # we already hit the expected distribution, do not proceed
        if placementCount(workerThatHasMinCountOfPlacement) >= placementCountLowerbound:
            break

        # now find a placement to move
        # (i.e., one that the target does not already have)
        for placement in workerThatHasMaxCountOfPlacement:
            # skip this placement if it already exists on the min worker
            if placement in workerThatHasMinCountOfPlacement:
                continue

            # now that we found the placement to move
            placementToMove = placement
            break

        # create an event to update placements; the move goes from the most
        # loaded worker to the least loaded one
        placementUpdateEvent = PlacementUpdateEvent(placementToMove,
                                                    sourceNode: workerThatHasMaxCountOfPlacement,
                                                    targetNode: workerThatHasMinCountOfPlacement)

        # add this placement update event to the return list
        placementUpdateList.append(placementUpdateEvent)

        # update the shard placement list according to the move
        UpdateShardLists(nodeShardPlacementList)

    return placementUpdateList
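
##
## A self-contained, runnable Python sketch of the greedy loop above. The data
## layout ({node: set of shard ids}) and the function name are illustrative,
## not the real C implementation; it only demonstrates the move-selection
## logic, including the (1 - threshold) lower bound and the skip when the
## target already holds the placement.
##
def rebalance_sketch(node_shard_lists, threshold):
    placement_updates = []
    total = sum(len(shards) for shards in node_shard_lists.values())
    average = total / len(node_shard_lists)
    lower_bound = int((1.0 - threshold) * average)

    while True:
        # pick the most and the least loaded nodes for this iteration
        max_node = max(node_shard_lists, key=lambda n: len(node_shard_lists[n]))
        min_node = min(node_shard_lists, key=lambda n: len(node_shard_lists[n]))

        # stop once every node holds at least the lower bound
        if len(node_shard_lists[min_node]) >= lower_bound:
            break

        # pick a shard the target does not already hold
        candidates = node_shard_lists[max_node] - node_shard_lists[min_node]
        if not candidates:
            break
        shard = min(candidates)

        # record the move and apply it to the in-memory lists
        placement_updates.append((shard, max_node, min_node))
        node_shard_lists[max_node].remove(shard)
        node_shard_lists[min_node].add(shard)

    return placement_updates

# example: six placements skewed onto one worker; one move satisfies the lower bound
print(rebalance_sketch({"w0": {1, 2, 3, 4, 5}, "w1": {6}, "w2": set()}, 0.1))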