Skip to content

Instantly share code, notes, and snippets.

@HeartSaVioR
Created April 29, 2016 07:09
Show Gist options
  • Select an option

  • Save HeartSaVioR/9ebac9f7f8503e878b0009a37c2b686b to your computer and use it in GitHub Desktop.

Select an option

Save HeartSaVioR/9ebac9f7f8503e878b0009a37c2b686b to your computer and use it in GitHub Desktop.
package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
class DelayedWordSpout extends BaseRichSpout {
public static org.slf4j.Logger LOG = LoggerFactory.getLogger(DelayedWordSpout.class);
boolean _isDistributed;
SpoutOutputCollector _collector;
public DelayedWordSpout() {
this(true);
}
public DelayedWordSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
public void close() {
}
public void nextTuple() {
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
for (int i = 0 ; i < 1000 ; i++) {
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word), UUID.randomUUID());
}
Utils.sleep(5000);
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
if(!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
return null;
}
}
}
/**
* This is a basic example of a Storm topology.
*/
public class DelayedExclamationTopology {
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new DelayedWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment