Created
July 25, 2013 19:40
-
-
Save piotrbelina/6083056 to your computer and use it in GitHub Desktop.
Cascading Apache log parser for boomerang.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package piotr; | |
import cascading.flow.Flow; | |
import cascading.flow.FlowDef; | |
import cascading.flow.hadoop.HadoopFlowConnector; | |
import cascading.operation.Aggregator; | |
import cascading.operation.aggregator.Average; | |
import cascading.operation.aggregator.Count; | |
import cascading.operation.regex.RegexFilter; | |
import cascading.operation.regex.RegexParser; | |
import cascading.pipe.Each; | |
import cascading.pipe.Every; | |
import cascading.pipe.GroupBy; | |
import cascading.pipe.Pipe; | |
import cascading.property.AppProps; | |
import cascading.scheme.hadoop.TextLine; | |
import cascading.scheme.hadoop.TextDelimited; | |
import cascading.tap.SinkMode; | |
import cascading.tap.Tap; | |
import cascading.tap.hadoop.Hfs; | |
import cascading.tuple.Fields; | |
import java.util.Properties; | |
public class Main { | |
public static void main(String[] args) { | |
String logPath = args[0]; | |
String statsPath = args[1]; | |
String trapPath = args[2]; | |
Properties properties = new Properties(); | |
AppProps.setApplicationJarClass(properties, Main.class); | |
HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties); | |
TextLine scheme = new TextLine(new Fields("offset", "line")); | |
Tap logTap = new Hfs(scheme, logPath); | |
Tap remoteLogTap = new Hfs( new TextLine(), statsPath, SinkMode.REPLACE); | |
Tap trapTap = new Hfs( new TextDelimited(true, "\t"), trapPath ); | |
Fields apacheFields = new Fields( "ip", "time", "method", "event"); | |
Fields urlFields = new Fields( "ip", "time", "method", "event", "url", "done", "t_resp", "bw", "bw_err", "lat", "lat_err"); | |
String apacheRegex = "([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*).*$"; | |
int[] allGroups = {1, 2, 3, 4}; | |
RegexParser parser = new RegexParser(apacheFields, apacheRegex, allGroups); | |
Pipe importPipe = new Each("import", new Fields("line"), parser, Fields.RESULTS); | |
Pipe logPipe = new Pipe("topurl", importPipe); | |
RegexFilter regexFilter = new RegexFilter("beacon\\.php"); | |
logPipe = new Each(logPipe, new Fields("event"), regexFilter); | |
logPipe = new Each(logPipe, apacheFields, new UrlParser(urlFields),Fields.RESULTS); | |
logPipe = new GroupBy(logPipe, new Fields("url")); | |
Aggregator count = new Count(new Fields("count")); | |
Aggregator averageDone = new Average(new Fields("avg_done")); | |
Aggregator averageResponse = new Average(new Fields("avg_response")); | |
logPipe = new Every(logPipe, count); | |
logPipe = new Every(logPipe, new Fields("done"), averageDone); | |
logPipe = new Every(logPipe, new Fields("t_resp"), averageResponse); | |
logPipe = new GroupBy(logPipe, new Fields("count"), new Fields("url"), true); | |
FlowDef flowDef = FlowDef.flowDef() | |
.setName("apachelog") | |
.addSource(importPipe, logTap) | |
.addTailSink(logPipe, remoteLogTap) | |
.addTrap(importPipe, trapTap); | |
Flow flow = flowConnector | |
.connect(flowDef); | |
flow.writeDOT("dot/apachelog.dot"); | |
flow.complete(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package piotr; | |
import cascading.flow.FlowProcess; | |
import cascading.operation.BaseOperation; | |
import cascading.operation.Function; | |
import cascading.operation.FunctionCall; | |
import cascading.tuple.Fields; | |
import cascading.tuple.Tuple; | |
import cascading.tuple.TupleEntry; | |
import java.io.UnsupportedEncodingException; | |
import java.net.URLDecoder; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
public class UrlParser extends BaseOperation implements Function { | |
public UrlParser(Fields fieldDeclaration) { | |
super(4, fieldDeclaration); | |
} | |
public static Map<String, List<String>> getUrlParameters(String url) | |
throws UnsupportedEncodingException { | |
Map<String, List<String>> params = new HashMap<String, List<String>>(); | |
String[] urlParts = url.split("\\?"); | |
if (urlParts.length > 1) { | |
String query = urlParts[1]; | |
for (String param : query.split("&")) { | |
String pair[] = param.split("="); | |
String key = URLDecoder.decode(pair[0], "UTF-8"); | |
String value = ""; | |
if (pair.length > 1) { | |
value = URLDecoder.decode(pair[1], "UTF-8"); | |
} | |
List<String> values = params.get(key); | |
if (values == null) { | |
values = new ArrayList<String>(); | |
params.put(key, values); | |
} | |
values.add(value); | |
} | |
} | |
return params; | |
} | |
public<E> E getFirst(List<E> list) { | |
if (list.size() == 1) { | |
return list.get(0); | |
} | |
throw new IndexOutOfBoundsException(); | |
} | |
@Override | |
public void operate(FlowProcess flowProcess, FunctionCall functionCall) { | |
TupleEntry argument = functionCall.getArguments(); | |
String ip = argument.getString( 0 ); | |
String time = argument.getString( 1 ); | |
String method = argument.getString( 2 ); | |
String event = argument.getString( 3 ); | |
try | |
{ | |
Map<String, List<String>> params = getUrlParameters(event); | |
String url = getFirst(params.get("u")); | |
String done = getFirst(params.get("t_done")); | |
String t_resp = getFirst(params.get("t_resp")); | |
String bw = getFirst(params.get("bw")); | |
String bw_err = getFirst(params.get("bw_err")); | |
String lat = getFirst(params.get("lat")); | |
String lat_err = getFirst(params.get("lat_err")); | |
Tuple result = new Tuple(); | |
result.add( ip ); | |
result.add( time ); | |
result.add( method ); | |
result.add( event ); | |
result.add( url ); | |
result.add( done ); | |
result.add( t_resp ); | |
result.add( bw ); | |
result.add( bw_err ); | |
result.add( lat ); | |
result.add( lat_err ); | |
functionCall.getOutputCollector().add( result ); | |
} catch (UnsupportedEncodingException e) { | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment