Created
May 4, 2011 17:11
-
-
Save rsumbaly/955590 to your computer and use it in GitHub Desktop.
Optimization for BDB!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testRebalanceNodeRWStore() { | |
List<Integer> partitionsBeingMoved = Lists.newArrayList(0); | |
Cluster targetCluster = RebalanceUtils.createUpdatedCluster(cluster, | |
cluster.getNodeById(1), | |
cluster.getNodeById(0), | |
partitionsBeingMoved); | |
HashMap<ByteArray, byte[]> entrySet = ServerTestUtils.createRandomKeyValuePairs(10); | |
SocketStoreClientFactory factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(Lists.newArrayList("tcp://" | |
+ cluster.getNodeById(0) | |
.getHost() | |
+ ":" | |
+ cluster.getNodeById(0) | |
.getSocketPort()))); | |
StoreClient<Object, Object> storeClient = factory.getStoreClient("test-recovery-data"); | |
HashMap<ByteArray, byte[]> primarySet0 = Maps.newHashMap(); | |
HashMap<ByteArray, byte[]> secondarySet0 = Maps.newHashMap(); | |
RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(RebalanceUtils.getStoreDefinitionWithName(storeDefs, | |
"test-recovery-data"), | |
cluster); | |
for(Entry<ByteArray, byte[]> entry: entrySet.entrySet()) { | |
storeClient.put(new String(entry.getKey().get()), new String(entry.getValue())); | |
List<Integer> pList = strategy.getPartitionList(entry.getKey().get()); | |
System.out.print("ENTRY - " + ByteUtils.toHexString(entry.getKey().get()) + " - " | |
+ pList); | |
if(pList.get(0) == 0) { | |
primarySet0.put(entry.getKey(), entry.getValue()); | |
System.out.print(" - primary"); | |
} else if(pList.get(1) == 0) { | |
secondarySet0.put(entry.getKey(), entry.getValue()); | |
System.out.print(" - secondary"); | |
} | |
System.out.println(); | |
} | |
RebalanceClusterPlan plan = new RebalanceClusterPlan(cluster, | |
targetCluster, | |
Lists.newArrayList(RebalanceUtils.getStoreDefinitionWithName(storeDefs, | |
"test-recovery-data")), | |
true); | |
List<RebalancePartitionsInfo> plans = RebalanceUtils.flattenNodePlans(Lists.newArrayList(plan.getRebalancingTaskQueue())); | |
try { | |
adminClient.rebalanceNode(plans.get(0)); | |
fail("Should have thrown an exception since not in rebalancing state"); | |
} catch(VoldemortException e) { | |
e.printStackTrace(); | |
} | |
getServer(0).getMetadataStore().put(MetadataStore.SERVER_STATE_KEY, | |
MetadataStore.VoldemortState.REBALANCING_MASTER_SERVER); | |
getServer(1).getMetadataStore().put(MetadataStore.SERVER_STATE_KEY, | |
MetadataStore.VoldemortState.REBALANCING_MASTER_SERVER); | |
try { | |
adminClient.rebalanceNode(plans.get(0)); | |
fail("Should have thrown an exception since no steal info"); | |
} catch(VoldemortException e) { | |
e.printStackTrace(); | |
} | |
// Set the rebalance info on the stealer node | |
getServer(plans.get(0).getStealerId()).getMetadataStore() | |
.put(MetadataStore.REBALANCING_STEAL_INFO, | |
new RebalancerState(Lists.newArrayList(plans.get(0)))); | |
getServer(plans.get(1).getStealerId()).getMetadataStore() | |
.put(MetadataStore.REBALANCING_STEAL_INFO, | |
new RebalancerState(Lists.newArrayList(plans.get(1)))); | |
// Update the cluster metadata on both the nodes | |
getServer(0).getMetadataStore().put(MetadataStore.CLUSTER_KEY, targetCluster); | |
getServer(1).getMetadataStore().put(MetadataStore.CLUSTER_KEY, targetCluster); | |
// Actually run it | |
try { | |
for(RebalancePartitionsInfo currentPlan: plans) { | |
int asyncId = adminClient.rebalanceNode(currentPlan); | |
assertNotSame("Got a valid rebalanceAsyncId", -1, asyncId); | |
getAdminClient().waitForCompletion(currentPlan.getStealerId(), | |
asyncId, | |
300, | |
TimeUnit.SECONDS); | |
} | |
} catch(Exception e) { | |
e.printStackTrace(); | |
fail("Should not throw any exceptions"); | |
} | |
Store<ByteArray, byte[], byte[]> store0 = getStore(0, "test-recovery-data"); | |
Store<ByteArray, byte[], byte[]> store1 = getStore(1, "test-recovery-data"); | |
// Primary set should have moved to node 1 and deleted from node 0 | |
for(Entry<ByteArray, byte[]> entry: primarySet0.entrySet()) { | |
assertSame("entry should be present at store", 1, store1.get(entry.getKey(), null) | |
.size()); | |
assertEquals("entry value should match", | |
new String(entry.getValue()), | |
new String(store1.get(entry.getKey(), null).get(0).getValue())); | |
assertEquals(store0.get(entry.getKey(), null).size(), 0); | |
} | |
// Secondary set should have moved to node 0 and deleted from node 1 | |
for(Entry<ByteArray, byte[]> entry: secondarySet0.entrySet()) { | |
assertSame("entry should be present at store", 1, store0.get(entry.getKey(), null) | |
.size()); | |
assertEquals("entry value should match", | |
new String(entry.getValue()), | |
new String(store0.get(entry.getKey(), null).get(0).getValue())); | |
assertEquals(store1.get(entry.getKey(), null).size(), 0); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This move can be totally ignored by optimizing based on the fact that we store partitions together in BDB. So basically, if we're moving 0_1 or 0_2 to a location which already has 0_0, don't move it! Also, this optimization should be done at a level higher than the cluster plan. Why? Because I said so :D