Created
May 11, 2011 00:41
-
-
Save rsumbaly/965695 to your computer and use it in GitHub Desktop.
FileStreamTest
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package voldemort.performance; | |
import java.io.BufferedWriter; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.io.FileWriter; | |
import java.io.IOException; | |
import java.util.Arrays; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.Set; | |
import java.util.Map.Entry; | |
import junit.framework.TestCase; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.Parameterized; | |
import org.junit.runners.Parameterized.Parameters; | |
import voldemort.ServerTestUtils; | |
import voldemort.TestUtils; | |
import voldemort.client.protocol.admin.AdminClient; | |
import voldemort.cluster.Cluster; | |
import voldemort.cluster.Node; | |
import voldemort.server.VoldemortServer; | |
import voldemort.store.Store; | |
import voldemort.store.StoreDefinition; | |
import voldemort.store.readonly.ReadOnlyStorageEngine; | |
import voldemort.store.readonly.ReadOnlyStorageFormat; | |
import voldemort.store.readonly.ReadOnlyStorageMetadata; | |
import voldemort.store.socket.SocketStoreFactory; | |
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool; | |
import voldemort.utils.ByteArray; | |
import voldemort.utils.Pair; | |
import voldemort.utils.RebalanceUtils; | |
import voldemort.utils.Utils; | |
import voldemort.xml.StoreDefinitionsMapper; | |
import com.google.common.collect.Lists; | |
import com.google.common.collect.Maps; | |
/** | |
*/ | |
@RunWith(Parameterized.class) | |
public class FileStreamTest extends TestCase { | |
private static String storesXmlfile = "test/common/voldemort/config/stores.xml"; | |
private SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2, | |
10000, | |
100000, | |
32 * 1024); | |
private List<StoreDefinition> storeDefs; | |
private VoldemortServer[] servers; | |
private Cluster cluster; | |
private AdminClient adminClient; | |
private final boolean useNio; | |
public FileStreamTest(boolean useNio) { | |
this.useNio = useNio; | |
} | |
@Parameters | |
public static Collection<Object[]> configs() { | |
return Arrays.asList(new Object[][] { { true } }); | |
} | |
@Override | |
@Before | |
public void setUp() throws IOException { | |
cluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0 }, { 1 } }); | |
servers = new VoldemortServer[2]; | |
storeDefs = new StoreDefinitionsMapper().readStoreList(new File(storesXmlfile)); | |
servers[0] = ServerTestUtils.startVoldemortServer(socketStoreFactory, | |
ServerTestUtils.createServerConfig(useNio, | |
0, | |
TestUtils.createTempDir() | |
.getAbsolutePath(), | |
null, | |
storesXmlfile, | |
new Properties()), | |
cluster); | |
servers[1] = ServerTestUtils.startVoldemortServer(socketStoreFactory, | |
ServerTestUtils.createServerConfig(useNio, | |
1, | |
TestUtils.createTempDir() | |
.getAbsolutePath(), | |
null, | |
storesXmlfile, | |
new Properties()), | |
cluster); | |
adminClient = ServerTestUtils.getAdminClient(cluster); | |
} | |
@Override | |
@After | |
public void tearDown() throws IOException, InterruptedException { | |
adminClient.stop(); | |
for(VoldemortServer server: servers) { | |
ServerTestUtils.stopVoldemortServer(server); | |
} | |
socketStoreFactory.close(); | |
} | |
private VoldemortServer getVoldemortServer(int nodeId) { | |
return servers[nodeId]; | |
} | |
private AdminClient getAdminClient() { | |
return adminClient; | |
} | |
private Store<ByteArray, byte[], byte[]> getStore(int nodeID, String storeName) { | |
Store<ByteArray, byte[], byte[]> store = getVoldemortServer(nodeID).getStoreRepository() | |
.getStorageEngine(storeName); | |
assertNotSame("Store '" + storeName + "' should not be null", null, store); | |
return store; | |
} | |
@Test | |
public void testFetchPartitionFiles() throws IOException { | |
generateAndFetchFiles(1, 1, 1200, 12000000); | |
} | |
private void generateROFiles(int numChunks, | |
long indexSize, | |
long dataSize, | |
HashMap<Integer, List<Integer>> buckets, | |
File versionDir) throws IOException { | |
ReadOnlyStorageMetadata metadata = new ReadOnlyStorageMetadata(); | |
metadata.add(ReadOnlyStorageMetadata.FORMAT, ReadOnlyStorageFormat.READONLY_V2.getCode()); | |
File metadataFile = new File(versionDir, ".metadata"); | |
BufferedWriter writer = new BufferedWriter(new FileWriter(metadataFile)); | |
writer.write(metadata.toJsonString()); | |
writer.close(); | |
for(Entry<Integer, List<Integer>> entry: buckets.entrySet()) { | |
int replicaType = entry.getKey(); | |
for(int partitionId: entry.getValue()) { | |
for(int chunkId = 0; chunkId < numChunks; chunkId++) { | |
System.out.println("Writing replica " + replicaType + " - partition " | |
+ partitionId + " - chunk " + chunkId); | |
File index = new File(versionDir, Integer.toString(partitionId) + "_" | |
+ Integer.toString(replicaType) + "_" | |
+ Integer.toString(chunkId) + ".index"); | |
File data = new File(versionDir, Integer.toString(partitionId) + "_" | |
+ Integer.toString(replicaType) + "_" | |
+ Integer.toString(chunkId) + ".data"); | |
// write some random crap for index and data | |
FileOutputStream dataOs = new FileOutputStream(data); | |
for(int i = 0; i < dataSize; i++) | |
dataOs.write(i); | |
dataOs.close(); | |
FileOutputStream indexOs = new FileOutputStream(index); | |
for(int i = 0; i < indexSize; i++) | |
indexOs.write(i); | |
indexOs.close(); | |
} | |
} | |
} | |
} | |
@SuppressWarnings("unchecked") | |
private void generateAndFetchFiles(int numChunks, long versionId, long indexSize, long dataSize) | |
throws IOException { | |
Map<Integer, Set<Pair<Integer, Integer>>> buckets = RebalanceUtils.getNodeIdToAllPartitions(cluster, | |
Lists.newArrayList(RebalanceUtils.getStoreDefinitionWithName(storeDefs, | |
"test-readonly-fetchfiles")), | |
true); | |
for(Node node: cluster.getNodes()) { | |
ReadOnlyStorageEngine store = (ReadOnlyStorageEngine) getStore(node.getId(), | |
"test-readonly-fetchfiles"); | |
// Create list of buckets ( replica to partition ) | |
Set<Pair<Integer, Integer>> nodeBucketsSet = buckets.get(node.getId()); | |
HashMap<Integer, List<Integer>> nodeBuckets = RebalanceUtils.flattenPartitionTuples(nodeBucketsSet); | |
// Split the buckets into primary and replica buckets | |
HashMap<Integer, List<Integer>> primaryNodeBuckets = Maps.newHashMap(); | |
primaryNodeBuckets.put(0, nodeBuckets.get(0)); | |
int primaryPartitions = nodeBuckets.get(0).size(); | |
HashMap<Integer, List<Integer>> replicaNodeBuckets = Maps.newHashMap(nodeBuckets); | |
replicaNodeBuckets.remove(0); | |
int replicaPartitions = 0; | |
for(List<Integer> partitions: replicaNodeBuckets.values()) { | |
replicaPartitions += partitions.size(); | |
} | |
// Generate data... | |
File newVersionDir = new File(store.getStoreDirPath(), "version-" | |
+ Long.toString(versionId)); | |
Utils.mkdirs(newVersionDir); | |
generateROFiles(numChunks, indexSize, dataSize, nodeBuckets, newVersionDir); | |
// Swap it... | |
store.swapFiles(newVersionDir.getAbsolutePath()); | |
// Check if everything got mmap-ed correctly... | |
HashMap<Object, Integer> chunkIdToNumChunks = store.getChunkedFileSet() | |
.getChunkIdToNumChunks(); | |
for(Object bucket: chunkIdToNumChunks.keySet()) { | |
Pair<Integer, Integer> partitionToReplicaBucket = (Pair<Integer, Integer>) bucket; | |
Pair<Integer, Integer> replicaToPartitionBucket = Pair.create(partitionToReplicaBucket.getSecond(), | |
partitionToReplicaBucket.getFirst()); | |
assertTrue(nodeBucketsSet.contains(replicaToPartitionBucket)); | |
} | |
// Test 3) Fetch all the partitions... | |
File tempDir = TestUtils.createTempDir(); | |
getAdminClient().fetchPartitionFiles(node.getId(), | |
"test-readonly-fetchfiles", | |
nodeBuckets, | |
tempDir.getAbsolutePath(), | |
null); | |
// Check it... | |
assertEquals(tempDir.list().length, 2 * (primaryPartitions + replicaPartitions) | |
* numChunks); | |
for(Entry<Integer, List<Integer>> entry: nodeBuckets.entrySet()) { | |
int replicaType = entry.getKey(); | |
for(int partitionId: entry.getValue()) { | |
for(int chunkId = 0; chunkId < numChunks; chunkId++) { | |
File indexFile = new File(tempDir, Integer.toString(partitionId) + "_" | |
+ Integer.toString(replicaType) + "_" | |
+ Integer.toString(chunkId) + ".index"); | |
File dataFile = new File(tempDir, Integer.toString(partitionId) + "_" | |
+ Integer.toString(replicaType) + "_" | |
+ Integer.toString(chunkId) + ".data"); | |
assertTrue(indexFile.exists()); | |
assertTrue(dataFile.exists()); | |
assertEquals(indexFile.length(), indexSize); | |
assertEquals(dataFile.length(), dataSize); | |
} | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment