Skip to content

Instantly share code, notes, and snippets.

@cwensel
Created February 7, 2012 18:28
Show Gist options
  • Select an option

  • Save cwensel/1761117 to your computer and use it in GitHub Desktop.

Select an option

Save cwensel/1761117 to your computer and use it in GitHub Desktop.
/**
 * Plans and runs a flow in which one source tap ("split") feeds three branches
 * that are merged and grouped through a chain of three {@code GroupBy} pipes:
 *
 * <pre>
 *   split -+- lower --+
 *          +- upper --+- Merge - GroupBy(num) -+
 *          +- offset --------------------------+- Merge - GroupBy(num) -+- lhs -+
 *                                                                       +- rhs -+- Merge - GroupBy(num) - sink
 * </pre>
 *
 * On the Hadoop platform the planner is expected to produce exactly three flow
 * steps. After the flow completes, the result is checked with
 * {@code validateLength( flow, 30 )}.
 * NOTE(review): 30 presumably corresponds to the input line count multiplied by the
 * three source branches and again by the two lhs/rhs branches -- confirm against
 * the contents of {@code inputFileLower}.
 *
 * @throws Exception if staging the input, planning, or running the flow fails
 */
@Test
public void testSameSourceMergeThreeChainGroup() throws Exception
  {
  // stage the input on the platform under test and open it as a text tap
  getPlatform().copyFromLocal( inputFileLower );

  Tap sourceLower = getPlatform().getTextFile( inputFileLower );

  // parameterized map: only taps keyed by head pipe name may be registered
  Map<String, Tap> sources = new HashMap<String, Tap>();

  sources.put( "split", sourceLower );

  Tap sink = getPlatform().getTextFile( getOutputPath( "samemergethreechaingroup" ), SinkMode.REPLACE );

  // splits each "line" value on a single space into the fields "num" and "char"
  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

  // single head pipe; all three branches below read from the same source
  Pipe pipe = new Pipe( "split" );

  Pipe pipeLower = new Each( new Pipe( "lower", pipe ), new Fields( "line" ), splitter );
  Pipe pipeUpper = new Each( new Pipe( "upper", pipe ), new Fields( "line" ), splitter );
  Pipe pipeOffset = new Each( new Pipe( "offset", pipe ), new Fields( "line" ), splitter );

  Pipe splice = new Merge( "merge", pipeLower, pipeUpper );

  // put group before merge to test path counts
  splice = new GroupBy( splice, new Fields( "num" ) );

  splice = new Merge( splice, pipeOffset );

  // this group has its incoming paths counted, gated by the previous group
  splice = new GroupBy( splice, new Fields( "num" ) );

  // fan the grouped stream out into two pass-through branches, then merge them back
  Pipe lhs = new Each( new Pipe( "lhs", splice ), new Identity() );
  Pipe rhs = new Each( new Pipe( "rhs", splice ), new Identity() );

  splice = new Merge( lhs, rhs );

  // third and final group in the chain
  splice = new GroupBy( splice, new Fields( "num" ) );

  Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice );

  // the step count is only asserted when running on the Hadoop platform
  if( getPlatform() instanceof HadoopPlatform )
    {
    assertEquals( "wrong num jobs", 3, flow.getFlowSteps().size() );
    }

  flow.complete();

  validateLength( flow, 30 );
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment