Created
January 6, 2011 02:04
-
-
Save rsumbaly/767390 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package voldemort.client.rebalance; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.Iterator; | |
import java.util.LinkedHashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Queue; | |
import java.util.Set; | |
import java.util.TreeMap; | |
import java.util.TreeSet; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import org.apache.log4j.Logger; | |
import voldemort.VoldemortException; | |
import voldemort.cluster.Cluster; | |
import voldemort.cluster.Node; | |
import voldemort.routing.RoutingStrategy; | |
import voldemort.routing.RoutingStrategyFactory; | |
import voldemort.store.StoreDefinition; | |
import voldemort.utils.RebalanceUtils; | |
import com.google.common.collect.Maps; | |
/** | |
* Compares the currentCluster configuration with the desired | |
* targetConfiguration and returns a map of Target node-id to map of source | |
* node-ids and partitions desired to be stolen/fetched. | |
* | |
*/ | |
public class RebalanceClusterPlan { | |
private final static Logger logger = Logger.getLogger(RebalanceClusterPlan.class); | |
private final static String NL = System.getProperty("line.separator"); | |
private Queue<RebalanceNodePlan> rebalanceTaskQueue; | |
/** | |
* For the "current" cluster, this map contains a list of partitions | |
* (primary & replicas) that each node has. | |
*/ | |
private Map<Integer, Set<Integer>> currentNodeIdToAllPartitions; | |
/** | |
* For the "target" cluster, this map contains a list of partitions (primary | |
* & replicas) that each node will have as a result of re-mapping the | |
* cluster. | |
*/ | |
private Map<Integer, Set<Integer>> targetNodeIdToAllPartitions; | |
/** | |
* As a result of re-mapping the cluster, you will have nodes that will lose | |
* partitions (primaries or replicas). This map contains all partitions | |
* (primary & replicas) that were moved away from the node, in other words, | |
* they were deleted from the node. | |
*/ | |
private Map<Integer, Set<Integer>> targetNodeIdToAllDeletedPartitions; | |
/** | |
* As a result of re-mapping the cluster, you will have nodes that will gain | |
* partitions (primaries or replicas). This map contains all partitions | |
* (primary & replicas) that were gained to the node, in other words, they | |
* were added to the node. | |
*/ | |
private Map<Integer, Set<Integer>> targetNodeIdToAllAddedPartitions; | |
/** | |
* As a result of re-mapping the cluster, some partitions need to be | |
* "deleted" from the node, in other words, the node no longer has a | |
* ownership of this partition (primary or replica). This map keeps track of | |
* the deleted partitions that have being taking care by the logic that | |
* builds the rebalance-plan. | |
*/ | |
private Map<Integer, Set<Integer>> alreadyDeletedNodeIdToPartions = new TreeMap<Integer, Set<Integer>>(); | |
/**
 * Builds a rebalance plan when the store definitions may differ between the
 * current and the target cluster.
 *
 * @param currentCluster The current cluster definition
 * @param targetCluster The target cluster definition
 * @param oldStoreDefList The original store definitions list
 * @param newStoreDefList The new store definitions list
 * @param deleteDonorPartition Delete the partition on the donor side after
 *        rebalance
 * @param currentROStoreVersionsDirs A mapping of nodeId to map of store
 *        names to version ids
 */
public RebalanceClusterPlan(Cluster currentCluster,
                            Cluster targetCluster,
                            List<StoreDefinition> oldStoreDefList,
                            List<StoreDefinition> newStoreDefList,
                            boolean deleteDonorPartition,
                            Map<Integer, Map<String, String>> currentROStoreVersionsDirs) {
    initialize(currentCluster,
               targetCluster,
               oldStoreDefList,
               newStoreDefList,
               deleteDonorPartition,
               currentROStoreVersionsDirs);
}

/**
 * Builds a rebalance plan when the same store definitions apply to both the
 * current and the target cluster.
 *
 * @param currentCluster The current cluster definition
 * @param targetCluster The target cluster definition
 * @param storeDefList The store definitions list used for both clusters
 * @param deleteDonorPartition Delete the partition on the donor side after
 *        rebalance
 * @param currentROStoreVersionsDirs A mapping of nodeId to map of store
 *        names to version ids
 */
public RebalanceClusterPlan(Cluster currentCluster,
                            Cluster targetCluster,
                            List<StoreDefinition> storeDefList,
                            boolean deleteDonorPartition,
                            Map<Integer, Map<String, String>> currentROStoreVersionsDirs) {
    // Same list plays the role of both the old and the new definitions.
    this(currentCluster,
         targetCluster,
         storeDefList,
         storeDefList,
         deleteDonorPartition,
         currentROStoreVersionsDirs);
}
/** | |
* Compares the currentCluster configuration with the desired | |
* targetConfiguration and builds a map of Target node-id to map of source | |
* node-ids and partitions desired to be stolen/fetched. | |
* | |
* @param currentCluster The current cluster definition | |
* @param targetCluster The target cluster definition | |
* @param currentStoreDefList The original store definitions list | |
* @param targetStoreDefList The new store definition list | |
* @param deleteDonorPartition Delete the RW partition on the donor side | |
* after rebalance | |
* @param currentROStoreVersionsDirs A mapping of nodeId to map of store | |
* names to version ids | |
*/ | |
private void initialize(Cluster currentCluster, | |
Cluster targetCluster, | |
List<StoreDefinition> currentStoreDefList, | |
List<StoreDefinition> targetStoreDefList, | |
boolean deleteDonorPartition, | |
Map<Integer, Map<String, String>> currentROStoreVersionsDirs) { | |
this.rebalanceTaskQueue = new ConcurrentLinkedQueue<RebalanceNodePlan>(); | |
if(currentCluster.getNumberOfPartitions() != targetCluster.getNumberOfPartitions()) | |
throw new VoldemortException("Total number of partitions should not change !!"); | |
if(!RebalanceUtils.hasSameStores(currentStoreDefList, targetStoreDefList)) | |
throw new VoldemortException("Either the number of stores has changed or some stores are missing"); | |
// Create only once the node-to-all-partitions relationship. | |
// all-partitions means primaries and replicas that this node is | |
// responsable for. | |
this.currentNodeIdToAllPartitions = Collections.unmodifiableMap(createNodeIdToAllPartitions(currentCluster, | |
currentStoreDefList)); | |
this.targetNodeIdToAllPartitions = Collections.unmodifiableMap(createNodeIdToAllPartitions(targetCluster, | |
targetStoreDefList)); | |
if(logger.isDebugEnabled()) { | |
logger.debug("Current Cluster: " + NL | |
+ printMap(currentNodeIdToAllPartitions, currentCluster)); | |
logger.debug("Target Cluster: " + NL | |
+ printMap(targetNodeIdToAllPartitions, targetCluster)); | |
} | |
// As a result of re-mapping the cluster, target nodes can lose or gain | |
// partitions. | |
targetNodeIdToAllDeletedPartitions = Collections.unmodifiableMap(createTargetNodeItToAllDeletePartitions()); | |
targetNodeIdToAllAddedPartitions = Collections.unmodifiableMap(createTargetNodeIdToAllAddedPartitions()); | |
if(logger.isDebugEnabled()) { | |
logger.debug("Target deleted partitions: " + targetNodeIdToAllDeletedPartitions); | |
logger.debug("Target stealing partitions: " + targetNodeIdToAllAddedPartitions); | |
} | |
for(Node node: targetCluster.getNodes()) { | |
List<RebalancePartitionsInfo> rebalanceNodeList = createRebalancePartitionsInfo(currentCluster, | |
targetCluster, | |
RebalanceUtils.getStoreNames(currentStoreDefList), | |
node.getId(), | |
deleteDonorPartition); | |
if(rebalanceNodeList.size() > 0) { | |
if(currentROStoreVersionsDirs != null && currentROStoreVersionsDirs.size() > 0) { | |
for(RebalancePartitionsInfo partitionsInfo: rebalanceNodeList) { | |
partitionsInfo.setStealerNodeROStoreToDir(currentROStoreVersionsDirs.get(partitionsInfo.getStealerId())); | |
partitionsInfo.setDonorNodeROStoreToDir(currentROStoreVersionsDirs.get(partitionsInfo.getDonorId())); | |
} | |
} | |
rebalanceTaskQueue.offer(new RebalanceNodePlan(node.getId(), rebalanceNodeList)); | |
} | |
} | |
} | |
public Queue<RebalanceNodePlan> getRebalancingTaskQueue() { | |
return rebalanceTaskQueue; | |
} | |
/** | |
* Returns a map of stealer node to their corresponding node plan | |
* | |
* @return Map of stealer node to plan | |
*/ | |
public HashMap<Integer, RebalanceNodePlan> getRebalancingTaskQueuePerNode() { | |
HashMap<Integer, RebalanceNodePlan> rebalanceMap = Maps.newHashMap(); | |
Iterator<RebalanceNodePlan> iter = rebalanceTaskQueue.iterator(); | |
while(iter.hasNext()) { | |
RebalanceNodePlan plan = iter.next(); | |
rebalanceMap.put(plan.getStealerNode(), plan); | |
} | |
return rebalanceMap; | |
} | |
/** | |
* Target nodes that lost partitions as a result of rebalance. | |
* | |
* @return map to all "target" nodes-to-lost partitions relationship. | |
*/ | |
private Map<Integer, Set<Integer>> createTargetNodeItToAllDeletePartitions() { | |
final Map<Integer, Set<Integer>> map = new TreeMap<Integer, Set<Integer>>(); | |
for(Integer targetNodeId: targetNodeIdToAllPartitions.keySet()) { | |
Set<Integer> clusterAllPartitions = currentNodeIdToAllPartitions.get(targetNodeId); | |
Set<Integer> targetAllPartitions = targetNodeIdToAllPartitions.get(targetNodeId); | |
Set<Integer> deletedPartitions = getDeletedInTarget(clusterAllPartitions, | |
targetAllPartitions); | |
map.put(targetNodeId, deletedPartitions); | |
} | |
return map; | |
} | |
/** | |
* Target nodes that gained partitions as a result of rebalance. | |
* | |
* @return map to all "target" nodes-to-gained partitions relationship. | |
*/ | |
private Map<Integer, Set<Integer>> createTargetNodeIdToAllAddedPartitions() { | |
final Map<Integer, Set<Integer>> map = new TreeMap<Integer, Set<Integer>>(); | |
for(Integer targetNodeId: targetNodeIdToAllPartitions.keySet()) { | |
Set<Integer> clusterAllPartitions = currentNodeIdToAllPartitions.get(targetNodeId); | |
Set<Integer> targetAllPartitions = targetNodeIdToAllPartitions.get(targetNodeId); | |
Set<Integer> addedPartitions = getAddedInTarget(clusterAllPartitions, | |
targetAllPartitions); | |
map.put(targetNodeId, addedPartitions); | |
} | |
return map; | |
} | |
/** | |
* This approach is based on 2 principals: | |
* | |
* <ol> | |
* <li>The number of partitions don't change only they get redistributes | |
* across nodes. | |
* <li>A primary or replica partition that is going to be deleted is never | |
* used to copy from by another stealer. | |
* </ol> | |
* | |
* Principal #2, based on design rebalance can be run in parallel, using | |
* multiple threads. If one thread deletes a partition that another thread | |
* is using to copy data from then a raised condition will be encounter | |
* losing data. To prevent this we follow principal #2 in this method. | |
* | |
*/ | |
private List<RebalancePartitionsInfo> createRebalancePartitionsInfo(final Cluster currentCluster, | |
final Cluster targetCluster, | |
final List<String> storeNames, | |
final int stealerId, | |
final boolean enabledDeletePartition) { | |
final List<RebalancePartitionsInfo> result = new ArrayList<RebalancePartitionsInfo>(); | |
// If this stealer is not stealing any partitions then return. | |
final Set<Integer> stealedPartitions = targetNodeIdToAllAddedPartitions.get(stealerId); | |
if(stealedPartitions == null || stealedPartitions.size() == 0) { | |
return result; | |
} | |
// Separates primaries from replicas this stealer is stealing. | |
final Set<Integer> stealingPrimaries = getStealPrimaries(currentCluster, | |
targetCluster, | |
stealerId); | |
final Set<Integer> stealingReplicas = getStealReplicas(stealedPartitions, stealingPrimaries); | |
if(logger.isDebugEnabled()) { | |
logger.debug("stealer id: " + stealerId + ", stealing primaries: " + stealingPrimaries); | |
logger.debug("stealer id: " + stealerId + ", stealing replicas: " + stealingReplicas); | |
} | |
// Now let's find out which donor can donate partitions to this stealer. | |
for(Node donorNode: currentCluster.getNodes()) { | |
if(donorNode.getId() == stealerId) | |
continue; | |
// If you have treated all partitions (primaries and replicas) that | |
// this | |
// stealer should stole, then you are all done. | |
if(hasAllPartitionTreated(stealingPrimaries) | |
&& hasAllPartitionTreated(stealingReplicas)) { | |
break; | |
} | |
final Set<Integer> trackStealPartitions = new HashSet<Integer>(); | |
final Set<Integer> trackDeletePartitions = new HashSet<Integer>(); | |
final Set<Integer> trackStealMasterPartitions = new HashSet<Integer>(); | |
// Checks if this donor is donating primary partition only. | |
donatePrimary(donorNode, | |
stealingPrimaries, | |
trackStealMasterPartitions, | |
trackStealPartitions, | |
trackDeletePartitions, | |
enabledDeletePartition); | |
// Checks if this donor can donate a replicas. | |
donareReplicas(donorNode, stealingReplicas, trackStealPartitions); | |
// Checks if this donor needs to delete replicas only | |
deleteDonatedPartitions(donorNode, trackDeletePartitions, enabledDeletePartition); | |
if(trackStealPartitions.size() > 0) { | |
result.add(new RebalancePartitionsInfo(stealerId, | |
donorNode.getId(), | |
new ArrayList<Integer>(trackStealPartitions), | |
new ArrayList<Integer>(trackDeletePartitions), | |
new ArrayList<Integer>(trackStealMasterPartitions), | |
storeNames, | |
new HashMap<String, String>(), | |
new HashMap<String, String>(), | |
0)); | |
} | |
} | |
return result; | |
} | |
private void deleteDonatedPartitions(final Node donor, | |
Set<Integer> trackDeletePartitions, | |
boolean enabledDeletePartition) { | |
final int donorId = donor.getId(); | |
final List<Integer> donorPrimaryPartitionIds = donor.getPartitionIds(); | |
if(enabledDeletePartition) { | |
if(targetNodeIdToAllDeletedPartitions.get(donorId) != null | |
&& targetNodeIdToAllDeletedPartitions.get(donorId).size() > 0) { | |
// Gets all delete partition for this donor (Creates a deep copy | |
// first). | |
Set<Integer> delPartitions = new TreeSet<Integer>(targetNodeIdToAllDeletedPartitions.get(donorId)); | |
// How many have you deleted so far? | |
Set<Integer> alreadyDeletedPartitions = alreadyDeletedNodeIdToPartions.get(donorId); | |
// Removes any primary partitions and the ones already deleted. | |
// There is no side effect in deleting an already deleted | |
// partition, | |
// but just for the sake of clarity, let's only delete once a | |
// partition. | |
if(alreadyDeletedPartitions != null) | |
delPartitions.removeAll(alreadyDeletedPartitions); | |
if(donorPrimaryPartitionIds != null) | |
delPartitions.removeAll(donorPrimaryPartitionIds); | |
// add these del partition to the set used in the | |
// RebalancePartitionsIndo. | |
trackDeletePartitions.addAll(delPartitions); | |
addDeletedPartition(alreadyDeletedNodeIdToPartions, donorId, delPartitions); | |
// targetNodeIdToDeletedPartitions.get(donorId).removeAll(delPartitions); | |
} | |
} | |
} | |
private void donareReplicas(final Node donorNode, | |
Set<Integer> stealingOnlyReplicas, | |
Set<Integer> trackStealPartitions) { | |
final int donorId = donorNode.getId(); | |
final Set<Integer> donorAllParitions = currentNodeIdToAllPartitions.get(donorId); | |
final Iterator<Integer> iter = stealingOnlyReplicas.iterator(); | |
while(iter.hasNext()) { | |
Integer stealingReplica = iter.next(); | |
// only give a replica if this replica is not destiny to be deleted. | |
// failing do to that could potentially mean that your are relaying | |
// to copy a partition | |
// that can be deleted in another {@link RebalancePartitionsInfo} | |
// Remember that it's a possibility by design that you could have | |
// parallelism, meaning that | |
// different threads carry on the execution of different | |
// partition-info-plan. | |
// So there is no guarantee which thread will run first. If you | |
// delete a partition that was used | |
// as a copy from then you could end up deleting the partition | |
// first, and never able to copy | |
// the partition after that, in other words it's gone before you get | |
// a change to copy it. | |
if(donorAllParitions.contains(stealingReplica) | |
&& !targetNodeIdToAllDeletedPartitions.get(donorId).contains(stealingReplica)) { | |
trackStealPartitions.add(stealingReplica); | |
// only one node will donate this partition. | |
iter.remove(); | |
} | |
} | |
} | |
private void donatePrimary(final Node donorNode, | |
Set<Integer> stealingOnlyPrimaries, | |
Set<Integer> trackStealMasterPartitions, | |
Set<Integer> trackStealPartitions, | |
Set<Integer> trackDeletePartitions, | |
boolean enabledDeletePartition) { | |
final List<Integer> donorPrimaryPartitionIds = Collections.unmodifiableList(donorNode.getPartitionIds()); | |
final int donorId = donorNode.getId(); | |
final Iterator<Integer> iter = stealingOnlyPrimaries.iterator(); | |
while(iter.hasNext()) { | |
Integer stealingPrimary = iter.next(); | |
if(donorPrimaryPartitionIds.contains(stealingPrimary)) { | |
trackStealMasterPartitions.add(stealingPrimary); | |
trackStealPartitions.add(stealingPrimary); | |
// This slealedPrimary partition has been donated, let's counter | |
// it out. | |
iter.remove(); | |
if(enabledDeletePartition) { | |
// Should we suppose to delete this partition? | |
if(targetNodeIdToAllDeletedPartitions.get(donorId).contains(stealingPrimary)) { | |
trackDeletePartitions.add(stealingPrimary); | |
addDeletedPartition(alreadyDeletedNodeIdToPartions, | |
donorId, | |
stealingPrimary); | |
} | |
} | |
} | |
} | |
} | |
private boolean hasAllPartitionTreated(Set<Integer> partitionToBeTreatedSet) { | |
return (partitionToBeTreatedSet == null || partitionToBeTreatedSet.size() == 0); | |
} | |
private void addDeletedPartition(Map<Integer, Set<Integer>> map, int key, Set<Integer> values) { | |
for(Integer value: values) { | |
addDeletedPartition(map, key, value); | |
} | |
} | |
private void addDeletedPartition(Map<Integer, Set<Integer>> map, int key, Integer value) { | |
if(map.containsKey(key)) { | |
map.get(key).add(value); | |
} else { | |
Set<Integer> set = new TreeSet<Integer>(); | |
set.add(value); | |
map.put(key, set); | |
} | |
} | |
private Set<Integer> getStealReplicas(Set<Integer> stealedPartitions, | |
Set<Integer> stealingPrimaries) { | |
return getDiff(stealedPartitions, stealingPrimaries); | |
} | |
/** | |
* Returns a set of partitions that were added to the target. there are 4 | |
* possible scenarios: | |
* | |
* getAddedInTarget(cluster, null) - nothing was added, returns null. | |
* getAddedInTarget(null, target) - everything in target was added, return | |
* target. getAddedInTarget(null, null) - neither added nor deleted, return | |
* null. getAddedInTarget(cluster, target)) - returns new partition not | |
* found in cluster. | |
* | |
* @param cluster | |
* @param target | |
* @return | |
*/ | |
private Set<Integer> getAddedInTarget(Set<Integer> cluster, Set<Integer> target) { | |
if(cluster == null || target == null) { | |
return target; | |
} | |
return getDiff(target, cluster); | |
} | |
/** | |
* Returns a set of partitions that were deleted to the target. there are 4 | |
* possible scenarios: | |
* | |
* getDeletedInTarget(cluster, null) - everything was deleted, returns | |
* cluster. getDeletedInTarget(null, target) - everything in target was | |
* added, return target. getDeletedInTarget(null, null) - neither added nor | |
* deleted, return null. getDeletedInTarget(cluster, target)) - returns | |
* deleted partition not found in target. | |
* | |
* @param cluster | |
* @param target | |
* @return | |
*/ | |
private Set<Integer> getDeletedInTarget(final Set<Integer> cluster, final Set<Integer> target) { | |
if(cluster == null || target == null) { | |
return cluster; | |
} | |
return getDiff(cluster, target); | |
} | |
private Set<Integer> getDiff(final Set<Integer> source, final Set<Integer> dest) { | |
Set<Integer> diff = new TreeSet<Integer>(); | |
for(Integer id: source) { | |
if(!dest.contains(id)) { | |
diff.add(id); | |
} | |
} | |
return diff; | |
} | |
/** | |
* This is a very useful method that returns a string representation of the | |
* cluster The idea of this method is to expose clearly the Node and its | |
* primary & replicas partitions in the ollowing format: | |
* | |
* <pre> | |
* | |
* Current Cluster: | |
* 0 - [0, 1, 2, 3] + [7, 8, 9] | |
* 1 - [4, 5, 6] + [0, 1, 2, 3] | |
* 2 - [7, 8, 9] + [4, 5, 6] | |
* | |
* </pre> | |
* | |
* @param nodeItToAllPartitions | |
* @param cluster | |
* @return string representation of the cluster, indicating node,primary and | |
* replica relationship. | |
* | |
*/ | |
private String printMap(final Map<Integer, Set<Integer>> nodeItToAllPartitions, | |
final Cluster cluster) { | |
StringBuilder sb = new StringBuilder(); | |
for(Map.Entry<Integer, Set<Integer>> entry: nodeItToAllPartitions.entrySet()) { | |
final Integer nodeId = entry.getKey(); | |
final Set<Integer> primariesAndReplicas = entry.getValue(); | |
final List<Integer> primaries = cluster.getNodeById(nodeId).getPartitionIds(); | |
Set<Integer> onlyPrimaries = new TreeSet<Integer>(); | |
Set<Integer> onlyReplicas = new TreeSet<Integer>(); | |
for(Integer allPartition: primariesAndReplicas) { | |
if(primaries.contains(allPartition)) { | |
onlyPrimaries.add(allPartition); | |
} else { | |
onlyReplicas.add(allPartition); | |
} | |
} | |
sb.append(nodeId + " - " + onlyPrimaries + " + " + onlyReplicas).append(NL); | |
} | |
return sb.toString(); | |
} | |
private Map<Integer, Set<Integer>> createNodeIdToAllPartitions(final Cluster cluster, | |
List<StoreDefinition> storeDef) { | |
final Collection<Node> nodes = cluster.getNodes(); | |
final StoreDefinition maxReplicationStore = RebalanceUtils.getMaxReplicationStore(storeDef); | |
final RoutingStrategy routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(maxReplicationStore, | |
cluster); | |
final Map<Integer, Set<Integer>> nodeIdToAllPartitions = new HashMap<Integer, Set<Integer>>(); | |
final Map<Integer, Integer> partitionToNodeIdMap = getPartitionToNode(nodes); | |
// Map initialization. | |
for(Node node: nodes) { | |
nodeIdToAllPartitions.put(node.getId(), new TreeSet<Integer>()); | |
} | |
// Loops through all nodes | |
for(Node node: nodes) { | |
// Gets the partitions that this node was configured with. | |
for(Integer primary: node.getPartitionIds()) { | |
// Gets the list of replicating partitions. | |
List<Integer> replicaPartitionList = routingStrategy.getReplicatingPartitionList(primary); | |
// Get the node that this replicating partition belongs to. | |
for(Integer replicaPartition: replicaPartitionList) { | |
Integer replicaNodeId = partitionToNodeIdMap.get(replicaPartition); | |
// The replicating node will have a copy of primary. | |
nodeIdToAllPartitions.get(replicaNodeId).add(primary); | |
} | |
} | |
} | |
return nodeIdToAllPartitions; | |
} | |
/** | |
* @return Reverse lookup from partitionId-to-NodeID. This is a Map of the | |
* information found in cluster.xml. | |
*/ | |
private Map<Integer, Integer> getPartitionToNode(final Collection<Node> nodes) { | |
final Map<Integer, Integer> map = new LinkedHashMap<Integer, Integer>(); | |
// For each node in the cluster, get its partitions (shards in VShards | |
// terminology) | |
for(Node node: nodes) { | |
for(Integer partitionId: node.getPartitionIds()) { | |
// Make sure that this same partition was NOT configured in | |
// another | |
// Node. If so then throw an exception and let me know | |
// the Nodes that shared this bad partition id. | |
Integer previousRegisteredNodeId = map.get(partitionId); | |
if(previousRegisteredNodeId != null) { | |
throw new IllegalArgumentException("Partition id " + partitionId | |
+ " found in 2 nodes ID's: " + node.getId() | |
+ " and " + previousRegisteredNodeId); | |
} | |
map.put(partitionId, node.getId()); | |
} | |
} | |
return map; | |
} | |
/** | |
* For a particular stealer node find all the partitions it will steal | |
* | |
* @param currentCluster The cluster definition of the existing cluster | |
* @param targetCluster The target cluster definition | |
* @param stealNodeId The id of the stealer node | |
* @return Returns a list of partitions which this stealer node will get | |
*/ | |
private Set<Integer> getStealPrimaries(Cluster currentCluster, | |
Cluster targetCluster, | |
int stealNodeId) { | |
List<Integer> targetList = new ArrayList<Integer>(targetCluster.getNodeById(stealNodeId) | |
.getPartitionIds()); | |
List<Integer> currentList = new ArrayList<Integer>(); | |
if(RebalanceUtils.containsNode(currentCluster, stealNodeId)) | |
currentList = currentCluster.getNodeById(stealNodeId).getPartitionIds(); | |
// remove all current partitions from targetList | |
targetList.removeAll(currentList); | |
return new TreeSet<Integer>(targetList); | |
} | |
@Override | |
public String toString() { | |
if(rebalanceTaskQueue.isEmpty()) { | |
return "Cluster is already balanced, No rebalancing needed"; | |
} | |
StringBuilder builder = new StringBuilder(); | |
builder.append("Cluster Rebalancing Plan:").append(NL); | |
builder.append(toString(getRebalancingTaskQueue())); | |
return builder.toString(); | |
} | |
public String toString(Queue<RebalanceNodePlan> queue) { | |
if(queue == null || queue.isEmpty()) { | |
return ""; | |
} | |
StringBuilder builder = new StringBuilder(NL); | |
for(RebalanceNodePlan nodePlan: queue) { | |
builder.append("StealerNode:" + nodePlan.getStealerNode()).append(NL); | |
for(RebalancePartitionsInfo rebalancePartitionsInfo: nodePlan.getRebalanceTaskList()) { | |
builder.append("\t RebalancePartitionsInfo: " + rebalancePartitionsInfo).append(NL); | |
builder.append("\t\t getStealMasterPartitions(): " | |
+ rebalancePartitionsInfo.getStealMasterPartitions()).append(NL); | |
builder.append("\t\t getPartitionList(): " | |
+ rebalancePartitionsInfo.getPartitionList()).append(NL); | |
builder.append("\t\t getDeletePartitionsList(): " | |
+ rebalancePartitionsInfo.getDeletePartitionsList()).append(NL); | |
builder.append("\t\t getUnbalancedStoreList(): " | |
+ rebalancePartitionsInfo.getUnbalancedStoreList()) | |
.append(NL) | |
.append(NL); | |
} | |
} | |
return builder.toString(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment