Created
May 4, 2011 20:17
-
-
Save rsumbaly/955944 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void testPartitionPlanWithBdbChanges() { | |
Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 1, 3, 5, 7 }, | |
{ 0, 2, 4, 6 } }); | |
Cluster targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 3, 5, 7 }, | |
{ 1, 0, 2, 4, 6 } }); | |
StoreDefinition def = ServerTestUtils.getStoreDef("test", | |
2, | |
1, | |
1, | |
1, | |
1, | |
RoutingStrategyType.CONSISTENT_STRATEGY); | |
RebalanceClusterPlan plan = new RebalanceClusterPlan(currentCluster, | |
targetCluster, | |
Lists.newArrayList(def), | |
true); | |
List<RebalancePartitionsInfo> info = RebalanceUtils.updatePartitionPlanWithBdbChanges(RebalanceUtils.flattenNodePlans(Lists.newArrayList(plan.getRebalancingTaskQueue())), | |
currentCluster, | |
Lists.newArrayList(def)); | |
assertEquals(info.size(), 0); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* If the stealer already contains a replica of the data, why move it? In | |
* case of BDB storage engine this optimization is very crucial as we store | |
* all data of a store on a single node together. Hence we want to avoid the | |
* scenario where-in deletes are on and we're moving a key out and also | |
* streaming it in | |
* | |
* @param existingPlanList Existing partition plan list | |
* @param existingCluster Current cluster metadata | |
* @return Return a list of partition plans modified with the above | |
* optimization | |
*/ | |
public static List<RebalancePartitionsInfo> updatePartitionPlanWithBdbChanges(final List<RebalancePartitionsInfo> existingPlanList, | |
final Cluster existingCluster, | |
final List<StoreDefinition> rwStoreDefs) { | |
Map<Integer, Set<Pair<Integer, Integer>>> nodeIdToAllPartitions = RebalanceUtils.getNodeIdToAllPartitions(existingCluster, | |
rwStoreDefs, | |
true); | |
List<RebalancePartitionsInfo> plans = Lists.newArrayList(); | |
for(RebalancePartitionsInfo existingPlan: existingPlanList) { | |
RebalancePartitionsInfo info = RebalancePartitionsInfo.create(existingPlan.toJsonString()); | |
List<Integer> partitionsOnStealerNode = getPartitionsFromTuples(nodeIdToAllPartitions.get(info.getStealerId())); | |
HashMap<Integer, List<Integer>> newReplicaToPartitionList = Maps.newHashMap(); | |
for(Entry<Integer, List<Integer>> entry: info.getReplicaToPartitionList().entrySet()) { | |
// Check if we have an overlap with the existing partitions on | |
// the stealer node | |
List<Integer> qualifiedPartitions = Lists.newArrayList(); | |
for(int partition: entry.getValue()) { | |
if(!partitionsOnStealerNode.contains(partition)) { | |
qualifiedPartitions.add(partition); | |
} | |
} | |
if(qualifiedPartitions.size() > 0) { | |
newReplicaToPartitionList.put(entry.getKey(), qualifiedPartitions); | |
} | |
} | |
if(newReplicaToPartitionList.size() > 0) { | |
info.setReplicaToPartitionList(newReplicaToPartitionList); | |
plans.add(info); | |
} | |
} | |
return plans; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment