Skip to content

Instantly share code, notes, and snippets.

@SriramKeerthi
Created January 12, 2016 13:34
Show Gist options
  • Save SriramKeerthi/c4fd440f561ba52ed7e0 to your computer and use it in GitHub Desktop.
Save SriramKeerthi/c4fd440f561ba52ed7e0 to your computer and use it in GitHub Desktop.
Word Count using Storm
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
/**
 * Counts words in a text file using Storm.
 *
 * <p>The topology is {@code lineSpout -> tokenizerBolt -> wordCountBolt}:
 * <ul>
 *   <li>The spout emits the file one line at a time.</li>
 *   <li>The tokenizer splits each line into lower-cased words.</li>
 *   <li>The counter adds per-word totals into {@link CounterBolt#counts}.</li>
 * </ul>
 * {@link #main} runs the topology on an in-process {@link LocalCluster}. It prints the totals
 * after shutting the cluster down.
 *
 * @author Sriram
 */
public class WordCountStorm
{
    /** How long {@link #main} lets the topology run when no duration argument is given. */
    private static final long DEFAULT_RUN_MILLIS = 10000L;

    /** Emits each line of a text file as a single-field tuple named {@code "line"}. */
    static class LineSpout extends BaseRichSpout
    {
        BufferedReader br;
        SpoutOutputCollector collector;
        String fileName;

        /**
         * @param fileName path of the file to read; it is only opened in {@link #open}
         */
        public LineSpout( String fileName )
        {
            this.fileName = fileName;
        }

        @Override public void declareOutputFields( OutputFieldsDeclarer declarer )
        {
            declarer.declare( new Fields( "line" ) );
        }

        /**
         * Opens the input file and keeps the collector for {@link #nextTuple()}.
         *
         * @throws IllegalStateException if the file cannot be opened. Failing here gives a clear
         *     error, instead of a NullPointerException from every later {@link #nextTuple()} call.
         */
        @Override public void open( Map conf, TopologyContext context, SpoutOutputCollector collector )
        {
            this.collector = collector;
            try {
                br = new BufferedReader( new FileReader( fileName ) );
            } catch ( IOException e ) {
                throw new IllegalStateException( "Cannot open input file: " + fileName, e );
            }
        }

        /**
         * Emits the next line of the file.
         *
         * <p>Once the end of the file is reached, it emits nothing.
         *
         * @throws IllegalStateException if reading the file fails
         */
        @Override public void nextTuple()
        {
            if ( br == null ) {
                // Input already exhausted (or spout closed): nothing left to emit.
                return;
            }
            try {
                String line = br.readLine();
                if ( line == null ) {
                    // Release the file handle as soon as the input is exhausted rather than
                    // waiting for close().
                    closeReader();
                } else {
                    collector.emit( new Values( line ) );
                }
            } catch ( IOException e ) {
                throw new IllegalStateException( "Failed reading " + fileName, e );
            }
        }

        @Override public void close()
        {
            closeReader();
        }

        /** Closes the reader if it is still open, and marks it as closed. */
        private void closeReader()
        {
            if ( br == null ) {
                return;
            }
            try {
                br.close();
            } catch ( IOException ignored ) {
                // Nothing useful can be done if closing a read-only file fails.
            } finally {
                br = null;
            }
        }
    }

    /** Splits each {@code "line"} tuple into lower-cased {@code ("word", 1)} tuples. */
    static class TokenizerBolt extends BaseRichBolt
    {
        /**
         * Word separators: any run of characters that is not a letter, a combining mark or a digit.
         *
         * <p>This covers every separator the original character class listed. Those were space,
         * underscore, hyphen, apostrophe, double quote, comma, {@code <}, {@code >} and period.
         * It also covers tabs and punctuation such as {@code !?;:()}, which used to stay attached
         * to words.
         */
        private static final Pattern SEPARATORS = Pattern.compile( "[^\\p{L}\\p{M}\\p{N}]+" );

        OutputCollector collector;

        @Override public void prepare( Map stormConf, TopologyContext context, OutputCollector collector )
        {
            this.collector = collector;
        }

        @Override public void execute( Tuple input )
        {
            String line = input.getStringByField( "line" );
            for ( String word : SEPARATORS.split( line ) ) {
                // A leading separator yields one empty token; skip it.
                if ( !word.isEmpty() ) {
                    // Locale.ROOT keeps lower-casing independent of the JVM's default locale.
                    // Under a Turkish locale, for example, "I" would otherwise become a dotless i.
                    collector.emit( input, new Values( word.toLowerCase( Locale.ROOT ), 1 ) );
                }
            }
            // A BaseRichBolt does not ack its input on its own.
            collector.ack( input );
        }

        @Override public void declareOutputFields( OutputFieldsDeclarer declarer )
        {
            declarer.declare( new Fields( "word", "count" ) );
        }
    }

    /** Adds each incoming {@code ("word", count)} tuple into {@link #counts}. */
    static class CounterBolt extends BaseRichBolt
    {
        /**
         * Word totals shared by every CounterBolt task in this JVM.
         *
         * <p>The bolt's executor thread writes to it, and {@link WordCountStorm#main} reads it
         * from another thread. All access must therefore synchronize on the map itself.
         */
        static final Map<String, Integer> counts = new HashMap<>();

        OutputCollector collector;

        @Override public void prepare( Map stormConf, TopologyContext context, OutputCollector collector )
        {
            this.collector = collector;
        }

        @Override public void execute( Tuple input )
        {
            final String key = input.getStringByField( "word" );
            // Use the declared "count" field instead of assuming every tuple counts as one.
            final int delta = input.getIntegerByField( "count" );
            synchronized ( counts ) {
                Integer current = counts.get( key );
                counts.put( key, current == null ? delta : current + delta );
            }
            collector.ack( input );
        }

        @Override public void declareOutputFields( OutputFieldsDeclarer declarer )
        {
            // Terminal bolt: emits nothing.
        }
    }

    /**
     * Runs the word-count topology on an in-process cluster, then prints the totals.
     *
     * @param args {@code args[0]} is the path of the text file to count. The optional
     *     {@code args[1]} is how long to let the topology run, in milliseconds. It defaults
     *     to 10000.
     * @throws Exception if the topology cannot be submitted or the wait is interrupted
     */
    public static void main( final String[] args ) throws Exception
    {
        if ( args.length < 1 ) {
            System.err.println( "Usage: WordCountStorm <file> [runMillis]" );
            System.exit( 1 );
        }
        final long runMillis = args.length > 1 ? Long.parseLong( args[1] ) : DEFAULT_RUN_MILLIS;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout( "lineSpout", new LineSpout( args[0] ) );
        builder.setBolt( "tokenizerBolt", new TokenizerBolt() ).shuffleGrouping( "lineSpout" );
        // Send every occurrence of a word to the same CounterBolt task. This is the standard
        // grouping for per-key aggregation.
        builder.setBolt( "wordCountBolt", new CounterBolt() )
                .fieldsGrouping( "tokenizerBolt", new Fields( "word" ) );

        Config conf = new Config();
        conf.setDebug( true );
        conf.setNumWorkers( 2 );

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology( "wordCount", conf, builder.createTopology() );
            Thread.sleep( runMillis );
        } finally {
            // Shut the cluster down even if submission fails or the sleep is interrupted.
            cluster.shutdown();
        }

        synchronized ( CounterBolt.counts ) {
            System.out.println( CounterBolt.counts );
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment