Created
July 24, 2012 21:00
-
-
Save colinsurprenant/3172629 to your computer and use it in GitHub Desktop.
Storm performance topology
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package storm.starter; | |
import backtype.storm.Config; | |
import backtype.storm.StormSubmitter; | |
import backtype.storm.spout.SpoutOutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.BasicOutputCollector; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.TopologyBuilder; | |
import backtype.storm.topology.base.BaseBasicBolt; | |
import backtype.storm.topology.base.BaseRichSpout; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
import backtype.storm.tuple.Values; | |
import java.util.Map; | |
import java.util.Random; | |
public class ThroughputTest { | |
public static class GenSpout extends BaseRichSpout { | |
private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'}; | |
SpoutOutputCollector _collector; | |
int _size; | |
Random _rand; | |
String _id; | |
String _val; | |
public GenSpout(int size) { | |
_size = size; | |
} | |
@Override | |
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
_collector = collector; | |
_rand = new Random(); | |
_id = randString(5); | |
_val = randString(_size); | |
} | |
@Override | |
public void nextTuple() { | |
_collector.emit(new Values(_id, _val)); | |
} | |
private String randString(int size) { | |
StringBuffer buf = new StringBuffer(); | |
for(int i=0; i<size; i++) { | |
buf.append(CHARS[_rand.nextInt(CHARS.length)]); | |
} | |
return buf.toString(); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("id", "item")); | |
} | |
} | |
public static class IdentityBolt extends BaseBasicBolt { | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("id", "item")); | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
collector.emit(tuple.getValues()); | |
} | |
} | |
public static class CountBolt extends BaseBasicBolt { | |
int _count; | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("count")); | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
_count+=1; | |
collector.emit(new Values(_count)); | |
} | |
} | |
public static class AckBolt extends BaseBasicBolt { | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
int size = Integer.parseInt(args[1]); | |
int workers = Integer.parseInt(args[2]); | |
int spout = Integer.parseInt(args[3]); | |
int bolt = Integer.parseInt(args[4]); | |
int maxPending = Integer.parseInt(args[5]); | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("spout", new GenSpout(size), spout); | |
// builder.setBolt("count", new CountBolt(), bolt) | |
// .fieldsGrouping("bolt", new Fields("id")); | |
// builder.setBolt("bolt", new IdentityBolt(), bolt) | |
// .shuffleGrouping("spout"); | |
builder.setBolt("bolt2", new AckBolt(), bolt) | |
.shuffleGrouping("spout"); | |
// builder.setBolt("count2", new CountBolt(), bolt) | |
// .fieldsGrouping("bolt2", new Fields("id")); | |
Config conf = new Config(); | |
conf.setNumWorkers(workers); | |
//conf.setMaxSpoutPending(maxPending); | |
conf.setNumAckerExecutors(workers); | |
conf.setStatsSampleRate(0.0001); | |
//topology.executor.receive.buffer.size: 8192 #batched | |
//topology.executor.send.buffer.size: 8192 #individual messages | |
//topology.transfer.buffer.size: 1024 # batched | |
conf.put("topology.executor.send.buffer.size", 32768); | |
conf.put("topology.transfer.buffer.size", 32); | |
conf.put("topology.receiver.buffer.size", 8); | |
StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment