Edge cases
- Rebalance Task Id : 1
- Current cluster : Cluster('test-cluster', [Node0 in zone 0 partitionList:[0, 1, 2, 3], Node1 in zone 0 partitionList:[4, 5, 6, 7], Node2 in zone 0 partitionList:[], ])
- Target cluster : Cluster('test-cluster', [Node0 in zone 0 partitionList:[1, 2, 3], Node1 in zone 0 partitionList:[4, 5, 6, 7], Node2 in zone 0 partitionList:[0], ])
- Partition distribution :
Current Cluster:
0 - [2, 1, 0, 3] - [5, 4, 7, 6]
1 - [7, 6, 5, 4] - [1, 0, 3, 2]
2 - empty
Target Cluster:
0 - [2, 1, 3] - [0]
1 - [7, 6, 5, 4] - [1, 3, 2]
2 - [0] - [5, 4, 7, 6]
- Ordered rebalance node plan :
RebalancePartitionsInfo(2 <--- 0, partitions moved - [0] - [5, 4, 7, 6], partitions deleted - [] - [5, 4, 7, 6], stores: [test])
RebalancePartitionsInfo(0 <--- 1, partitions moved - [] - [0], partitions deleted - [] - [0], stores: [test])

- Rebalance Task Id : 2
- Current cluster : Cluster('test-cluster', [Node0 in zone 0 partitionList:[0, 1, 2, 3], Node1 in zone 0 partitionList:[4, 5, 6, 7], ])
- Target cluster : Cluster('test-cluster', [Node0 in zone 0 partitionList:[1, 2, 3], Node1 in zone 0 partitionList:[4, 5, 6, 7, 0], ])
- Partition distribution :
Current Cluster:
0 - [2, 1, 0, 3] - [5, 4, 7, 6]
1 - [7, 6, 5, 4] - [1, 0, 3, 2]
Target Cluster:
0 - [2, 1, 3] - [0, 5, 4, 7, 6]
1 - [7, 0, 6, 5, 4] - [1, 3, 2]
- Ordered rebalance node plan :
RebalancePartitionsInfo(1 <--- 0, partitions moved - [0], partitions deleted - [], stores: [test])
RebalancePartitionsInfo(0 <--- 1, partitions moved - [] - [0], partitions deleted - [] - [], stores: [test])

- Rebalance Task Id : 3
- Current cluster : Cluster('test-cluster', [Node0 in zone 0 partitionList:[0, 4], Node1 in zone 0 partitionList:[1, 5], Node2 in zone 0 partitionList:[2, 6], Node3 in zone 0 partitionList:[3, 7], ])
- Target cluster : Cluster('test-cluster', [Node0 in zone 0 partitionList:[4], Node1 in zone 0 partitionList:[0, 1, 5], Node2 in zone 0 partitionList:[2, 6], Node3 in zone 0 partitionList:[3, 7], ])
- Partition distribution :
Current Cluster:
0 - [0, 4] - [3, 7] - [2, 6]
1 - [1, 5] - [0, 4] - [3, 7]
2 - [2, 6] - [1, 5] - [0, 4]
3 - [7, 3] - [2, 6] - [1, 5]
Target Cluster:
0 - [4] - [3] - [2]
1 - [1, 0, 5] - [4, 7] - [3, 6]
2 - [2, 6] - [1, 0, 5] - [4, 7]
3 - [7, 3] - [2, 6] - [0, 1, 5]
- Ordered rebalance node plan :
RebalancePartitionsInfo(1 <--- 0, partitions moved - [0] - [7] - [6], partitions deleted - [0] - [7] - [6], stores: [test])
RebalancePartitionsInfo(2 <--- 1, partitions moved - [] - [0] - [7], partitions deleted - [] - [] - [], stores: [test])
RebalancePartitionsInfo(3 <--- 2, partitions moved - [] - [] - [0], partitions deleted - [] - [] - [], stores: [test])

- Rebalance Task Id : 4
- Current cluster : Cluster('test-cluster', [Node0 in zone 0 partitionList:[4], Node1 in zone 0 partitionList:[0, 1, 5], Node2 in zone 0 partitionList:[2, 6], Node3 in zone 0 partitionList:[3, 7], ])
- Target cluster : Cluster('test-cluster', [Node0 in zone 0 partitionList:[], Node1 in zone 0 partitionList:[0, 1, 5], Node2 in zone 0 partitionList:[2, 4, 6], Node3 in zone 0 partitionList:[3, 7], ])
- Partition distribution :
Current Cluster:
0 - [4] - [3] - [2]
1 - [1, 0, 5] - [4, 7] - [3, 6]
2 - [2, 6] - [1, 0, 5] - [4, 7]
3 - [7, 3] - [2, 6] - [0, 1, 5]
Target Cluster:
0 - empty
1 - [1, 0, 5] - [4, 7] - [2, 3, 6]
2 - [2, 6, 4] - [1, 0, 3, 5] - [7]
3 - [7, 3] - [2, 6] - [0, 1, 4, 5]
- Ordered rebalance node plan :
RebalancePartitionsInfo(2 <--- 0, partitions moved - [4] - [3], partitions deleted - [4] - [3], stores: [test])
RebalancePartitionsInfo(1 <--- 0, partitions moved - [] - [] - [2], partitions deleted - [] - [] - [2], stores: [test])
RebalancePartitionsInfo(3 <--- 2, partitions moved - [] - [] - [4], partitions deleted - [] - [] - [], stores: [test])
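
The plans above come from the RebalanceClusterPlan class below. In the partition-distribution and RebalancePartitionsInfo lines, the successive bracketed lists appear to be grouped by replica type: primaries first, then each replica level. A minimal sketch of a driver that would print this kind of output; the currentCluster, targetCluster and storeDefs setup is assumed here, and only the calls come from the class below:

// Hypothetical driver; currentCluster, targetCluster and storeDefs are
// assumed to be built elsewhere (e.g. by a test fixture). Only the calls
// below come from the RebalanceClusterPlan API shown underneath.
RebalanceClusterPlan plan = new RebalanceClusterPlan(currentCluster,
                                                     targetCluster,
                                                     storeDefs,
                                                     true); // delete donated RW partitions
System.out.println(plan.printPartitionDistribution()); // "Current Cluster: ..." / "Target Cluster: ..."
System.out.println(plan); // one RebalancePartitionsInfo line per (stealer, donor) pair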
package voldemort.client.rebalance;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.store.StoreDefinition;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.Utils;

import com.google.common.collect.Sets;
/**
 * Compares the current cluster configuration with the target cluster
 * configuration. <br>
 * The end result is a map from target node ids to maps of source node ids and
 * the partitions that should be stolen (fetched) from them.
 */
public class RebalanceClusterPlan {

    private final Queue<RebalanceNodePlan> rebalanceTaskQueue;

    /**
     * For the "current" cluster, maps every node id to its corresponding set
     * of [replica, partition] tuples (primaries & replicas)
     */
    private final Map<Integer, Set<Pair<Integer, Integer>>> currentNodeIdToAllPartitionTuples;

    /**
     * For the "target" cluster, maps every node id to its corresponding set
     * of [replica, partition] tuples (primaries & replicas)
     */
    private final Map<Integer, Set<Pair<Integer, Integer>>> targetNodeIdToAllPartitionTuples;

    /**
     * Maps every stealer node id to the set of [replica, partition] tuples it
     * will steal
     */
    private final Map<Integer, Set<Pair<Integer, Integer>>> stealerNodeIdToStolenPartitionTuples;

    private final Cluster currentCluster;
    private final Cluster targetCluster;
    /**
     * Compares the currentCluster configuration with the desired targetCluster
     * configuration and builds a map from target node id to a map of source
     * node ids and the partitions to be stolen/fetched from them.
     *
     * @param currentCluster The current cluster definition
     * @param targetCluster The target cluster definition
     * @param storeDefs The list of store definitions to rebalance
     * @param enabledDeletePartition Delete the RW partitions on the donor side
     *        after the rebalance
     */
    public RebalanceClusterPlan(final Cluster currentCluster,
                                final Cluster targetCluster,
                                final List<StoreDefinition> storeDefs,
                                final boolean enabledDeletePartition) {
        this.currentCluster = currentCluster;
        this.targetCluster = targetCluster;
        this.rebalanceTaskQueue = new ConcurrentLinkedQueue<RebalanceNodePlan>();

        // The total number of partitions must remain the same
        if(currentCluster.getNumberOfPartitions() != targetCluster.getNumberOfPartitions())
            throw new VoldemortException("Total number of partitions should be equal [ Current cluster ("
                                         + currentCluster.getNumberOfPartitions()
                                         + ") not equal to Target cluster ("
                                         + targetCluster.getNumberOfPartitions() + ") ]");

        // Similarly, the total number of nodes must remain the same
        if(currentCluster.getNumberOfNodes() != targetCluster.getNumberOfNodes())
            throw new VoldemortException("Total number of nodes should be equal [ Current cluster ("
                                         + currentCluster.getNumberOfNodes()
                                         + ") not equal to Target cluster ("
                                         + targetCluster.getNumberOfNodes() + ") ]");

        // Create the node id to all-tuples mappings
        this.currentNodeIdToAllPartitionTuples = Collections.unmodifiableMap(RebalanceUtils.getNodeIdToAllPartitions(currentCluster,
                                                                                                                     storeDefs,
                                                                                                                     true));
        this.targetNodeIdToAllPartitionTuples = Collections.unmodifiableMap(RebalanceUtils.getNodeIdToAllPartitions(targetCluster,
                                                                                                                    storeDefs,
                                                                                                                    true));

        // Create the stealer node to stolen-tuples mapping
        this.stealerNodeIdToStolenPartitionTuples = Collections.unmodifiableMap(RebalanceUtils.getStolenPartitionTuples(currentCluster,
                                                                                                                        targetCluster,
                                                                                                                        storeDefs));

        for(Node node: targetCluster.getNodes()) {
            List<RebalancePartitionsInfo> rebalanceNodeList = getRebalancePartitionsInfo(currentCluster,
                                                                                         targetCluster,
                                                                                         RebalanceUtils.getStoreNames(storeDefs),
                                                                                         node.getId(),
                                                                                         enabledDeletePartition);
            if(rebalanceNodeList.size() > 0) {
                rebalanceTaskQueue.offer(new RebalanceNodePlan(node.getId(), rebalanceNodeList));
            }
        }
    }
    public Queue<RebalanceNodePlan> getRebalancingTaskQueue() {
        return rebalanceTaskQueue;
    }

    /**
     * Generates the list of partition movements for one stealer node, based on
     * two principles:
     *
     * <ol>
     * <li>The number of partitions doesn't change; they are only redistributed
     * across nodes
     * <li>A primary or replica partition that is going to be deleted is never
     * used as a source from which another stealer copies data
     * </ol>
     *
     * @param currentCluster Current cluster configuration
     * @param targetCluster Target cluster configuration
     * @param storeNames List of store names
     * @param stealerId Id of the stealer node
     * @param enableDeletePartition Whether the donated partitions may be
     *        deleted on the donor side
     * @return The list of rebalance tasks in which this node is the stealer
     */
    private List<RebalancePartitionsInfo> getRebalancePartitionsInfo(final Cluster currentCluster,
                                                                     final Cluster targetCluster,
                                                                     final List<String> storeNames,
                                                                     final int stealerId,
                                                                     final boolean enableDeletePartition) {
        final List<RebalancePartitionsInfo> result = new ArrayList<RebalancePartitionsInfo>();

        // If this node steals nothing, we are done with it
        if(stealerNodeIdToStolenPartitionTuples.get(stealerId) == null) {
            return result;
        }

        // Mutable working copy; tuples are removed as donors are found for them
        final Set<Pair<Integer, Integer>> haveToStealTuples = Sets.newHashSet(stealerNodeIdToStolenPartitionTuples.get(stealerId));

        // Now find out which donors can donate partitions to this stealer
        for(Node donorNode: currentCluster.getNodes()) {

            // A node can't donate to itself
            if(donorNode.getId() == stealerId)
                continue;

            // Finished with all the partitions?
            if(haveFinishedPartitions(haveToStealTuples)) {
                break;
            }

            final Set<Pair<Integer, Integer>> trackStealPartitionsTuples = new HashSet<Pair<Integer, Integer>>();
            final Set<Pair<Integer, Integer>> trackDeletePartitionsTuples = new HashSet<Pair<Integer, Integer>>();

            // Check if this donor node can donate any of the tuples
            donatePartitionTuple(donorNode, haveToStealTuples, trackStealPartitionsTuples);

            // Check if we can delete the partitions this donor just donated
            donateDeletePartitionTuple(donorNode,
                                       trackStealPartitionsTuples,
                                       trackDeletePartitionsTuples,
                                       enableDeletePartition);

            if(trackStealPartitionsTuples.size() > 0 || trackDeletePartitionsTuples.size() > 0) {
                result.add(new RebalancePartitionsInfo(stealerId,
                                                       donorNode.getId(),
                                                       RebalanceUtils.flattenPartitionTuples(trackStealPartitionsTuples),
                                                       RebalanceUtils.flattenPartitionTuples(trackDeletePartitionsTuples),
                                                       storeNames,
                                                       currentCluster,
                                                       0));
            }
        }
        return result;
    }
    /**
     * Given a donor node and a set of tuples that still need to be stolen,
     * checks which of them the donor can contribute.
     *
     * @param donorNode Donor node we are checking
     * @param haveToStealPartitions The partition tuples we still want to
     *        steal; donated tuples are removed from this set
     * @param trackStealPartitions Accumulates the tuples stolen from this donor
     */
    private void donatePartitionTuple(final Node donorNode,
                                      Set<Pair<Integer, Integer>> haveToStealPartitions,
                                      Set<Pair<Integer, Integer>> trackStealPartitions) {
        // All [replica, partition] tuples (primaries & replicas) hosted by the donor
        final Set<Pair<Integer, Integer>> donorPartitionTuples = currentNodeIdToAllPartitionTuples.get(donorNode.getId());
        final Iterator<Pair<Integer, Integer>> iter = haveToStealPartitions.iterator();

        // Iterate over the tuples to steal and check if this donor can donate them
        while(iter.hasNext()) {
            Pair<Integer, Integer> partitionTupleToSteal = iter.next();
            if(donorPartitionTuples.contains(partitionTupleToSteal)) {
                trackStealPartitions.add(partitionTupleToSteal);

                // This tuple has been donated, remove it
                iter.remove();
            }
        }
    }
    /**
     * We do not delete if this donor node is also the stealer node of another
     * plan and is stealing data for the same partition (irrespective of the
     * replica type). This is a problem only for the BDB storage engine, but
     * the check has been placed at this level since for RO stores we don't
     * delete the data anyway.
     *
     * @param donor Donor node
     * @param trackStealPartitionsTuples Partitions being stolen from this donor
     * @param trackDeletePartitionsTuples Accumulates the partitions that can
     *        be deleted on the donor
     * @param enableDeletePartition Are we allowed to delete?
     */
    private void donateDeletePartitionTuple(final Node donor,
                                            Set<Pair<Integer, Integer>> trackStealPartitionsTuples,
                                            Set<Pair<Integer, Integer>> trackDeletePartitionsTuples,
                                            boolean enableDeletePartition) {
        // Are we allowed to delete AND do we have anything to check for deletion?
        if(enableDeletePartition && trackStealPartitionsTuples.size() > 0) {

            // Retrieve the partitions (if any) which this donor node will
            // itself steal; guard against donors that steal nothing
            Set<Pair<Integer, Integer>> donorStolenTuples = stealerNodeIdToStolenPartitionTuples.get(donor.getId());
            List<Integer> partitionsStolenByDonor = donorStolenTuples == null
                    ? Collections.<Integer>emptyList()
                    : RebalanceUtils.getPartitionsFromTuples(donorStolenTuples);

            // Keep for deletion only those donated partitions which do not
            // overlap with the partitions the donor is stealing
            for(Pair<Integer, Integer> tuple: trackStealPartitionsTuples) {
                if(!partitionsStolenByDonor.contains(tuple.getSecond())) {
                    trackDeletePartitionsTuples.add(tuple);
                }
            }
        }
    }
    private boolean haveFinishedPartitions(Set<Pair<Integer, Integer>> set) {
        return (set == null || set.size() == 0);
    }

    @Override
    public String toString() {
        if(rebalanceTaskQueue.isEmpty()) {
            return "No rebalancing required since the rebalance task queue is empty";
        }
        StringBuilder builder = new StringBuilder();
        builder.append("Cluster Rebalancing Plan : ").append(Utils.NEWLINE);
        builder.append(toString(getRebalancingTaskQueue()));
        return builder.toString();
    }

    public String toString(Queue<RebalanceNodePlan> queue) {
        if(queue == null || queue.isEmpty()) {
            return "";
        }
        StringBuilder builder = new StringBuilder(Utils.NEWLINE);
        for(RebalanceNodePlan nodePlan: queue) {
            builder.append("StealerNode : " + nodePlan.getStealerNode()).append(Utils.NEWLINE);
            for(RebalancePartitionsInfo rebalancePartitionsInfo: nodePlan.getRebalanceTaskList()) {
                builder.append("\t RebalancePartitionsInfo : " + rebalancePartitionsInfo)
                       .append(Utils.NEWLINE);
            }
        }
        return builder.toString();
    }

    public String printPartitionDistribution() {
        StringBuilder sb = new StringBuilder();
        sb.append("Current Cluster: ")
          .append(Utils.NEWLINE)
          .append(RebalanceUtils.printMap(currentNodeIdToAllPartitionTuples, currentCluster))
          .append(Utils.NEWLINE);
        sb.append("Target Cluster: ")
          .append(Utils.NEWLINE)
          .append(RebalanceUtils.printMap(targetNodeIdToAllPartitionTuples, targetCluster));
        return sb.toString();
    }
}
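
RebalanceUtils.getStolenPartitionTuples itself is not included in this gist. Judging from how it is consumed above, it plausibly reduces to a per-node set difference between the target and current tuple assignments; the following is a sketch under that assumption, written against the two node-id-to-tuples maps built in the constructor, and is not the actual Voldemort implementation. It additionally assumes java.util.HashMap is imported:

// Hypothetical sketch of the stolen-tuples computation; the real
// RebalanceUtils.getStolenPartitionTuples may differ in its details.
Map<Integer, Set<Pair<Integer, Integer>>> stolen = new HashMap<Integer, Set<Pair<Integer, Integer>>>();
for(Node node: targetCluster.getNodes()) {
    Set<Pair<Integer, Integer>> current = currentNodeIdToAllPartitionTuples.get(node.getId());
    Set<Pair<Integer, Integer>> target = targetNodeIdToAllPartitionTuples.get(node.getId());
    if(target == null)
        continue; // hosts nothing in the target layout, so it steals nothing
    if(current == null)
        current = Collections.emptySet();

    // A node "steals" every [replica, partition] tuple it hosts in the
    // target layout but does not already host today
    Set<Pair<Integer, Integer>> diff = Sets.difference(target, current);
    if(!diff.isEmpty())
        stolen.put(node.getId(), new HashSet<Pair<Integer, Integer>>(diff));
}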