Last active
October 24, 2016 08:02
-
-
Save stdatalabs/d1abb0bf277e293fc5f4ad40ff4dfef4 to your computer and use it in GitHub Desktop.
A storm bolt to split tuples sent by KafkaSpout. More @ stdatalabs.blogspot.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import backtype.storm.task.OutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.base.BaseRichBolt; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
import backtype.storm.tuple.Values; | |
import twitter4j.Status; | |
import java.util.Map; | |
/** | |
* Receives tweets and emits its words over a certain length. | |
* | |
* More discussion at stdatalabs.blogspot.com | |
* | |
* @author Sachin Thirumala | |
*/ | |
public class JsonWordSplitterBolt extends BaseRichBolt { | |
private static final long serialVersionUID = 5151173513759399636L; | |
private final int minWordLength; | |
private OutputCollector collector; | |
public JsonWordSplitterBolt(int minWordLength) { | |
this.minWordLength = minWordLength; | |
} | |
@Override | |
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { | |
this.collector = collector; | |
} | |
@Override | |
public void execute(Tuple input) { | |
/* Status tweet = (Status) input.getValueByField("tweet"); | |
String lang = tweet.getUser().getLang(); | |
String text = tweet.getText().replaceAll("\\p{Punct}", " ").replaceAll("\\r|\\n", "").toLowerCase();*/ | |
String tweet = (String) input.getValueByField("tweet"); | |
String lang = "en"; | |
String text = tweet.replaceAll("\\p{Punct}", " ").replaceAll("\\r|\\n", "").toLowerCase(); | |
String[] words = text.split(" "); | |
for (String word : words) { | |
if (word.length() >= minWordLength) { | |
//collector.emit(new Values(lang, word)); | |
collector.emit(input, new Values(lang, word)); | |
} | |
} | |
collector.ack(input); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("lang", "word")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment