@rsumbaly
Created May 4, 2011 17:11
Optimization for BDB!
@Test
public void testRebalanceNodeRWStore() {
    // Build the target cluster in which node 1 takes over partition 0 from node 0
    List<Integer> partitionsBeingMoved = Lists.newArrayList(0);
    Cluster targetCluster = RebalanceUtils.createUpdatedCluster(cluster,
                                                                cluster.getNodeById(1),
                                                                cluster.getNodeById(0),
                                                                partitionsBeingMoved);
    HashMap<ByteArray, byte[]> entrySet = ServerTestUtils.createRandomKeyValuePairs(10);
    SocketStoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(Lists.newArrayList("tcp://"
                                                                    + cluster.getNodeById(0).getHost()
                                                                    + ":"
                                                                    + cluster.getNodeById(0).getSocketPort())));
    StoreClient<Object, Object> storeClient = factory.getStoreClient("test-recovery-data");
    HashMap<ByteArray, byte[]> primarySet0 = Maps.newHashMap();
    HashMap<ByteArray, byte[]> secondarySet0 = Maps.newHashMap();
    RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(RebalanceUtils.getStoreDefinitionWithName(storeDefs,
                                                                                                                            "test-recovery-data"),
                                                                                  cluster);
    // Write all entries, and bucket each key by whether partition 0 is its
    // primary or secondary partition under the current cluster
    for(Entry<ByteArray, byte[]> entry: entrySet.entrySet()) {
        storeClient.put(new String(entry.getKey().get()), new String(entry.getValue()));
        List<Integer> pList = strategy.getPartitionList(entry.getKey().get());
        System.out.print("ENTRY - " + ByteUtils.toHexString(entry.getKey().get()) + " - " + pList);
        if(pList.get(0) == 0) {
            primarySet0.put(entry.getKey(), entry.getValue());
            System.out.print(" - primary");
        } else if(pList.get(1) == 0) {
            secondarySet0.put(entry.getKey(), entry.getValue());
            System.out.print(" - secondary");
        }
        System.out.println();
    }
    RebalanceClusterPlan plan = new RebalanceClusterPlan(cluster,
                                                         targetCluster,
                                                         Lists.newArrayList(RebalanceUtils.getStoreDefinitionWithName(storeDefs,
                                                                                                                      "test-recovery-data")),
                                                         true);
    List<RebalancePartitionsInfo> plans = RebalanceUtils.flattenNodePlans(Lists.newArrayList(plan.getRebalancingTaskQueue()));

    // Rebalancing while the servers are still in NORMAL state should fail
    try {
        adminClient.rebalanceNode(plans.get(0));
        fail("Should have thrown an exception since not in rebalancing state");
    } catch(VoldemortException e) {
        e.printStackTrace();
    }
    getServer(0).getMetadataStore().put(MetadataStore.SERVER_STATE_KEY,
                                        MetadataStore.VoldemortState.REBALANCING_MASTER_SERVER);
    getServer(1).getMetadataStore().put(MetadataStore.SERVER_STATE_KEY,
                                        MetadataStore.VoldemortState.REBALANCING_MASTER_SERVER);

    // Rebalancing with no steal info set on the stealer node should also fail
    try {
        adminClient.rebalanceNode(plans.get(0));
        fail("Should have thrown an exception since no steal info");
    } catch(VoldemortException e) {
        e.printStackTrace();
    }
    // Set the rebalance info on the stealer nodes
    getServer(plans.get(0).getStealerId()).getMetadataStore()
                                          .put(MetadataStore.REBALANCING_STEAL_INFO,
                                               new RebalancerState(Lists.newArrayList(plans.get(0))));
    getServer(plans.get(1).getStealerId()).getMetadataStore()
                                          .put(MetadataStore.REBALANCING_STEAL_INFO,
                                               new RebalancerState(Lists.newArrayList(plans.get(1))));

    // Update the cluster metadata on both nodes
    getServer(0).getMetadataStore().put(MetadataStore.CLUSTER_KEY, targetCluster);
    getServer(1).getMetadataStore().put(MetadataStore.CLUSTER_KEY, targetCluster);
    // Actually run the rebalance tasks and wait for each one to complete
    try {
        for(RebalancePartitionsInfo currentPlan: plans) {
            int asyncId = adminClient.rebalanceNode(currentPlan);
            assertNotSame("Got a valid rebalanceAsyncId", -1, asyncId);
            getAdminClient().waitForCompletion(currentPlan.getStealerId(),
                                               asyncId,
                                               300,
                                               TimeUnit.SECONDS);
        }
    } catch(Exception e) {
        e.printStackTrace();
        fail("Should not throw any exceptions");
    }
    Store<ByteArray, byte[], byte[]> store0 = getStore(0, "test-recovery-data");
    Store<ByteArray, byte[], byte[]> store1 = getStore(1, "test-recovery-data");

    // Primary set should have moved to node 1 and been deleted from node 0
    for(Entry<ByteArray, byte[]> entry: primarySet0.entrySet()) {
        assertSame("entry should be present at store",
                   1,
                   store1.get(entry.getKey(), null).size());
        assertEquals("entry value should match",
                     new String(entry.getValue()),
                     new String(store1.get(entry.getKey(), null).get(0).getValue()));
        assertEquals("entry should be deleted from donor", 0, store0.get(entry.getKey(), null).size());
    }

    // Secondary set should have moved to node 0 and been deleted from node 1
    for(Entry<ByteArray, byte[]> entry: secondarySet0.entrySet()) {
        assertSame("entry should be present at store",
                   1,
                   store0.get(entry.getKey(), null).size());
        assertEquals("entry value should match",
                     new String(entry.getValue()),
                     new String(store0.get(entry.getKey(), null).get(0).getValue()));
        assertEquals("entry should be deleted from donor", 0, store1.get(entry.getKey(), null).size());
    }
}
rsumbaly commented May 4, 2011

This move can be totally ignored by optimizing based on the fact that we store partitions together in BDB... So basically, if we're moving 0_1 or 0_2 (a non-primary replica of partition 0) to a node that already has 0_0, don't move it! Also, this optimization should be done at a level higher than the cluster plan. Why? Because I said so :D
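
For concreteness, here is a minimal sketch of the filter this comment describes, assuming BDB keeps all replicas of a partition in the same on-disk environment, so a move is redundant whenever the stealer node already hosts any replica of that partition. ReplicaMove and dropRedundantMoves are hypothetical names used for illustration only; they are not actual Voldemort APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Voldemort code: one candidate move of a single
// replica ("<partition>_<replicaType>", e.g. 0_1) to a stealer node.
class ReplicaMove {

    final int partitionId;   // e.g. 0 in "0_1"
    final int replicaType;   // e.g. 1 in "0_1"; 0 means the primary copy
    final int stealerNodeId; // destination node for this replica

    ReplicaMove(int partitionId, int replicaType, int stealerNodeId) {
        this.partitionId = partitionId;
        this.replicaType = replicaType;
        this.stealerNodeId = stealerNodeId;
    }
}

class BdbMoveFilter {

    /**
     * Drops moves that are no-ops under the assumed BDB layout: if the
     * stealer already holds some replica of a partition (say 0_0), moving
     * another replica of the same partition (0_1 or 0_2) would copy bytes
     * that are already on its disk.
     *
     * @param candidateMoves moves produced by the cluster plan
     * @param hostedPartitions node id -> partitions that already have some
     *        replica stored on that node
     */
    static List<ReplicaMove> dropRedundantMoves(List<ReplicaMove> candidateMoves,
                                                Map<Integer, Set<Integer>> hostedPartitions) {
        List<ReplicaMove> requiredMoves = new ArrayList<ReplicaMove>();
        for(ReplicaMove move: candidateMoves) {
            Set<Integer> hosted = hostedPartitions.get(move.stealerNodeId);
            if(hosted != null && hosted.contains(move.partitionId)) {
                continue; // e.g. skip moving 0_1 to a node that already has 0_0
            }
            requiredMoves.add(move);
        }
        return requiredMoves;
    }
}

Running such a filter over the flattened plans before handing them to adminClient.rebalanceNode(...) would, per the comment above, sit one level above the cluster plan rather than inside it.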
