package voldemort.client.rebalance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.store.StoreDefinition;
import voldemort.utils.RebalanceUtils;
import com.google.common.collect.Maps;
/**
* Compares the currentCluster configuration with the desired
* targetConfiguration and returns a map of target node-id to a map of
* source node-ids and the partitions to be stolen/fetched.
*
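* <p>
* A minimal usage sketch (the surrounding cluster and store-definition
* setup is elided; the variable names below are assumptions for
* illustration, not part of this class):
*
* <pre>
* RebalanceClusterPlan plan = new RebalanceClusterPlan(currentCluster,
*                                                      targetCluster,
*                                                      storeDefs,
*                                                      true,
*                                                      null);
* System.out.println(plan); // prints the per-stealer rebalancing plan
* </pre>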
*/
public class RebalanceClusterPlan {
private final static Logger logger = Logger.getLogger(RebalanceClusterPlan.class);
private final static String NL = System.getProperty("line.separator");
private Queue<RebalanceNodePlan> rebalanceTaskQueue;
/**
* For the "current" cluster, this map contains a list of partitions
* (primary & replicas) that each node has.
*/
private Map<Integer, Set<Integer>> currentNodeIdToAllPartitions;
/**
* For the "target" cluster, this map contains a list of partitions (primary
* & replicas) that each node will have as a result of re-mapping the
* cluster.
*/
private Map<Integer, Set<Integer>> targetNodeIdToAllPartitions;
/**
* As a result of re-mapping the cluster, you will have nodes that will lose
* partitions (primaries or replicas). This map contains all partitions
* (primaries & replicas) that were moved away from each node; in other
* words, they were deleted from the node.
*/
private Map<Integer, Set<Integer>> targetNodeIdToAllDeletedPartitions;
/**
* As a result of re-mapping the cluster, you will have nodes that will gain
* partitions (primaries or replicas). This map contains all partitions
* (primaries & replicas) that were gained by each node; in other words,
* they were added to the node.
*/
private Map<Integer, Set<Integer>> targetNodeIdToAllAddedPartitions;
/**
* As a result of re-mapping the cluster, some partitions need to be
* "deleted" from a node; in other words, the node no longer has ownership
* of the partition (primary or replica). This map keeps track of the
* deleted partitions that have already been taken care of by the logic
* that builds the rebalance plan.
*/
private Map<Integer, Set<Integer>> alreadyDeletedNodeIdToPartitions = new TreeMap<Integer, Set<Integer>>();
public RebalanceClusterPlan(Cluster currentCluster,
Cluster targetCluster,
List<StoreDefinition> oldStoreDefList,
List<StoreDefinition> newStoreDefList,
boolean deleteDonorPartition,
Map<Integer, Map<String, String>> currentROStoreVersionsDirs) {
initialize(currentCluster,
targetCluster,
oldStoreDefList,
newStoreDefList,
deleteDonorPartition,
currentROStoreVersionsDirs);
}
public RebalanceClusterPlan(Cluster currentCluster,
Cluster targetCluster,
List<StoreDefinition> storeDefList,
boolean deleteDonorPartition,
Map<Integer, Map<String, String>> currentROStoreVersionsDirs) {
initialize(currentCluster,
targetCluster,
storeDefList,
storeDefList,
deleteDonorPartition,
currentROStoreVersionsDirs);
}
/**
* Compares the currentCluster configuration with the desired
* targetConfiguration and builds a map of target node-id to a map of
* source node-ids and the partitions to be stolen/fetched.
*
* @param currentCluster The current cluster definition
* @param targetCluster The target cluster definition
* @param currentStoreDefList The original store definitions list
* @param targetStoreDefList The new store definition list
* @param deleteDonorPartition Delete the RW partition on the donor side
* after rebalance
* @param currentROStoreVersionsDirs A mapping of nodeId to map of store
* names to version ids
*/
private void initialize(Cluster currentCluster,
Cluster targetCluster,
List<StoreDefinition> currentStoreDefList,
List<StoreDefinition> targetStoreDefList,
boolean deleteDonorPartition,
Map<Integer, Map<String, String>> currentROStoreVersionsDirs) {
this.rebalanceTaskQueue = new ConcurrentLinkedQueue<RebalanceNodePlan>();
if(currentCluster.getNumberOfPartitions() != targetCluster.getNumberOfPartitions())
throw new VoldemortException("Total number of partitions should not change !!");
if(!RebalanceUtils.hasSameStores(currentStoreDefList, targetStoreDefList))
throw new VoldemortException("Either the number of stores has changed or some stores are missing");
// Create the node-to-all-partitions relationship only once.
// "All partitions" means the primaries and replicas that this node is
// responsible for.
this.currentNodeIdToAllPartitions = Collections.unmodifiableMap(createNodeIdToAllPartitions(currentCluster,
currentStoreDefList));
this.targetNodeIdToAllPartitions = Collections.unmodifiableMap(createNodeIdToAllPartitions(targetCluster,
targetStoreDefList));
if(logger.isDebugEnabled()) {
logger.debug("Current Cluster: " + NL
+ printMap(currentNodeIdToAllPartitions, currentCluster));
logger.debug("Target Cluster: " + NL
+ printMap(targetNodeIdToAllPartitions, targetCluster));
}
// As a result of re-mapping the cluster, target nodes can lose or gain
// partitions.
targetNodeIdToAllDeletedPartitions = Collections.unmodifiableMap(createTargetNodeIdToAllDeletedPartitions());
targetNodeIdToAllAddedPartitions = Collections.unmodifiableMap(createTargetNodeIdToAllAddedPartitions());
if(logger.isDebugEnabled()) {
logger.debug("Target deleted partitions: " + targetNodeIdToAllDeletedPartitions);
logger.debug("Target stealing partitions: " + targetNodeIdToAllAddedPartitions);
}
for(Node node: targetCluster.getNodes()) {
List<RebalancePartitionsInfo> rebalanceNodeList = createRebalancePartitionsInfo(currentCluster,
targetCluster,
RebalanceUtils.getStoreNames(currentStoreDefList),
node.getId(),
deleteDonorPartition);
if(rebalanceNodeList.size() > 0) {
if(currentROStoreVersionsDirs != null && currentROStoreVersionsDirs.size() > 0) {
for(RebalancePartitionsInfo partitionsInfo: rebalanceNodeList) {
partitionsInfo.setStealerNodeROStoreToDir(currentROStoreVersionsDirs.get(partitionsInfo.getStealerId()));
partitionsInfo.setDonorNodeROStoreToDir(currentROStoreVersionsDirs.get(partitionsInfo.getDonorId()));
}
}
rebalanceTaskQueue.offer(new RebalanceNodePlan(node.getId(), rebalanceNodeList));
}
}
}
public Queue<RebalanceNodePlan> getRebalancingTaskQueue() {
return rebalanceTaskQueue;
}
/**
* Returns a map of stealer node to their corresponding node plan
*
* @return Map of stealer node to plan
*/
public HashMap<Integer, RebalanceNodePlan> getRebalancingTaskQueuePerNode() {
HashMap<Integer, RebalanceNodePlan> rebalanceMap = Maps.newHashMap();
Iterator<RebalanceNodePlan> iter = rebalanceTaskQueue.iterator();
while(iter.hasNext()) {
RebalanceNodePlan plan = iter.next();
rebalanceMap.put(plan.getStealerNode(), plan);
}
return rebalanceMap;
}
/**
* Target nodes that lost partitions as a result of rebalance.
*
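* <p>
* For example (hypothetical ids), if node 0 holds partitions {0, 1, 7} in
* the current cluster but only {0, 1} in the target cluster, its entry in
* the returned map is {7}.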
* @return Map of each "target" node to the partitions it lost.
*/
private Map<Integer, Set<Integer>> createTargetNodeIdToAllDeletedPartitions() {
final Map<Integer, Set<Integer>> map = new TreeMap<Integer, Set<Integer>>();
for(Integer targetNodeId: targetNodeIdToAllPartitions.keySet()) {
Set<Integer> clusterAllPartitions = currentNodeIdToAllPartitions.get(targetNodeId);
Set<Integer> targetAllPartitions = targetNodeIdToAllPartitions.get(targetNodeId);
Set<Integer> deletedPartitions = getDeletedInTarget(clusterAllPartitions,
targetAllPartitions);
map.put(targetNodeId, deletedPartitions);
}
return map;
}
/**
* Target nodes that gained partitions as a result of rebalance.
*
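* <p>
* For example (hypothetical ids), if node 0 holds partitions {0, 1} in the
* current cluster and {0, 1, 9} in the target cluster, its entry in the
* returned map is {9}.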
* @return Map of each "target" node to the partitions it gained.
*/
private Map<Integer, Set<Integer>> createTargetNodeIdToAllAddedPartitions() {
final Map<Integer, Set<Integer>> map = new TreeMap<Integer, Set<Integer>>();
for(Integer targetNodeId: targetNodeIdToAllPartitions.keySet()) {
Set<Integer> clusterAllPartitions = currentNodeIdToAllPartitions.get(targetNodeId);
Set<Integer> targetAllPartitions = targetNodeIdToAllPartitions.get(targetNodeId);
Set<Integer> addedPartitions = getAddedInTarget(clusterAllPartitions,
targetAllPartitions);
map.put(targetNodeId, addedPartitions);
}
return map;
}
/**
* This approach is based on two principles:
*
* <ol>
* <li>The number of partitions doesn't change; partitions only get
* redistributed across nodes.
* <li>A primary or replica partition that is going to be deleted is never
* used as a copy source by another stealer.
* </ol>
*
* Principle #2 matters because, by design, rebalancing can run in parallel
* using multiple threads. If one thread deletes a partition that another
* thread is copying data from, a race condition is encountered and data is
* lost. This method follows principle #2 to prevent that.
*
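* <p>
* Illustration of principle #2 (hypothetical ids): if donor node 1 is
* scheduled to delete partition 5, donateReplicas() will never pick node 1
* as the copy source for partition 5; another current owner donates it
* instead, so a parallel delete can never race with the copy.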
*/
private List<RebalancePartitionsInfo> createRebalancePartitionsInfo(final Cluster currentCluster,
final Cluster targetCluster,
final List<String> storeNames,
final int stealerId,
final boolean enabledDeletePartition) {
final List<RebalancePartitionsInfo> result = new ArrayList<RebalancePartitionsInfo>();
// If this stealer is not stealing any partitions then return.
final Set<Integer> stolenPartitions = targetNodeIdToAllAddedPartitions.get(stealerId);
if(stolenPartitions == null || stolenPartitions.size() == 0) {
return result;
}
// Separates the primaries from the replicas that this stealer is stealing.
final Set<Integer> stealingPrimaries = getStealPrimaries(currentCluster,
targetCluster,
stealerId);
final Set<Integer> stealingReplicas = getStealReplicas(stolenPartitions, stealingPrimaries);
if(logger.isDebugEnabled()) {
logger.debug("stealer id: " + stealerId + ", stealing primaries: " + stealingPrimaries);
logger.debug("stealer id: " + stealerId + ", stealing replicas: " + stealingReplicas);
}
// Now let's find out which donor can donate partitions to this stealer.
for(Node donorNode: currentCluster.getNodes()) {
if(donorNode.getId() == stealerId)
continue;
// If all partitions (primaries and replicas) that this stealer
// should steal have already been handled, then we are done.
if(hasAllPartitionTreated(stealingPrimaries)
&& hasAllPartitionTreated(stealingReplicas)) {
break;
}
final Set<Integer> trackStealPartitions = new HashSet<Integer>();
final Set<Integer> trackDeletePartitions = new HashSet<Integer>();
final Set<Integer> trackStealMasterPartitions = new HashSet<Integer>();
// Checks if this donor is donating any primary partitions.
donatePrimary(donorNode,
stealingPrimaries,
trackStealMasterPartitions,
trackStealPartitions,
trackDeletePartitions,
enabledDeletePartition);
// Checks if this donor can donate replicas.
donateReplicas(donorNode, stealingReplicas, trackStealPartitions);
// Checks if this donor still needs to delete replicas.
deleteDonatedPartitions(donorNode, trackDeletePartitions, enabledDeletePartition);
if(trackStealPartitions.size() > 0) {
result.add(new RebalancePartitionsInfo(stealerId,
donorNode.getId(),
new ArrayList<Integer>(trackStealPartitions),
new ArrayList<Integer>(trackDeletePartitions),
new ArrayList<Integer>(trackStealMasterPartitions),
storeNames,
new HashMap<String, String>(),
new HashMap<String, String>(),
0));
}
}
return result;
}
private void deleteDonatedPartitions(final Node donor,
Set<Integer> trackDeletePartitions,
boolean enabledDeletePartition) {
final int donorId = donor.getId();
final List<Integer> donorPrimaryPartitionIds = donor.getPartitionIds();
if(enabledDeletePartition) {
if(targetNodeIdToAllDeletedPartitions.get(donorId) != null
&& targetNodeIdToAllDeletedPartitions.get(donorId).size() > 0) {
// Gets all deleted partitions for this donor (creates a copy
// first).
Set<Integer> delPartitions = new TreeSet<Integer>(targetNodeIdToAllDeletedPartitions.get(donorId));
// How many have been deleted so far?
Set<Integer> alreadyDeletedPartitions = alreadyDeletedNodeIdToPartitions.get(donorId);
// Removes any primary partitions and the ones already deleted.
// There is no side effect in deleting an already deleted
// partition, but for the sake of clarity, let's delete a
// partition only once.
if(alreadyDeletedPartitions != null)
delPartitions.removeAll(alreadyDeletedPartitions);
if(donorPrimaryPartitionIds != null)
delPartitions.removeAll(donorPrimaryPartitionIds);
// Adds these deleted partitions to the set used in the
// RebalancePartitionsInfo.
trackDeletePartitions.addAll(delPartitions);
addDeletedPartition(alreadyDeletedNodeIdToPartitions, donorId, delPartitions);
}
}
}
private void donateReplicas(final Node donorNode,
Set<Integer> stealingOnlyReplicas,
Set<Integer> trackStealPartitions) {
final int donorId = donorNode.getId();
final Set<Integer> donorAllPartitions = currentNodeIdToAllPartitions.get(donorId);
final Iterator<Integer> iter = stealingOnlyReplicas.iterator();
while(iter.hasNext()) {
Integer stealingReplica = iter.next();
// Only donate a replica if it is not destined to be deleted.
// Failing to do so could mean relying on a copy of a partition
// that another {@link RebalancePartitionsInfo} is about to delete.
// Remember that, by design, the plans may execute in parallel,
// with different threads carrying out different partition-info
// plans, so there is no guarantee which thread runs first. If a
// partition that serves as the copy source is deleted first, it
// can never be copied afterwards; in other words, it is gone
// before we get a chance to copy it.
if(donorAllPartitions.contains(stealingReplica)
&& !targetNodeIdToAllDeletedPartitions.get(donorId).contains(stealingReplica)) {
trackStealPartitions.add(stealingReplica);
// only one node will donate this partition.
iter.remove();
}
}
}
private void donatePrimary(final Node donorNode,
Set<Integer> stealingOnlyPrimaries,
Set<Integer> trackStealMasterPartitions,
Set<Integer> trackStealPartitions,
Set<Integer> trackDeletePartitions,
boolean enabledDeletePartition) {
final List<Integer> donorPrimaryPartitionIds = Collections.unmodifiableList(donorNode.getPartitionIds());
final int donorId = donorNode.getId();
final Iterator<Integer> iter = stealingOnlyPrimaries.iterator();
while(iter.hasNext()) {
Integer stealingPrimary = iter.next();
if(donorPrimaryPartitionIds.contains(stealingPrimary)) {
trackStealMasterPartitions.add(stealingPrimary);
trackStealPartitions.add(stealingPrimary);
// This stolen primary partition has been donated; remove it from
// the set.
iter.remove();
if(enabledDeletePartition) {
// Are we supposed to delete this partition?
if(targetNodeIdToAllDeletedPartitions.get(donorId).contains(stealingPrimary)) {
trackDeletePartitions.add(stealingPrimary);
addDeletedPartition(alreadyDeletedNodeIdToPartitions,
donorId,
stealingPrimary);
}
}
}
}
}
private boolean hasAllPartitionTreated(Set<Integer> partitionToBeTreatedSet) {
return (partitionToBeTreatedSet == null || partitionToBeTreatedSet.size() == 0);
}
private void addDeletedPartition(Map<Integer, Set<Integer>> map, int key, Set<Integer> values) {
for(Integer value: values) {
addDeletedPartition(map, key, value);
}
}
private void addDeletedPartition(Map<Integer, Set<Integer>> map, int key, Integer value) {
if(map.containsKey(key)) {
map.get(key).add(value);
} else {
Set<Integer> set = new TreeSet<Integer>();
set.add(value);
map.put(key, set);
}
}
private Set<Integer> getStealReplicas(Set<Integer> stolenPartitions,
Set<Integer> stealingPrimaries) {
return getDiff(stolenPartitions, stealingPrimaries);
}
/**
* Returns the set of partitions that were added in the target. There are
* four possible scenarios:
*
* <ul>
* <li>getAddedInTarget(cluster, null) - nothing was added, returns null.
* <li>getAddedInTarget(null, target) - everything in target was added,
* returns target.
* <li>getAddedInTarget(null, null) - neither added nor deleted, returns
* null.
* <li>getAddedInTarget(cluster, target) - returns the new partitions not
* found in cluster.
* </ul>
*
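* <p>
* Worked example (hypothetical ids): getAddedInTarget({0, 1, 2}, {1, 2, 3})
* returns {3}, the only partition present in target but not in cluster.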
* @param cluster
* @param target
* @return
*/
private Set<Integer> getAddedInTarget(Set<Integer> cluster, Set<Integer> target) {
if(cluster == null || target == null) {
return target;
}
return getDiff(target, cluster);
}
/**
* Returns the set of partitions that were deleted from the target. There
* are four possible scenarios:
*
* <ul>
* <li>getDeletedInTarget(cluster, null) - everything was deleted, returns
* cluster.
* <li>getDeletedInTarget(null, target) - nothing was deleted, returns null
* (the cluster argument).
* <li>getDeletedInTarget(null, null) - neither added nor deleted, returns
* null.
* <li>getDeletedInTarget(cluster, target) - returns the deleted partitions
* not found in target.
* </ul>
*
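* <p>
* Worked example (hypothetical ids): getDeletedInTarget({0, 1, 2}, {1, 2, 3})
* returns {0}, the only partition present in cluster but not in target.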
* @param cluster
* @param target
* @return
*/
private Set<Integer> getDeletedInTarget(final Set<Integer> cluster, final Set<Integer> target) {
if(cluster == null || target == null) {
return cluster;
}
return getDiff(cluster, target);
}
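/**
* Returns the asymmetric set difference (source minus dest), i.e. the ids
* present in source but absent from dest. For example (hypothetical ids),
* getDiff({1, 2, 3}, {2, 3, 4}) returns {1}.
*/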
private Set<Integer> getDiff(final Set<Integer> source, final Set<Integer> dest) {
Set<Integer> diff = new TreeSet<Integer>();
for(Integer id: source) {
if(!dest.contains(id)) {
diff.add(id);
}
}
return diff;
}
/**
* Returns a string representation of the cluster. The idea is to clearly
* expose each node and its primary & replica partitions, in the following
* format:
*
* <pre>
*
* Current Cluster:
* 0 - [0, 1, 2, 3] + [7, 8, 9]
* 1 - [4, 5, 6] + [0, 1, 2, 3]
* 2 - [7, 8, 9] + [4, 5, 6]
*
* </pre>
*
* @param nodeIdToAllPartitions
* @param cluster
* @return string representation of the cluster, indicating node,primary and
* replica relationship.
*
*/
private String printMap(final Map<Integer, Set<Integer>> nodeIdToAllPartitions,
final Cluster cluster) {
StringBuilder sb = new StringBuilder();
for(Map.Entry<Integer, Set<Integer>> entry: nodeIdToAllPartitions.entrySet()) {
final Integer nodeId = entry.getKey();
final Set<Integer> primariesAndReplicas = entry.getValue();
final List<Integer> primaries = cluster.getNodeById(nodeId).getPartitionIds();
Set<Integer> onlyPrimaries = new TreeSet<Integer>();
Set<Integer> onlyReplicas = new TreeSet<Integer>();
for(Integer allPartition: primariesAndReplicas) {
if(primaries.contains(allPartition)) {
onlyPrimaries.add(allPartition);
} else {
onlyReplicas.add(allPartition);
}
}
sb.append(nodeId + " - " + onlyPrimaries + " + " + onlyReplicas).append(NL);
}
return sb.toString();
}
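/**
* Builds, for every node in the cluster, the full set of partitions
* (primaries plus replicas) that the node is responsible for, based on the
* routing strategy of the store with the highest replication factor.
* <p>
* Worked example (a hypothetical layout, assuming consistent-hash routing
* where a replica lands on the node owning the next partition on the
* ring): with a replication factor of 2 and nodes 0:[0, 1] and 1:[2, 3],
* partition 1 is replicated onto partition 2's owner, so node 1 ends up
* with primaries [2, 3] plus replica [1].
*/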
private Map<Integer, Set<Integer>> createNodeIdToAllPartitions(final Cluster cluster,
List<StoreDefinition> storeDef) {
final Collection<Node> nodes = cluster.getNodes();
final StoreDefinition maxReplicationStore = RebalanceUtils.getMaxReplicationStore(storeDef);
final RoutingStrategy routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(maxReplicationStore,
cluster);
final Map<Integer, Set<Integer>> nodeIdToAllPartitions = new HashMap<Integer, Set<Integer>>();
final Map<Integer, Integer> partitionToNodeIdMap = getPartitionToNode(nodes);
// Map initialization.
for(Node node: nodes) {
nodeIdToAllPartitions.put(node.getId(), new TreeSet<Integer>());
}
// Loops through all nodes
for(Node node: nodes) {
// Gets the partitions that this node was configured with.
for(Integer primary: node.getPartitionIds()) {
// Gets the list of replicating partitions.
List<Integer> replicaPartitionList = routingStrategy.getReplicatingPartitionList(primary);
// Get the node that this replicating partition belongs to.
for(Integer replicaPartition: replicaPartitionList) {
Integer replicaNodeId = partitionToNodeIdMap.get(replicaPartition);
// The replicating node will have a copy of primary.
nodeIdToAllPartitions.get(replicaNodeId).add(primary);
}
}
}
return nodeIdToAllPartitions;
}
/**
* @return Reverse lookup from partitionId-to-NodeID. This is a Map of the
* information found in cluster.xml.
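* For example (hypothetical ids), nodes 0:[0, 1] and 1:[2, 3] yield the
* map {0=0, 1=0, 2=1, 3=1}.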
*/
private Map<Integer, Integer> getPartitionToNode(final Collection<Node> nodes) {
final Map<Integer, Integer> map = new LinkedHashMap<Integer, Integer>();
// For each node in the cluster, get its partitions (shards in VShards
// terminology)
for(Node node: nodes) {
for(Integer partitionId: node.getPartitionIds()) {
// Make sure that this partition was NOT configured on another
// node. If it was, throw an exception identifying the nodes
// that share this bad partition id.
Integer previousRegisteredNodeId = map.get(partitionId);
if(previousRegisteredNodeId != null) {
throw new IllegalArgumentException("Partition id " + partitionId
+ " found on two nodes with ids: " + node.getId()
+ " and " + previousRegisteredNodeId);
}
}
map.put(partitionId, node.getId());
}
}
return map;
}
/**
* For a particular stealer node, find all the primary partitions it will steal.
*
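* <p>
* For example (hypothetical ids), if node 2 owns primaries [7, 8] in the
* target cluster but only [7] in the current cluster, it steals primary
* partition 8.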
* @param currentCluster The cluster definition of the existing cluster
* @param targetCluster The target cluster definition
* @param stealNodeId The id of the stealer node
* @return The set of primary partitions which this stealer node will steal
*/
private Set<Integer> getStealPrimaries(Cluster currentCluster,
Cluster targetCluster,
int stealNodeId) {
List<Integer> targetList = new ArrayList<Integer>(targetCluster.getNodeById(stealNodeId)
.getPartitionIds());
List<Integer> currentList = new ArrayList<Integer>();
if(RebalanceUtils.containsNode(currentCluster, stealNodeId))
currentList = currentCluster.getNodeById(stealNodeId).getPartitionIds();
// remove all current partitions from targetList
targetList.removeAll(currentList);
return new TreeSet<Integer>(targetList);
}
@Override
public String toString() {
if(rebalanceTaskQueue.isEmpty()) {
return "Cluster is already balanced, No rebalancing needed";
}
StringBuilder builder = new StringBuilder();
builder.append("Cluster Rebalancing Plan:").append(NL);
builder.append(toString(getRebalancingTaskQueue()));
return builder.toString();
}
public String toString(Queue<RebalanceNodePlan> queue) {
if(queue == null || queue.isEmpty()) {
return "";
}
StringBuilder builder = new StringBuilder(NL);
for(RebalanceNodePlan nodePlan: queue) {
builder.append("StealerNode:" + nodePlan.getStealerNode()).append(NL);
for(RebalancePartitionsInfo rebalancePartitionsInfo: nodePlan.getRebalanceTaskList()) {
builder.append("\t RebalancePartitionsInfo: " + rebalancePartitionsInfo).append(NL);
builder.append("\t\t getStealMasterPartitions(): "
+ rebalancePartitionsInfo.getStealMasterPartitions()).append(NL);
builder.append("\t\t getPartitionList(): "
+ rebalancePartitionsInfo.getPartitionList()).append(NL);
builder.append("\t\t getDeletePartitionsList(): "
+ rebalancePartitionsInfo.getDeletePartitionsList()).append(NL);
builder.append("\t\t getUnbalancedStoreList(): "
+ rebalancePartitionsInfo.getUnbalancedStoreList())
.append(NL)
.append(NL);
}
}
return builder.toString();
}
}