Skip to content

Instantly share code, notes, and snippets.

@yssharma
Created December 13, 2012 06:49
Show Gist options
  • Select an option

  • Save yssharma/4274607 to your computer and use it in GitHub Desktop.

Select an option

Save yssharma/4274607 to your computer and use it in GitHub Desktop.
package cascading.cascading;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
/**
 * Cascading word-count job: reads a tab-delimited file from HDFS, splits the
 * {@code text} column into word tokens, groups identical tokens and writes each
 * token together with its occurrence count as tab-delimited output.
 */
public class App
{
    /** Input file used when no input path is passed on the command line. */
    private static final String DEFAULT_IN_PATH = "hdfs://localhost:9000/cascading/input/input.txt";

    /** Output directory used when no output path is passed on the command line. */
    private static final String DEFAULT_OUT_PATH = "hdfs://localhost:9000/cascading/output/";

    /**
     * Word delimiters: space, '[', ']', '(', ')', comma and dot. The trailing {@code +}
     * treats a run of delimiters (e.g. ", " or "). ") as a single separator; without it
     * {@code Pattern.split} yields an empty string between adjacent delimiters, and those
     * empty strings were counted as a "word".
     * NOTE(review): a line that starts with a delimiter, or an empty line, can still yield
     * one empty token (standard {@code Pattern.split} behavior) — filter it if that matters.
     */
    private static final String TOKEN_DELIMITERS = "[ \\[\\]\\(\\),.]+";

    /**
     * Builds the word-count flow and runs it to completion.
     *
     * @param args optional {@code [inputPath [outputPath]]}; any missing entry falls back to
     *             the default localhost HDFS location, so running without arguments behaves
     *             exactly like the original hard-coded version
     */
    public static void main( String[] args )
    {
        String inPath = argOrDefault( args, 0, DEFAULT_IN_PATH );
        String outPath = argOrDefault( args, 1, DEFAULT_OUT_PATH );

        Properties properties = new Properties();
        // Lets the Hadoop job locate the jar that contains this class.
        AppProps.setApplicationJarClass( properties, App.class );
        HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

        /* Source and sink taps. The 'true' argument to TextDelimited means the input is
           expected to start with a header row naming its columns (a "text" column is read
           below), and the output is written with a header row as well. */
        Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath );
        // NOTE(review): Hfs uses its default SinkMode here; if the output directory already
        // exists a re-run may fail — pass SinkMode.REPLACE if overwriting is intended.
        Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );

        Fields input = new Fields( "text" );           // column the tokens are read from
        Fields output = new Fields( "output_tokens" ); // column holding one token per tuple
        Fields count = new Fields( "count" );          // column holding each token's count

        /* Split every input line into tokens, keeping only the generated token field. */
        RegexSplitGenerator regexFunction = new RegexSplitGenerator( output, TOKEN_DELIMITERS );
        Pipe inPipe = new Each( "tokenpipe", input, regexFunction, Fields.RESULTS );

        /* Group identical tokens and count the tuples in each group. */
        Pipe countPipe = new Pipe( "wordcount-outpipe", inPipe );
        countPipe = new GroupBy( countPipe, output );
        countPipe = new Every( countPipe, Fields.ALL, new Count( count ), Fields.ALL );

        /* Connect source, pipe assembly and sink into a flow, then run it. */
        Flow flow = flowConnector.connect( "word-count-flow", inTap, outTap, countPipe );
        flow.writeDOT( "dot/wc.dot" ); // plan graph in DOT format; useful for debugging
        flow.complete();
    }

    /**
     * Returns {@code args[index]} when the argument is present, otherwise {@code fallback}.
     */
    private static String argOrDefault( String[] args, int index, String fallback )
    {
        return args != null && args.length > index ? args[index] : fallback;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment