|
package com.mkscrg.sandbox; |
|
|
|
import backtype.storm.Config; |
|
import backtype.storm.LocalCluster; |
|
import backtype.storm.StormSubmitter; |
|
import backtype.storm.generated.AlreadyAliveException; |
|
import backtype.storm.generated.InvalidTopologyException; |
|
import backtype.storm.generated.StormTopology; |
|
import backtype.storm.spout.SpoutOutputCollector; |
|
import backtype.storm.task.TopologyContext; |
|
import backtype.storm.topology.BasicOutputCollector; |
|
import backtype.storm.topology.OutputFieldsDeclarer; |
|
import backtype.storm.topology.TopologyBuilder; |
|
import backtype.storm.topology.base.BaseBasicBolt; |
|
import backtype.storm.topology.base.BaseRichSpout; |
|
import backtype.storm.tuple.Fields; |
|
import backtype.storm.tuple.Tuple; |
|
import backtype.storm.tuple.Values; |
|
import backtype.storm.utils.Utils; |
|
import org.apache.log4j.Logger; |
|
|
|
import java.util.Map; |
|
import java.util.UUID; |
|
|
|
public final class StormMSPTest { |
|
private static final String ONE_FIELD = "one"; |
|
|
|
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException { |
|
boolean isRemote = false; |
|
String topologyName = "msp-test_local"; |
|
if (args != null && args.length > 0) { |
|
isRemote = true; |
|
topologyName = args[0]; |
|
} |
|
|
|
TopologyBuilder topologyBuilder = new TopologyBuilder(); |
|
topologyBuilder.setSpout("one-spout", new OneSpout(), 10); |
|
topologyBuilder.setBolt("split-bolt", new SplitBolt(), 1) |
|
.shuffleGrouping("one-spout"); |
|
topologyBuilder.setBolt("count-bolt", new CountBolt(), 1) |
|
.shuffleGrouping("split-bolt", SplitBolt.STREAM1) |
|
.shuffleGrouping("split-bolt", SplitBolt.STREAM2); |
|
|
|
StormTopology stormTopology = topologyBuilder.createTopology(); |
|
|
|
Config config = new Config(); |
|
config.setMaxSpoutPending(10); |
|
config.setNumWorkers(2); |
|
|
|
if (isRemote) { |
|
StormSubmitter.submitTopology(topologyName, config, stormTopology); |
|
} else { |
|
LocalCluster localCluster = new LocalCluster(); |
|
localCluster.submitTopology(topologyName, config, stormTopology); |
|
|
|
Thread.sleep(20000); |
|
|
|
localCluster.shutdown(); |
|
} |
|
} |
|
|
|
public static final class OneSpout extends BaseRichSpout { |
|
private static final Logger LOGGER = Logger.getLogger(OneSpout.class); |
|
|
|
private SpoutOutputCollector spoutOutputCollector; |
|
|
|
@Override |
|
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { |
|
outputFieldsDeclarer.declare(new Fields(ONE_FIELD)); |
|
} |
|
|
|
@Override |
|
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { |
|
this.spoutOutputCollector = spoutOutputCollector; |
|
} |
|
|
|
@Override |
|
public void nextTuple() { |
|
spoutOutputCollector.emit(new Values(1), UUID.randomUUID().toString()); |
|
LOGGER.info("Emitted 1"); |
|
} |
|
|
|
@Override |
|
public void fail(Object msgId) { |
|
nextTuple(); |
|
} |
|
} |
|
|
|
public static final class SplitBolt extends BaseBasicBolt { |
|
private static final Logger LOGGER = Logger.getLogger(SplitBolt.class); |
|
public static final String STREAM1 = "stream1"; |
|
public static final String STREAM2 = "stream2"; |
|
|
|
private boolean streamFlop = false; |
|
|
|
@Override |
|
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { |
|
Integer i = tuple.getIntegerByField(ONE_FIELD); |
|
LOGGER.info("Received " + i); |
|
|
|
String streamId; |
|
if (streamFlop) { |
|
streamId = STREAM1; |
|
streamFlop = false; |
|
} else { |
|
streamId = STREAM2; |
|
streamFlop = true; |
|
} |
|
|
|
basicOutputCollector.emit(streamId, new Values(i)); |
|
LOGGER.info("Emitted " + i + " on " + streamId); |
|
} |
|
|
|
@Override |
|
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { |
|
outputFieldsDeclarer.declareStream(STREAM1, new Fields(ONE_FIELD)); |
|
outputFieldsDeclarer.declareStream(STREAM2, new Fields(ONE_FIELD)); |
|
} |
|
} |
|
|
|
public static final class CountBolt extends BaseBasicBolt { |
|
private static final Logger LOGGER = Logger.getLogger(CountBolt.class); |
|
|
|
private int counter = 0; |
|
|
|
@Override |
|
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { |
|
Integer i = tuple.getIntegerByField(ONE_FIELD); |
|
LOGGER.info("Received " + i + "(" + ++counter + ")"); |
|
|
|
Utils.sleep(100); |
|
} |
|
|
|
@Override |
|
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { |
|
// No output. |
|
} |
|
} |
|
} |