Created
February 7, 2012 18:28
-
-
Save cwensel/1761117 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @Test | |
| public void testSameSourceMergeThreeChainGroup() throws Exception | |
| { | |
| getPlatform().copyFromLocal( inputFileLower ); | |
| Tap sourceLower = getPlatform().getTextFile( inputFileLower ); | |
| Map sources = new HashMap(); | |
| sources.put( "split", sourceLower ); | |
| Tap sink = getPlatform().getTextFile( getOutputPath( "samemergethreechaingroup" ), SinkMode.REPLACE ); | |
| Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); | |
| Pipe pipe = new Pipe( "split" ); | |
| Pipe pipeLower = new Each( new Pipe( "lower", pipe ), new Fields( "line" ), splitter ); | |
| Pipe pipeUpper = new Each( new Pipe( "upper", pipe ), new Fields( "line" ), splitter ); | |
| Pipe pipeOffset = new Each( new Pipe( "offset", pipe ), new Fields( "line" ), splitter ); | |
| Pipe splice = new Merge( "merge", pipeLower, pipeUpper ); | |
| //put group before merge to test path counts | |
| splice = new GroupBy( splice, new Fields( "num" ) ); | |
| splice = new Merge( splice, pipeOffset ); | |
| // this group has its incoming paths counted, gated by the previous group | |
| splice = new GroupBy( splice, new Fields( "num" ) ); | |
| Pipe lhs = new Each( new Pipe( "lhs", splice ), new Identity() ); | |
| Pipe rhs = new Each( new Pipe( "rhs", splice ), new Identity() ); | |
| splice = new Merge( lhs, rhs ); | |
| splice = new GroupBy( splice, new Fields( "num" ) ); | |
| Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); | |
| if( getPlatform() instanceof HadoopPlatform ) | |
| assertEquals( "wrong num jobs", 3, flow.getFlowSteps().size() ); | |
| flow.complete(); | |
| validateLength( flow, 30 ); | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment