Skip to content

Instantly share code, notes, and snippets.

@natbusa
Created March 22, 2014 11:00
Show Gist options
  • Save natbusa/9705234 to your computer and use it in GitHub Desktop.
Save natbusa/9705234 to your computer and use it in GitHub Desktop.
Word count in Hadoop Cascading
class ScrubFunction extends BaseOperation implements Function
{
public ScrubFunction( Fields fieldDeclaration )
{
super( 1, fieldDeclaration );
}
public void operate( FlowProcess flowProcess, FunctionCall functionCall )
{
TupleEntry argument = functionCall.getArguments();
String token = scrubText( argument.getString( 0 ) );
if( token.length() > 0 )
{
Tuple result = new Tuple();
result.add( token );
functionCall.getOutputCollector().add( result );
}
}
public String scrubText( String text )
{
return text.trim().toLowerCase();
}
}
/**
 * Command-line driver for a Cascading word-count flow run through a
 * {@code HadoopFlowConnector}.
 *
 * <p>Pipeline: the {@code line} field of each input record is split on runs of
 * non-word characters into {@code token} values. Each token is normalized into a
 * lower-case {@code word} by {@link ScrubFunction}. The words are grouped, and each
 * group is counted. The counts are written to the output path.
 */
public class Main
{
  /**
   * Builds, plans, and runs the word-count flow, and writes the flow's DOT plan to
   * {@code dot/wc.dot}.
   *
   * @param args {@code args[0]}: input document path; {@code args[1]}: output path
   *     for the word counts
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   */
  public static void main( String[] args )
  {
    // fail fast with a usage message instead of an ArrayIndexOutOfBoundsException
    if( args == null || args.length < 2 )
    {
      throw new IllegalArgumentException( "usage: Main <docPath> <wcPath>" );
    }

    String docPath = args[ 0 ];
    String wcPath = args[ 1 ];

    Properties properties = new Properties();
    // identify the jar that contains this application's classes
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // source and sink taps, both using the TextLine scheme
    // NOTE(review): no explicit SinkMode is passed for wcTap; confirm the intended
    // behavior when wcPath already exists.
    Tap docTap = new Hfs( new TextLine(), docPath );
    Tap wcTap = new Hfs( new TextLine(), wcPath );

    // split the "line" field into "token" values on runs of non-word characters (\W+)
    Fields token = new Fields( "token" );
    RegexSplitGenerator splitter = new RegexSplitGenerator( token, "\\W+" );
    Fields text = new Fields( "line" );
    Pipe wordPipe = new Each( "WP", text, splitter, Fields.RESULTS );

    // turn each token into a trimmed, lower-case "word"; empty results are dropped
    Fields word = new Fields( "word" );
    wordPipe = new Each( wordPipe, token, new ScrubFunction( word ), Fields.RESULTS );

    // group identical words and count the members of each group
    Pipe wcPipe = new Pipe( "wc", wordPipe );
    wcPipe = new GroupBy( wcPipe, word );
    wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

    // bind the head of the assembly to the source tap and its tail to the sink tap
    FlowDef flowDef = FlowDef.flowDef()
      .setName( "wc" )
      .addSource( wordPipe, docTap )
      .addTailSink( wcPipe, wcTap );

    // plan the flow, write its DOT graph, then run it to completion
    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.writeDOT( "dot/wc.dot" );
    wcFlow.complete();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment