Created
March 22, 2014 11:00
-
-
Save natbusa/9705234 to your computer and use it in GitHub Desktop.
Word count in Hadoop Cascading
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ScrubFunction extends BaseOperation implements Function | |
{ | |
public ScrubFunction( Fields fieldDeclaration ) | |
{ | |
super( 1, fieldDeclaration ); | |
} | |
public void operate( FlowProcess flowProcess, FunctionCall functionCall ) | |
{ | |
TupleEntry argument = functionCall.getArguments(); | |
String token = scrubText( argument.getString( 0 ) ); | |
if( token.length() > 0 ) | |
{ | |
Tuple result = new Tuple(); | |
result.add( token ); | |
functionCall.getOutputCollector().add( result ); | |
} | |
} | |
public String scrubText( String text ) | |
{ | |
return text.trim().toLowerCase(); | |
} | |
} | |
public class | |
Main | |
{ | |
public static void | |
main( String[] args ) | |
{ | |
String docPath = args[ 0 ]; | |
String wcPath = args[ 1 ]; | |
Properties properties = new Properties(); | |
AppProps.setApplicationJarClass( properties, Main.class ); | |
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties ); | |
// create source and sink taps | |
Tap docTap = new Hfs( new TextLine(), docPath ); | |
Tap wcTap = new Hfs( new TextLine(), wcPath ); | |
// specify a regex operation to split the "document" text lines into a token stream | |
Fields token = new Fields( "token" ); | |
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "\\W+" ); | |
// only returns "words" | |
Fields text = new Fields( "line" ); | |
Pipe wordPipe = new Each("WP", text, splitter, Fields.RESULTS ); | |
// replace tokens with lowercase words | |
Fields word = new Fields( "word" ); | |
wordPipe = new Each( wordPipe, token, new ScrubFunction( word ), Fields.RESULTS ); | |
// determine the word counts | |
Pipe wcPipe = new Pipe( "wc", wordPipe ); | |
wcPipe = new GroupBy( wcPipe, word ); | |
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL ); | |
// connect the taps, pipes, etc., into a flow | |
FlowDef flowDef = FlowDef.flowDef() | |
.setName( "wc" ) | |
.addSource( wordPipe, docTap ) | |
.addTailSink( wcPipe, wcTap ); | |
// run the flow | |
Flow wcFlow = flowConnector.connect( flowDef ); | |
wcFlow.writeDOT( "dot/wc.dot" ); | |
wcFlow.complete(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment