Created
August 2, 2010 22:54
-
-
Save rsumbaly/505476 to your computer and use it in GitHub Desktop.
Hadoop RW Store EndToEnd
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package voldemort.store.readwrite.mr;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;

import junit.framework.TestCase;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.junit.After;
import org.junit.Before;

import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.client.ClientConfig;
import voldemort.client.RoutingTier;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.routing.RoutingStrategyType;
import voldemort.serialization.SerializerDefinition;
import voldemort.server.VoldemortServer;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreDefinitionBuilder;
import voldemort.store.bdb.BdbStorageConfiguration;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
import voldemort.xml.StoreDefinitionsMapper;

import com.google.common.collect.Lists;
public class HadoopRWStoreBuilderTest extends TestCase { | |
private SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2, | |
10000, | |
100000, | |
32 * 1024); | |
private static int numEntries = 20; | |
private String storeName = "test"; | |
private Cluster cluster; | |
private StoreDefinition storeDef; | |
private VoldemortServer server; | |
private AdminClient adminClient; | |
private SocketStoreClientFactory socketFactory; | |
private StoreClient<Object, String> storeClient; | |
public static class TextStoreMapper extends | |
AbstractRWHadoopStoreBuilderMapper<LongWritable, Text> { | |
@Override | |
public Object makeKey(LongWritable key, Text value) { | |
String[] tokens = value.toString().split("\\s+"); | |
return tokens[0]; | |
} | |
@Override | |
public Object makeValue(LongWritable key, Text value) { | |
String[] tokens = value.toString().split("\\s+"); | |
return tokens[1]; | |
} | |
} | |
public static class MyResolver implements InconsistencyResolver<Versioned<String>> { | |
public List<Versioned<String>> resolveConflicts(List<Versioned<String>> items) { | |
if(items.size() <= 1) { | |
return items; | |
} else { | |
Iterator<Versioned<String>> iter = items.iterator(); | |
Versioned<String> current = iter.next(); | |
String returnValue = current.getValue(); | |
VectorClock clock = (VectorClock) current.getVersion(); | |
long maxTime = ((VectorClock) current.getVersion()).getTimestamp(); | |
while(iter.hasNext()) { | |
Versioned<String> versioned = iter.next(); | |
VectorClock newClock = (VectorClock) versioned.getVersion(); | |
if(newClock.getTimestamp() > maxTime) { | |
returnValue = versioned.getValue(); | |
maxTime = clock.getTimestamp(); | |
} | |
clock = clock.merge((VectorClock) versioned.getVersion()); | |
} | |
return Collections.singletonList(new Versioned<String>(returnValue, clock)); | |
} | |
} | |
} | |
@Override | |
@Before | |
public void setUp() throws Exception { | |
cluster = ServerTestUtils.getLocalCluster(1); | |
SerializerDefinition serDef = new SerializerDefinition("string"); | |
storeDef = new StoreDefinitionBuilder().setName(storeName) | |
.setType(BdbStorageConfiguration.TYPE_NAME) | |
.setKeySerializer(serDef) | |
.setValueSerializer(serDef) | |
.setRoutingPolicy(RoutingTier.CLIENT) | |
.setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY) | |
.setReplicationFactor(1) | |
.setRequiredReads(1) | |
.setRequiredWrites(1) | |
.build(); | |
// Store.xml file | |
File tempConfDir = TestUtils.createTempDir(); | |
File storeXml = new File(tempConfDir, "stores.xml"); | |
FileUtils.writeStringToFile(storeXml, | |
new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(storeDef))); | |
server = ServerTestUtils.startVoldemortServer(socketStoreFactory, | |
ServerTestUtils.createServerConfig(true, | |
0, | |
tempConfDir.getAbsolutePath(), | |
null, | |
storeXml.getAbsolutePath(), | |
new Properties()), | |
cluster); | |
adminClient = new AdminClient(cluster, new AdminClientConfig().setMaxThreads(1)); | |
ClientConfig clientConfig = new ClientConfig().setMaxThreads(1) | |
.setMaxTotalConnections(1) | |
.setMaxConnectionsPerNode(1) | |
.setBootstrapUrls("tcp://localhost:" | |
+ cluster.getNodes() | |
.iterator() | |
.next() | |
.getSocketPort()); | |
this.socketFactory = new SocketStoreClientFactory(clientConfig); | |
this.storeClient = socketFactory.getStoreClient(storeName, new MyResolver()); | |
} | |
@Override | |
@After | |
public void tearDown() throws Exception { | |
socketFactory.close(); | |
adminClient.stop(); | |
server.stop(); | |
socketStoreFactory.close(); | |
} | |
public void testHadoopBuild() throws Exception { | |
Map<String, String> values = new HashMap<String, String>(); | |
File inputDir = TestUtils.createTempDir(); | |
File tempDir = TestUtils.createTempDir(); | |
for(int i = 0; i < numEntries; i++) | |
values.put(Integer.toString(i), Integer.toBinaryString(i)); | |
// Put in a value for key 0 | |
storeClient.put(Integer.toString(0), "blah"); | |
// write test data to text file | |
File inputFile = File.createTempFile("input", ".txt", inputDir); | |
inputFile.deleteOnExit(); | |
StringBuilder contents = new StringBuilder(); | |
for(Map.Entry<String, String> entry: values.entrySet()) | |
contents.append(entry.getKey() + "\t" + entry.getValue() + "\n"); | |
FileUtils.writeStringToFile(inputFile, contents.toString()); | |
int hadoopNodeId = 123; | |
int hadoopPushVersion = 456; | |
HadoopRWStoreBuilder builder = new HadoopRWStoreBuilder(new Configuration(), | |
TextStoreMapper.class, | |
TextInputFormat.class, | |
cluster, | |
storeDef, | |
60 * 1000, | |
hadoopNodeId, | |
hadoopPushVersion, | |
new Path(tempDir.getAbsolutePath()), | |
new Path(inputDir.getAbsolutePath())); | |
builder.build(); | |
System.out.println("GET BACK THE KEY 0 = " + storeClient.get(Integer.toString(0))); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment