Created
June 22, 2012 01:53
-
-
Save guenter/2969752 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package airbnb.cascading.sessions; | |
import cascading.flow.Flow; | |
import cascading.flow.FlowConnector; | |
import cascading.flow.hadoop.HadoopFlowConnector; | |
import cascading.pipe.Pipe; | |
import cascading.scheme.hadoop.TextLine; | |
import cascading.tap.SinkMode; | |
import cascading.tap.Tap; | |
import cascading.tap.hadoop.Hfs; | |
import cascading.tuple.Fields; | |
import com.google.common.collect.ImmutableMap; | |
import java.util.Map; | |
/** | |
* @author Tobi Knaup | |
*/ | |
public class DebugMain { | |
public static final String REQUESTS = "requests"; | |
public static final String AGGREGATE_SESSIONS = "aggregate_sessions"; | |
public static final String RAW_SESSIONS = "raw_sessions"; | |
public void run() { | |
Tap requestSource; | |
Tap requestSink; | |
Tap sessionSink; | |
FlowConnector connector; | |
// Using this instance for both sinks causes IllegalArgumentException: no such vertex in graph | |
TextLine sinkScheme = new TextLine(new Fields("line")); | |
requestSource = new Hfs(new TextLine(), "/tmp/requestInput"); | |
requestSink = new Hfs(sinkScheme, "/tmp/requestOut", SinkMode.REPLACE); | |
sessionSink = new Hfs(sinkScheme, "/tmp/sessionOut", SinkMode.REPLACE); | |
connector = new HadoopFlowConnector(); | |
Pipe requestPipe = new Pipe(REQUESTS); | |
Pipe aggregateSessions = new Pipe(AGGREGATE_SESSIONS, requestPipe); | |
Pipe rawSessions = new Pipe(RAW_SESSIONS, requestPipe); | |
Map<String, Tap> sinks = ImmutableMap.of(RAW_SESSIONS, requestSink, AGGREGATE_SESSIONS, sessionSink); | |
Flow flow = connector.connect("flow", requestSource, sinks, rawSessions, aggregateSessions); | |
flow.complete(); | |
} | |
public static void main(String[] args) throws Exception { | |
DebugMain verifyData = new DebugMain(); | |
verifyData.run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment