Last active
August 23, 2016 08:38
-
-
Save onderkalaci/25fc519a1d6501a65db1827dcbf76689 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Pseudocode for creating hash-partitioned tables.
#
# Algorithm:
#   When a table is created with a shard count and replication factor for
#   which no colocation group exists yet, generate a new colocation ID.
#   If a colocation group with the given shard count and replication factor
#   already exists, create the new table's placements on the same workers as
#   the tables already in that group, so that the tables are colocated.
#
# The helpers below take the table name; in SQL it would be resolved to a
# relation OID with tableName::regclass.
#
def create_hash_partitioned_table(tableName, partitionColumn):
    """Distribute tableName by hash on partitionColumn in a colocation group.

    The table joins the existing group for its shard count and replication
    factor, or a new group is created for it.

    NOTE(review): shardCount and replicationFactor are free names here, not
    parameters; presumably they come from configuration settings -- confirm.
    """
    # Look up an existing colocation group for this shard count and
    # replication factor.
    colocationId = GetColocationId(shardCount, replicationFactor)
    if colocationId != InvalidOid:
        # Place the new shards on the same workers as the colocated tables.
        CreateWorkerShardsForGivenCollocationId(tableName, colocationId)
    else:
        # Keep the newly created ID: the table must be registered in the new
        # group, not with InvalidOid.
        colocationId = CreateACollocationId(shardCount, replicationFactor)
        # First table of its group: use the existing round-robin policy.
        CreateWorkerShards(tableName, shardCount, replicationFactor)
    CreateDistributedTable(tableName, DISTRIBUTED_BY_HASH, partitionColumn, colocationId)
#
# Create the shards of tableName on the same workers, and with the same hash
# ranges, as the first table already in colocation group collocationId.
#
def CreateWorkerShardsForGivenCollocationId(tableName, collocationId):
    """Create shards for tableName that mirror an existing colocated table."""
    collocatedTable = getFirstCollocatedTable(collocationId)
    collocatedTableShardPlacements = collocatedTable.WorkerShardPlacements
    # Sort by shardId, then worker name and worker port, so that all
    # placements (replicas) of the same shard are adjacent.
    collocatedTableShardPlacements.SortList()
    # The DDL that creates tableName is the same for every shard, so build it
    # once, outside the loop.
    # NOTE(review): helper name assumed -- confirm against the codebase.
    ddlEvents = GetTableDDLEvents(tableName)
    previousSourceShardId = None
    shardId = None
    for collocatedPlacement in collocatedTableShardPlacements:
        # All replicas of one source shard must become replicas of one new
        # shard, so allocate a new shard ID only when the source shard changes.
        if collocatedPlacement.shardId != previousSourceShardId:
            previousSourceShardId = collocatedPlacement.shardId
            shardId = GenerateNewShardId()
        hostName = collocatedPlacement.workerName
        portNumber = collocatedPlacement.workerPort
        shardMinValue = ShardMinValue(collocatedPlacement)
        shardMaxValue = ShardMaxValue(collocatedPlacement)
        WorkerCreateShard(ddlEvents, hostName, portNumber, shardMinValue, shardMaxValue, shardId)
#
# Pseudocode for rebalancing colocated tables (simple, blocking variant).
#
# Input is a list of tables. If it contains a single table, every table in
# that table's colocation group is rebalanced; otherwise only the given
# tables are. Colocated tables have to be rebalanced together.
#
# Algorithm: ensure that queries relying on colocation information are
# blocked until the rebalance of all colocated tables is done.
#
# This lets us reuse all the infrastructure that the shard rebalancer
# already provides.
#
def rebalance_colocated_tables_simpler_but_blocks_all_collocated_queries(inputTableList):
    """Rebalance colocated tables while holding a lock on their colocation group.

    Raises:
        ValueError: if inputTableList is empty.
    """
    if not inputTableList:
        raise ValueError("at least one table must be given")
    # The colocation ID must be known before the group's tables are looked up.
    colocationId = ColocationId(inputTableList[0])
    if len(inputTableList) == 1:
        # Rebalance all tables in the colocation group.
        colocatedTableList = ColocatedTableList(colocationId)
    else:
        # Rebalance only the given tables.
        colocatedTableList = inputTableList

    # Prevent queries that rely on colocation from being planned.
    AcquireAccessExclusiveLock(colocationId)
    try:
        responsiveWorkerNodeList = GetResponsiveWorkerList()
        # Rebalance operations, keyed by table.
        rebalanceOperations = {}
        for colocatedTable in colocatedTableList:
            # NOTE(review): TableShardPlacementList is an assumed helper that
            # returns the table's shard placements -- confirm.
            rebalanceOperations[colocatedTable] = shard_placement_rebalance_array(
                responsiveWorkerNodeList, TableShardPlacementList(colocatedTable))
        # TODO: Could this be an Assert instead?
        # All (source -> target) moves must be the same for colocated tables.
        ErrorIfNotAllRebalanceOperationsAreSameForAllTables(rebalanceOperations)
        # Once that holds, apply the rebalance operations table by table.
        for tableOid, tableRebalanceOperations in rebalanceOperations.items():
            # Lock the table, as rebalance_table_shards() already does.
            AcquireAccessExclusiveLock(tableOid)
            try:
                for placementUpdateData in tableRebalanceOperations:
                    update_shard_placement(placementUpdateData)
            finally:
                # Release even if an update fails.
                ReleaseLock(tableOid)
    finally:
        ReleaseLock(colocationId)
#
# Pseudocode for rebalancing colocated tables (non-blocking variant).
#
# Input is a list of tables. If it contains a single table, every table in
# that table's colocation group is rebalanced; otherwise only the given
# tables are. Colocated tables have to be rebalanced together.
#
# Algorithm: move colocated placements one group at a time, first acquiring
# locks on every placement in the group.
#
# This requires rewriting some of the fundamental functions that the shard
# rebalancer already provides.
#
def rebalance_colocated_tables_more_complicated_but_does_not_block_collocated_queries(inputTableList):
    """Rebalance colocated tables, moving one group of colocated placements at a time.

    Raises:
        ValueError: if inputTableList is empty.
    """
    if not inputTableList:
        raise ValueError("at least one table must be given")
    # The colocation ID must be known before the group's tables are looked up.
    colocationId = ColocationId(inputTableList[0])
    if len(inputTableList) == 1:
        # Rebalance all tables in the colocation group.
        colocatedTableList = ColocatedTableList(colocationId)
    else:
        # Rebalance only the given tables.
        colocatedTableList = inputTableList

    responsiveWorkerNodeList = GetResponsiveWorkerList()
    # Rebalance operations, keyed by table.
    rebalanceOperations = {}
    for colocatedTable in colocatedTableList:
        # NOTE(review): TableShardPlacementList is an assumed helper that
        # returns the table's shard placements -- confirm.
        rebalanceOperations[colocatedTable] = shard_placement_rebalance_array(
            responsiveWorkerNodeList, TableShardPlacementList(colocatedTable))
    # TODO: Could this be an Assert instead?
    # All (source -> target) moves must be the same for colocated tables.
    ErrorIfNotAllRebalanceOperationsAreSameForAllTables(rebalanceOperations)

    # Assumes the check above guarantees every table's list has the same
    # length and order. Then the i-th entries of all lists are the colocated
    # updates that must move together; zip groups them across tables.
    for colocatedPlacementUpdates in zip(*rebalanceOperations.values()):
        lockedShardIds = []
        try:
            for placementUpdate in colocatedPlacementUpdates:
                AcquireAccessExclusiveLock(placementUpdate.shardId)
                lockedShardIds.append(placementUpdate.shardId)
            move_shard_placements(colocatedPlacementUpdates)
        finally:
            # Release exactly the locks that were acquired, even on failure.
            for lockedShardId in lockedShardIds:
                ReleaseAccessExclusiveLock(lockedShardId)
#
# We probably want to keep rebalance_table_shards(). But if a user calls it,
# the given table has to be removed from its colocation group, because it is
# rebalanced independently of the other tables in the group.
#
def rebalance_table_shards(tableName):
    """Rebalance a single table's shards and detach it from its colocation group."""
    # This function already exists in the shard rebalancer repo.
    AcquireAccessExclusiveLock(tableName)
    try:
        responsiveWorkerNodeList = GetResponsiveWorkerList()
        # NOTE(review): TableShardPlacementList is an assumed helper that
        # returns the table's shard placements -- confirm.
        shardPlacementList = TableShardPlacementList(tableName)
        rebalanceUpdateCommandList = shard_placement_rebalance_array(
            responsiveWorkerNodeList, shardPlacementList)
        for placementUpdateData in rebalanceUpdateCommandList:
            update_shard_placement(placementUpdateData)
        # Now the table's colocation ID has to be updated so that it no
        # longer belongs to the group.
        # NOTE(review): helper name assumed -- confirm against the codebase.
        UpdateColocationId(tableName, InvalidOid)
    finally:
        # Release even if a step fails.
        ReleaseLock(tableName)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment