Skip to content

Instantly share code, notes, and snippets.

@rsumbaly
Created May 4, 2011 20:17
Show Gist options
  • Save rsumbaly/955944 to your computer and use it in GitHub Desktop.
Save rsumbaly/955944 to your computer and use it in GitHub Desktop.
public void testPartitionPlanWithBdbChanges() {
Cluster currentCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 1, 3, 5, 7 },
{ 0, 2, 4, 6 } });
Cluster targetCluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 3, 5, 7 },
{ 1, 0, 2, 4, 6 } });
StoreDefinition def = ServerTestUtils.getStoreDef("test",
2,
1,
1,
1,
1,
RoutingStrategyType.CONSISTENT_STRATEGY);
RebalanceClusterPlan plan = new RebalanceClusterPlan(currentCluster,
targetCluster,
Lists.newArrayList(def),
true);
List<RebalancePartitionsInfo> info = RebalanceUtils.updatePartitionPlanWithBdbChanges(RebalanceUtils.flattenNodePlans(Lists.newArrayList(plan.getRebalancingTaskQueue())),
currentCluster,
Lists.newArrayList(def));
assertEquals(info.size(), 0);
}
/**
* If the stealer already contains a replica of the data, why move it? In
* case of BDB storage engine this optimization is very crucial as we store
* all data of a store on a single node together. Hence we want to avoid the
* scenario where-in deletes are on and we're moving a key out and also
* streaming it in
*
* @param existingPlanList Existing partition plan list
* @param existingCluster Current cluster metadata
* @return Return a list of partition plans modified with the above
* optimization
*/
public static List<RebalancePartitionsInfo> updatePartitionPlanWithBdbChanges(final List<RebalancePartitionsInfo> existingPlanList,
final Cluster existingCluster,
final List<StoreDefinition> rwStoreDefs) {
Map<Integer, Set<Pair<Integer, Integer>>> nodeIdToAllPartitions = RebalanceUtils.getNodeIdToAllPartitions(existingCluster,
rwStoreDefs,
true);
List<RebalancePartitionsInfo> plans = Lists.newArrayList();
for(RebalancePartitionsInfo existingPlan: existingPlanList) {
RebalancePartitionsInfo info = RebalancePartitionsInfo.create(existingPlan.toJsonString());
List<Integer> partitionsOnStealerNode = getPartitionsFromTuples(nodeIdToAllPartitions.get(info.getStealerId()));
HashMap<Integer, List<Integer>> newReplicaToPartitionList = Maps.newHashMap();
for(Entry<Integer, List<Integer>> entry: info.getReplicaToPartitionList().entrySet()) {
// Check if we have an overlap with the existing partitions on
// the stealer node
List<Integer> qualifiedPartitions = Lists.newArrayList();
for(int partition: entry.getValue()) {
if(!partitionsOnStealerNode.contains(partition)) {
qualifiedPartitions.add(partition);
}
}
if(qualifiedPartitions.size() > 0) {
newReplicaToPartitionList.put(entry.getKey(), qualifiedPartitions);
}
}
if(newReplicaToPartitionList.size() > 0) {
info.setReplicaToPartitionList(newReplicaToPartitionList);
plans.add(info);
}
}
return plans;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment