Last active
October 24, 2016 07:52
-
-
Save stdatalabs/b045c0660b4eacadba5639fee4ab18cf to your computer and use it in GitHub Desktop.
A string word splitter bolt that receives tweets and emits its words which are over a certain length. More @ stdatalabs.blogspot.com
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import backtype.storm.task.OutputCollector; | |
| import backtype.storm.task.TopologyContext; | |
| import backtype.storm.topology.OutputFieldsDeclarer; | |
| import backtype.storm.topology.base.BaseRichBolt; | |
| import backtype.storm.tuple.Fields; | |
| import backtype.storm.tuple.Tuple; | |
| import backtype.storm.tuple.Values; | |
| import twitter4j.Status; | |
| import java.util.Map; | |
| /** | |
| * Receives tweets and emits its words which are over a certain length. | |
| * | |
| * More discussion at stdatalabs.blogspot.com | |
| * | |
| * @author Sachin Thirumala | |
| */ | |
| public class StringWordSplitterBolt extends BaseRichBolt { | |
| private static final long serialVersionUID = 5151173513759399636L; | |
| private final int minWordLength; | |
| private OutputCollector collector; | |
| public StringWordSplitterBolt(int minWordLength) { | |
| this.minWordLength = minWordLength; | |
| } | |
| @Override | |
| public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { | |
| this.collector = collector; | |
| } | |
| @Override | |
| public void execute(Tuple input) { | |
| Status tweet = (Status) input.getValueByField("tweet"); | |
| String lang = tweet.getUser().getLang(); | |
| String text = tweet.getText().replaceAll("\\p{Punct}", " ").replaceAll("\\r|\\n", "").toLowerCase(); | |
| //String tweet = (String) input.getValueByField("tweet"); | |
| //String lang = "en"; | |
| //String text = tweet.replaceAll("\\p{Punct}", " ").replaceAll("\\r|\\n", "").toLowerCase(); | |
| String[] words = text.split(" "); | |
| for (String word : words) { | |
| if (word.length() >= minWordLength) { | |
| //collector.emit(new Values(lang, word)); | |
| collector.emit(input, new Values(lang, word)); | |
| } | |
| } | |
| collector.ack(input); | |
| } | |
| @Override | |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
| declarer.declare(new Fields("lang", "word")); | |
| } | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment