Created January 12, 2016 13:34
-
-
Save SriramKeerthi/c4fd440f561ba52ed7e0 to your computer and use it in GitHub Desktop.
Word Count using Storm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
/** | |
* Counts words in a text file using Storm | |
* @author Sriram | |
*/ | |
public class WordCountStorm | |
{ | |
static class LineSpout extends BaseRichSpout | |
{ | |
BufferedReader br; | |
SpoutOutputCollector collector; | |
String fileName; | |
public LineSpout( String fileName ) | |
{ | |
this.fileName = fileName; | |
} | |
@Override public void declareOutputFields( OutputFieldsDeclarer declarer ) | |
{ | |
declarer.declare( new Fields( "line" ) ); | |
} | |
@Override public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) | |
{ | |
try { | |
this.collector = collector; | |
br = new BufferedReader( new FileReader( fileName ) ); | |
} catch ( IOException e ) { | |
e.printStackTrace(); | |
} | |
} | |
@Override public void nextTuple() | |
{ | |
try { | |
String line; | |
if ( ( line = br.readLine() ) != null ) { | |
collector.emit( new Values( line ) ); | |
} | |
} catch ( IOException e ) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
static class TokenizerBolt extends BaseRichBolt | |
{ | |
OutputCollector collector; | |
@Override public void prepare( Map stormConf, TopologyContext context, OutputCollector collector ) | |
{ | |
this.collector = collector; | |
} | |
@Override public void execute( Tuple input ) | |
{ | |
String line = input.getStringByField( "line" ); | |
for ( String word : line.split( "[ _\\-'\",<>\\.]" ) ) { | |
if ( word.length() > 0 ) { | |
collector.emit( new Values( word.toLowerCase(), 1 ) ); | |
} | |
} | |
} | |
@Override public void declareOutputFields( OutputFieldsDeclarer declarer ) | |
{ | |
declarer.declare( new Fields( "word", "count" ) ); | |
} | |
} | |
static class CounterBolt extends BaseRichBolt | |
{ | |
static Map<String, Integer> counts = new HashMap<>(); | |
@Override public void prepare( Map stormConf, TopologyContext context, OutputCollector collector ) | |
{ | |
} | |
@Override public void execute( Tuple input ) | |
{ | |
final String key = input.getStringByField( "word" ); | |
if ( counts.containsKey( key ) ) { | |
counts.put( key, counts.get( key ) + 1 ); | |
} else { | |
counts.put( key, 1 ); | |
} | |
} | |
@Override public void declareOutputFields( OutputFieldsDeclarer declarer ) | |
{ | |
} | |
} | |
public static void main( final String[] args ) throws Exception | |
{ | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout( "lineSpout", new LineSpout( args[0] ) ); | |
builder.setBolt( "tokenizerBolt", new TokenizerBolt() ).shuffleGrouping( "lineSpout" ); | |
builder.setBolt( "wordCountBolt", new CounterBolt() ).shuffleGrouping( "tokenizerBolt" ); | |
Config conf = new Config(); | |
conf.setDebug( true ); | |
conf.setNumWorkers( 2 ); | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology( "wordCount", conf, builder.createTopology() ); | |
Thread.sleep( 10000 ); | |
cluster.shutdown(); | |
System.out.println( CounterBolt.counts ); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.