Skip to content

Instantly share code, notes, and snippets.

@guenter
Created June 22, 2012 01:53
Show Gist options
  • Save guenter/2969752 to your computer and use it in GitHub Desktop.
Save guenter/2969752 to your computer and use it in GitHub Desktop.
package airbnb.cascading.sessions;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
/**
 * Small stand-alone Cascading job used for debugging flow planning.
 *
 * <p>It wires one source tap ({@code /tmp/requestInput}) to a head pipe, forks that pipe into two
 * named tail pipes, and binds each tail to its own {@link Hfs} sink. Both sinks are deliberately
 * built from the <em>same</em> {@link TextLine} scheme instance: according to the original
 * author's note, that sharing is what triggers
 * {@code IllegalArgumentException: no such vertex in graph}.
 *
 * @author Tobi Knaup
 */
public class DebugMain {
  /** Name of the head pipe that both tail pipes branch from. */
  public static final String REQUESTS = "requests";
  /** Name of the tail pipe bound to the {@code /tmp/sessionOut} sink. */
  public static final String AGGREGATE_SESSIONS = "aggregate_sessions";
  /** Name of the tail pipe bound to the {@code /tmp/requestOut} sink. */
  public static final String RAW_SESSIONS = "raw_sessions";

  /**
   * Builds the two-branch pipe assembly, connects it to the source and sink taps, and calls
   * {@code complete()} on the resulting flow.
   */
  public void run() {
    // Intentionally a single scheme object shared by both sinks below.
    // Using this instance for both sinks causes IllegalArgumentException: no such vertex in graph
    TextLine sharedSinkScheme = new TextLine(new Fields("line"));

    // Taps: one input, two outputs opened with SinkMode.REPLACE.
    Tap input = new Hfs(new TextLine(), "/tmp/requestInput");
    Tap requestOutput = new Hfs(sharedSinkScheme, "/tmp/requestOut", SinkMode.REPLACE);
    Tap sessionOutput = new Hfs(sharedSinkScheme, "/tmp/sessionOut", SinkMode.REPLACE);

    // Pipe assembly: a single head with two named branches hanging off it.
    Pipe head = new Pipe(REQUESTS);
    Pipe aggregateBranch = new Pipe(AGGREGATE_SESSIONS, head);
    Pipe rawBranch = new Pipe(RAW_SESSIONS, head);

    // Sinks are looked up by tail-pipe name.
    // NOTE(review): raw_sessions is paired with the request output path and aggregate_sessions
    // with the session output path -- confirm this pairing is intended.
    Map<String, Tap> sinksByTail = ImmutableMap.of(
        RAW_SESSIONS, requestOutput,
        AGGREGATE_SESSIONS, sessionOutput);

    FlowConnector planner = new HadoopFlowConnector();
    Flow flow = planner.connect("flow", input, sinksByTail, rawBranch, aggregateBranch);
    flow.complete();
  }

  /**
   * Command-line entry point; creates a {@link DebugMain} and runs it.
   *
   * @param args ignored
   * @throws Exception declared for callers; nothing in this method throws a checked exception
   */
  public static void main(String[] args) throws Exception {
    new DebugMain().run();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment