Created June 28, 2011 20:46
-
-
Save rsumbaly/1052163 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package voldemort.utils; | |
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.serialization.DefaultSerializerFactory;
import voldemort.serialization.Serializer;
import voldemort.server.RequestRoutingType;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.versioning.Versioned;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
public class Entropy { | |
private int nodeId; | |
private long numKeys; | |
public static long DEFAULT_NUM_KEYS = 10000; | |
/** | |
* Entropy constructor. Uses DEFAULT_NUM_KEYS number of keys | |
* | |
* @param nodeId Node id. If -1, goes over all of them | |
* @param numThreads Number of threads | |
*/ | |
public Entropy(int nodeId) { | |
this.nodeId = nodeId; | |
this.numKeys = DEFAULT_NUM_KEYS; | |
} | |
/** | |
* Entropy constructor | |
* | |
* @param nodeId Node id. If -1, goes over all of them | |
* @param numKeys Number of keys | |
*/ | |
public Entropy(int nodeId, long numKeys) { | |
this.nodeId = nodeId; | |
this.numKeys = numKeys; | |
} | |
public static void main(String args[]) throws IOException { | |
OptionParser parser = new OptionParser(); | |
parser.accepts("help", "print help information"); | |
parser.accepts("node", "Node id") | |
.withRequiredArg() | |
.describedAs("node-id") | |
.ofType(Integer.class); | |
parser.accepts("cluster-xml", "[REQUIRED] Path to cluster-xml") | |
.withRequiredArg() | |
.describedAs("xml") | |
.ofType(String.class); | |
parser.accepts("stores-xml", "[REQUIRED] Path to stores-xml") | |
.withRequiredArg() | |
.describedAs("xml") | |
.ofType(String.class); | |
parser.accepts("output-dir", | |
"[REQUIRED] The output directory where we'll store / retrieve the keys. ") | |
.withRequiredArg() | |
.describedAs("output-dir") | |
.ofType(String.class); | |
parser.accepts("op-type", | |
"Operation type - false ( save keys ) [ default ], true ( run entropy calculator ) ") | |
.withRequiredArg() | |
.ofType(Boolean.class); | |
parser.accepts("num-keys", | |
"Number of keys per store [ Default: " + Entropy.DEFAULT_NUM_KEYS + " ]") | |
.withRequiredArg() | |
.describedAs("keys") | |
.ofType(Long.class); | |
OptionSet options = parser.parse(args); | |
if(options.has("help")) { | |
parser.printHelpOn(System.out); | |
System.exit(0); | |
} | |
Set<String> missing = CmdUtils.missing(options, "cluster-xml", "stores-xml", "output-dir"); | |
if(missing.size() > 0) { | |
System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing)); | |
parser.printHelpOn(System.err); | |
System.exit(1); | |
} | |
// compulsory params | |
String clusterXml = (String) options.valueOf("cluster-xml"); | |
String storesXml = (String) options.valueOf("stores-xml"); | |
String outputDirPath = (String) options.valueOf("output-dir"); | |
long numKeys = CmdUtils.valueOf(options, "num-keys", Entropy.DEFAULT_NUM_KEYS); | |
int nodeId = CmdUtils.valueOf(options, "node", 0); | |
boolean opType = CmdUtils.valueOf(options, "op-type", false); | |
File outputDir = new File(outputDirPath); | |
if(!outputDir.exists()) { | |
outputDir.mkdirs(); | |
} else if(!(outputDir.isDirectory() && outputDir.canWrite())) { | |
System.err.println("Cannot write to output directory " + outputDirPath); | |
parser.printHelpOn(System.err); | |
System.exit(1); | |
} | |
if(!Utils.isReadableFile(clusterXml) || !Utils.isReadableFile(storesXml)) { | |
System.err.println("Cannot read metadata file "); | |
System.exit(1); | |
} | |
// Parse the metadata | |
Cluster cluster = new ClusterMapper().readCluster(new File(clusterXml)); | |
List<StoreDefinition> storeDefs = new StoreDefinitionsMapper().readStoreList(new File(storesXml)); | |
Entropy detector = new Entropy(nodeId, numKeys); | |
detector.generateEntropy(cluster, storeDefs, outputDir, opType); | |
} | |
/** | |
* Run the actual entropy calculation tool | |
* | |
* @param cluster The cluster metadata | |
* @param storeDefs The list of stores | |
* @param storeDir The store directory | |
* @param opType Operation type - true ( run entropy calculator ), false ( | |
* save keys ) | |
* @throws IOException | |
*/ | |
public void generateEntropy(Cluster cluster, | |
List<StoreDefinition> storeDefs, | |
File storeDir, | |
boolean opType) throws IOException { | |
AdminClient adminClient = null; | |
try { | |
adminClient = new AdminClient(cluster, | |
new AdminClientConfig().setMaxConnectionsPerNode(storeDefs.size())); | |
if(opType) { | |
System.out.println("Running entropy calculator"); | |
} else { | |
System.out.println("Generating keys for future entropy calculation"); | |
Utils.mkdirs(storeDir); | |
} | |
for(StoreDefinition storeDef: storeDefs) { | |
File storesKeyFile = new File(storeDir, storeDef.getName()); | |
if(AdminClient.restoreStoreEngineBlackList.contains(storeDef.getType())) { | |
System.out.println("Ignoring store " + storeDef.getName()); | |
continue; | |
} else { | |
System.out.println("Working on store " + storeDef.getName()); | |
} | |
RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef, | |
cluster); | |
SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2, | |
10000, | |
100000, | |
32 * 1024); | |
// Cache connections to all nodes for this store in advance | |
HashMap<Integer, Store<ByteArray, byte[], byte[]>> socketStoresPerNode = Maps.newHashMap(); | |
for(Node node: cluster.getNodes()) { | |
socketStoresPerNode.put(node.getId(), | |
socketStoreFactory.create(storeDef.getName(), | |
node.getHost(), | |
node.getSocketPort(), | |
RequestFormatType.PROTOCOL_BUFFERS, | |
RequestRoutingType.IGNORE_CHECKS)); | |
} | |
if(!opType) { | |
if(storesKeyFile.exists()) { | |
System.err.println("Key files for " + storeDef.getName() | |
+ " already exists"); | |
continue; | |
} | |
FileOutputStream writer = null; | |
try { | |
writer = new FileOutputStream(storesKeyFile); | |
Iterator<Pair<ByteArray, Versioned<byte[]>>> entriesIterator = null; | |
if(nodeId == -1) { | |
int numKeysPerNode = (int) Math.floor(numKeys | |
/ cluster.getNumberOfNodes()); | |
for(Node node: cluster.getNodes()) { | |
entriesIterator = adminClient.fetchEntries(node.getId(), | |
storeDef.getName(), | |
cluster.getNodeById(node.getId()) | |
.getPartitionIds(), | |
null, | |
true); | |
for(long keyId = 0; keyId < numKeysPerNode | |
&& entriesIterator.hasNext(); keyId++) { | |
Pair<ByteArray, Versioned<byte[]>> pair = entriesIterator.next(); | |
ByteArray key = pair.getFirst(); | |
Versioned<byte[]> value = pair.getSecond(); | |
if(RebalanceUtils.getNodeIds(strategy.routeRequest(key.get()) | |
.subList(0, 1)) | |
.contains(node.getId()) | |
&& null != value && value.getValue().length > 0) { | |
List<Node> responsibleNodes = strategy.routeRequest(key.get()); | |
boolean missingKey = false; | |
for(Node responsibleNode: responsibleNodes) { | |
List<Versioned<byte[]>> newValue = socketStoresPerNode.get(responsibleNode.getId()) | |
.get(key, | |
null); | |
if(newValue == null || newValue.size() == 0) { | |
missingKey = true; | |
} | |
} | |
if(!missingKey) { | |
writer.write(key.length()); | |
writer.write(key.get()); | |
} | |
} | |
} | |
} | |
} else { | |
entriesIterator = adminClient.fetchEntries(nodeId, | |
storeDef.getName(), | |
cluster.getNodeById(nodeId) | |
.getPartitionIds(), | |
null, | |
true); | |
for(long keyId = 0; keyId < numKeys && entriesIterator.hasNext(); keyId++) { | |
Pair<ByteArray, Versioned<byte[]>> pair = entriesIterator.next(); | |
ByteArray key = pair.getFirst(); | |
Versioned<byte[]> value = pair.getSecond(); | |
if(RebalanceUtils.getNodeIds(strategy.routeRequest(key.get()) | |
.subList(0, 1)) | |
.contains(nodeId) | |
&& null != value && value.getValue().length > 0) { | |
List<Node> responsibleNodes = strategy.routeRequest(key.get()); | |
boolean missingKey = false; | |
for(Node responsibleNode: responsibleNodes) { | |
List<Versioned<byte[]>> newValue = socketStoresPerNode.get(responsibleNode.getId()) | |
.get(key, | |
null); | |
if(newValue == null || newValue.size() == 0) { | |
missingKey = true; | |
} | |
} | |
if(!missingKey) { | |
writer.write(key.length()); | |
writer.write(key.get()); | |
} | |
} | |
} | |
} | |
} finally { | |
if(writer != null) | |
writer.close(); | |
} | |
} else { | |
Serializer<Object> keySerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(storeDef.getKeySerializer()); | |
if(!(storesKeyFile.exists() && storesKeyFile.canRead())) { | |
System.err.println("Could not find " + storeDef.getName() | |
+ " file to check"); | |
continue; | |
} | |
FileInputStream reader = null; | |
long foundKeys = 0L; | |
long totalKeys = 0L; | |
long nullValueCount = 0L; | |
long zeroValueCount = 0L; | |
try { | |
reader = new FileInputStream(storesKeyFile); | |
while(reader.available() != 0) { | |
int size = reader.read(); | |
if(size <= 0) { | |
break; | |
} | |
// Read the key | |
byte[] key = new byte[size]; | |
reader.read(key); | |
List<Node> responsibleNodes = strategy.routeRequest(key); | |
boolean missingKey = false; | |
for(Node node: responsibleNodes) { | |
List<Versioned<byte[]>> value = socketStoresPerNode.get(node.getId()) | |
.get(new ByteArray(key), | |
null); | |
if(value == null) { | |
missingKey = true; | |
nullValueCount++; | |
} else if(value.size() == 0) { | |
missingKey = true; | |
zeroValueCount++; | |
System.out.println("Zone value length key: " | |
+ keySerializer.toObject(key) + " on node " | |
+ node.getId()); | |
} | |
} | |
if(!missingKey) | |
foundKeys++; | |
totalKeys++; | |
} | |
System.out.println("Found = " + foundKeys + " Total = " + totalKeys | |
+ " ZeroLengthKeys = " + zeroValueCount | |
+ " NullValueKeys = " + nullValueCount); | |
if(foundKeys > 0 && totalKeys > 0) { | |
System.out.println("%age found - " + 100.0 * (double) foundKeys | |
/ totalKeys); | |
} | |
} finally { | |
if(reader != null) | |
reader.close(); | |
// close all socket stores | |
for(Store<ByteArray, byte[], byte[]> store: socketStoresPerNode.values()) { | |
store.close(); | |
} | |
} | |
} | |
} | |
} finally { | |
if(adminClient != null) | |
adminClient.stop(); | |
} | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.