Created
April 29, 2016 07:09
-
-
Save HeartSaVioR/9ebac9f7f8503e878b0009a37c2b686b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.apache.storm.starter; | |
| import org.apache.storm.Config; | |
| import org.apache.storm.LocalCluster; | |
| import org.apache.storm.StormSubmitter; | |
| import org.apache.storm.spout.SpoutOutputCollector; | |
| import org.apache.storm.task.OutputCollector; | |
| import org.apache.storm.task.TopologyContext; | |
| import org.apache.storm.testing.TestWordSpout; | |
| import org.apache.storm.topology.OutputFieldsDeclarer; | |
| import org.apache.storm.topology.TopologyBuilder; | |
| import org.apache.storm.topology.base.BaseRichBolt; | |
| import org.apache.storm.topology.base.BaseRichSpout; | |
| import org.apache.storm.tuple.Fields; | |
| import org.apache.storm.tuple.Tuple; | |
| import org.apache.storm.tuple.Values; | |
| import org.apache.storm.utils.Utils; | |
| import org.slf4j.LoggerFactory; | |
| import java.util.HashMap; | |
| import java.util.Map; | |
| import java.util.Random; | |
| import java.util.UUID; | |
| class DelayedWordSpout extends BaseRichSpout { | |
| public static org.slf4j.Logger LOG = LoggerFactory.getLogger(DelayedWordSpout.class); | |
| boolean _isDistributed; | |
| SpoutOutputCollector _collector; | |
| public DelayedWordSpout() { | |
| this(true); | |
| } | |
| public DelayedWordSpout(boolean isDistributed) { | |
| _isDistributed = isDistributed; | |
| } | |
| public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
| _collector = collector; | |
| } | |
| public void close() { | |
| } | |
| public void nextTuple() { | |
| final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; | |
| final Random rand = new Random(); | |
| for (int i = 0 ; i < 1000 ; i++) { | |
| final String word = words[rand.nextInt(words.length)]; | |
| _collector.emit(new Values(word), UUID.randomUUID()); | |
| } | |
| Utils.sleep(5000); | |
| } | |
| public void ack(Object msgId) { | |
| } | |
| public void fail(Object msgId) { | |
| } | |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
| declarer.declare(new Fields("word")); | |
| } | |
| @Override | |
| public Map<String, Object> getComponentConfiguration() { | |
| if(!_isDistributed) { | |
| Map<String, Object> ret = new HashMap<String, Object>(); | |
| ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); | |
| return ret; | |
| } else { | |
| return null; | |
| } | |
| } | |
| } | |
| /** | |
| * This is a basic example of a Storm topology. | |
| */ | |
| public class DelayedExclamationTopology { | |
| public static class ExclamationBolt extends BaseRichBolt { | |
| OutputCollector _collector; | |
| @Override | |
| public void prepare(Map conf, TopologyContext context, OutputCollector collector) { | |
| _collector = collector; | |
| } | |
| @Override | |
| public void execute(Tuple tuple) { | |
| _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); | |
| _collector.ack(tuple); | |
| } | |
| @Override | |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
| declarer.declare(new Fields("word")); | |
| } | |
| } | |
| public static void main(String[] args) throws Exception { | |
| TopologyBuilder builder = new TopologyBuilder(); | |
| builder.setSpout("word", new DelayedWordSpout(), 10); | |
| builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); | |
| builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); | |
| Config conf = new Config(); | |
| conf.setDebug(true); | |
| if (args != null && args.length > 0) { | |
| conf.setNumWorkers(3); | |
| StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); | |
| } | |
| else { | |
| LocalCluster cluster = new LocalCluster(); | |
| cluster.submitTopology("test", conf, builder.createTopology()); | |
| Utils.sleep(10000); | |
| cluster.killTopology("test"); | |
| cluster.shutdown(); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment