Skip to content

Instantly share code, notes, and snippets.

@cwensel
Created June 2, 2013 16:52
Show Gist options
  • Select an option

  • Save cwensel/5694115 to your computer and use it in GitHub Desktop.

Select an option

Save cwensel/5694115 to your computer and use it in GitHub Desktop.
@Test
public void testTrapTapSourceSinkCopy() throws Exception
{
getPlatform().copyFromLocal( inputFileApache );
Scheme scheme = getPlatform().getTestFailScheme();
Tap source = getPlatform().getTap( scheme, inputFileApache, SinkMode.KEEP );
Pipe pipe = new Pipe( "map" );
Tap sink = getPlatform().getTap( scheme, getOutputPath( "trapsourcesinkcopy/sink" ), SinkMode.REPLACE );
Tap trap = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "trapsourcesinkcopy/trap" ), SinkMode.REPLACE );
Map<Object, Object> properties = getProperties();
// compensate for running in cluster mode
properties.put( "mapred.map.tasks", 1 );
properties.put( "mapred.reduce.tasks", 1 );
FlowDef flowDef = FlowDef.flowDef()
.addSource( pipe, source )
.addTailSink( pipe, sink )
.addTrap( pipe, trap );
Flow flow = getPlatform().getFlowConnector( properties ).connect( flowDef );
flow.complete();
validateLength( flow.openTapForRead( getPlatform().getTextFile( sink.getIdentifier() ) ), 9 );
validateLength( flow.openTrap(), 2, Pattern.compile( "bad data" ) ); // confirm the payload is written
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment