-
-
Save risarora/10c275b954254982c2d7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
About this gist: | |
================ | |
This gist is a part of a series of log parsers in Java Mapreduce, Pig, Hive, Python... | |
This one covers a log parser in Cascading. | |
It reads syslogs in HDFS - | |
a) Parses them based on a regex pattern & writes parsed files to HDFS | |
b) Writes records that don't match the pattern to HDFS | |
c) Writes a report to HDFS that contains the count of distinct processes logged. | |
Other gists/blogs: | |
================== | |
1) Log parsing - Java Mapreduce | |
http://hadooped.blogspot.com/2013/07/log-parsing-in-hadoop-part-1-java-using.html | |
2) Log parsing - Pig | |
http://hadooped.blogspot.com/2013/07/log-parsing-in-hadoop-part-3-pig-latin.html | |
3) Log parsing - Hive | |
http://hadooped.blogspot.com/2013/07/log-parsing-in-hadoop-part-2-hive-using.html | |
4) Log parsing - Python/streaming | |
http://hadooped.blogspot.com/2013/07/log-parsing-in-hadoop-part-1-python.html | |
TODOs: | |
====== | |
The year is not in the syslogs, but is part of the file path (it's a directory). | |
Figure out how to extract the year from the file path, in Cascading, and add as a field. | |
https://groups.google.com/forum/#!msg/cascading-user/1llgrXOL69Q/BzzPJRbpQ_MJ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sample log data | |
--------------- | |
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal | |
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1 | |
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray | |
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr | |
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max) | |
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns | |
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org | |
Data structure | |
-------------- | |
Month = May | |
Day = 3 | |
Time = 11:52:54 | |
Node = cdh-dn03 | |
Process = init: | |
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Directory structure of logs | |
--------------------------- | |
cascadingSamples | |
data | |
syslogs | |
<<Node-Name>> | |
<<Year>> | |
<<Month>> | |
messages | |
Specifically... | |
LogParser/data/syslogs/cdh-dev01/2013/04/messages | |
LogParser/data/syslogs/cdh-dev01/2013/05/messages | |
LogParser/data/syslogs/cdh-dn01/2013/05/messages | |
LogParser/data/syslogs/cdh-dn02/2013/04/messages | |
LogParser/data/syslogs/cdh-dn02/2013/05/messages | |
LogParser/data/syslogs/cdh-dn03/2013/04/messages | |
LogParser/data/syslogs/cdh-dn03/2013/05/messages | |
LogParser/data/syslogs/cdh-jt01/2013/04/messages | |
LogParser/data/syslogs/cdh-jt01/2013/05/messages | |
LogParser/data/syslogs/cdh-nn01/2013/05/messages | |
LogParser/data/syslogs/cdh-vms/2013/05/messages |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package logparser; | |
import java.util.Properties; | |
import java.util.Collections; | |
import cascading.flow.Flow; | |
import cascading.flow.FlowDef; | |
import cascading.flow.FlowProcess; | |
import cascading.flow.hadoop.HadoopFlowConnector; | |
import cascading.flow.hadoop.HadoopFlowProcess; | |
import cascading.tap.hadoop.GlobHfs; | |
import cascading.tap.MultiSourceTap; | |
import cascading.tap.hadoop.Hfs; | |
import cascading.operation.regex.RegexParser; | |
import cascading.pipe.Each; | |
import cascading.pipe.Every; | |
import cascading.pipe.GroupBy; | |
import cascading.pipe.Pipe; | |
import cascading.property.AppProps; | |
import cascading.scheme.Scheme; | |
import cascading.scheme.hadoop.TextLine; | |
import cascading.tap.SinkMode; | |
import cascading.tap.Tap; | |
import cascading.tuple.Fields; | |
import cascading.operation.aggregator.Count; | |
import cascading.operation.expression.ExpressionFunction; | |
public class LogParser { | |
public static void main(String[] args) { | |
// {{ | |
// INSTANTIATE/INITIALIZE | |
// Set the current job jar | |
Properties properties = new Properties(); | |
AppProps.setApplicationJarClass( properties, LogParser.class ); | |
HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties); | |
// Arguments | |
String inputPath = args[ 0 ]; | |
String outputPath = args[ 1 ]; | |
String errorPath = args[ 2 ]; | |
String reportPath = args[ 3 ]; | |
// Scheme for sinks | |
TextLine sinkTextLineScheme = new TextLine(); | |
// Define what the input file looks like, "offset" is bytes from beginning | |
TextLine sourceTextLineScheme = new TextLine( new Fields( "offset", "line" ) ); | |
// The inputPath is a file glob, and a so, GlobHfs is used below | |
GlobHfs sourceFilesGlob = new GlobHfs( sourceTextLineScheme, inputPath ); | |
// Create SOURCE tap to read a resource from the HDFS glob | |
Tap sourceSyslogTap = new MultiSourceTap(sourceFilesGlob); | |
// Create a SINK tap to write parsed logs to HDFS | |
sinkTextLineScheme.setNumSinkParts(2); | |
Tap sinkParsedLogTap = new Hfs( sinkTextLineScheme, outputPath, SinkMode.REPLACE); | |
// Create a SINK tap to write reports to HDFS | |
sinkTextLineScheme.setNumSinkParts(1); | |
Tap sinkReportTap = new Hfs(sinkTextLineScheme, reportPath, SinkMode.REPLACE ); | |
// Create a TRAP tap to write records that failed parsing | |
sinkTextLineScheme.setNumSinkParts(1); | |
Tap sinkTrapTap = new Hfs( sinkTextLineScheme, errorPath , SinkMode.REPLACE ); | |
// }} | |
// {{ | |
// EXTRACT/PARSE | |
// Declare the field names | |
Fields sysLogFields = new Fields( "month", "day", "time", "node", "process", "message" ); | |
// Define the regex pattern to parse the log file with | |
String sysLogRegex = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)"; | |
// Declare the groups from the above regex we want to keep. Each regex group will be given | |
// a field name from 'sysLogFields', above, respectively | |
int[] keepParsedGroups = {1, 2, 3, 4, 5, 6}; | |
// Create the parser | |
RegexParser parser = new RegexParser( sysLogFields, sysLogRegex, keepParsedGroups ); | |
// Import & parse pipe | |
// Create the import pipe element, with the name 'import', and with the input argument named "line" | |
// Replace the incoming tuple with the parser results | |
// "line" -> parser -> "ts" | |
Pipe importAndParsePipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS ); | |
// }} | |
// {{ | |
// TRANSFORM | |
// Transform the process field - remove process ID if found, for better reporting on logs | |
// Also, convert to lowercase | |
// E.g. Change "ntpd[1302]" to "ntpd" | |
String expression = "process.substring(0, (process.indexOf('[') == -1 ? process.length()-1 : process.indexOf('[') )).toLowerCase()"; | |
Fields fieldProcess = new Fields( "process" ); | |
ExpressionFunction expFunc = | |
new ExpressionFunction( fieldProcess, expression, String.class ); | |
// Pipe for transformed data | |
Pipe scrubbedDataPipe = new Each( importAndParsePipe, fieldProcess, expFunc, Fields.REPLACE ); | |
// }} | |
// {{ | |
// REPORT/ANALYZE | |
// Capture counts by process, as a report, sort by count, desc | |
// ------------------------------------------------------------ | |
// process count() | |
// E.g. sshd 4 | |
Pipe reportPipe = new Pipe("reportByProcess", scrubbedDataPipe); | |
Fields keyFields = new Fields("process"); | |
Fields groupByFields = new Fields( "process"); | |
Fields countField = new Fields( "countOfEvents" ); | |
Fields sortByFields = new Fields( "process"); | |
reportPipe = new GroupBy(reportPipe, groupByFields); | |
reportPipe = new Every(reportPipe, keyFields, | |
new Count(countField), Fields.ALL); | |
reportPipe = new GroupBy(reportPipe, | |
keyFields, | |
countField, | |
false); //true=descending order | |
//End of reports | |
//}} | |
// {{ | |
// EXECUTE | |
// Connect the taps, pipes, etc., into a flow & execute | |
FlowDef flowDef = FlowDef.flowDef() | |
.setName( "Log parser" ) | |
.addSource( importAndParsePipe, sourceSyslogTap ) | |
.addTailSink( scrubbedDataPipe, sinkParsedLogTap ) | |
.addTailSink(reportPipe,sinkReportTap) | |
.addTrap( importAndParsePipe, sinkTrapTap ); | |
Flow flow = flowConnector.connect(flowDef); | |
flow.complete(); | |
// }} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Gradle build for the Cascading log-parser sample.
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'

archivesBaseName = 'logparser-cascading'

repositories {
    mavenLocal()
    mavenCentral()
    // FIX: 'mavenRepo name:..., url:...' was deprecated and removed in
    // Gradle 2.x; the maven { } repository DSL is the supported form.
    maven {
        name 'conjars'
        url 'http://conjars.org/repo/'
    }
}

ext.cascadingVersion = '2.5.1'

dependencies {
    compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
    compile( group: 'cascading', name: 'cascading-hadoop', version: cascadingVersion )
}

jar {
    description = "Assembles a Hadoop ready jar file"
    // Bundle all compile-time dependencies under lib/ inside the jar so the
    // Hadoop runtime can resolve the Cascading classes at job submission.
    doFirst {
        into( 'lib' ) {
            from configurations.compile
        }
    }
    manifest {
        // FIX: the Main-Class manifest attribute requires the dot-separated
        // binary class name; the original slash form 'logparser/LogParser'
        // is not a valid manifest value.
        attributes( "Main-Class": "logparser.LogParser" )
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Gradle | |
================= | |
$ gradle clean jar | |
Should generate a jar with dependencies managed. | |
Load data to HDFS | |
================== | |
$ hadoop fs -mkdir cascadingSamples | |
$ cd ~ | |
$ hadoop fs -put cascadingSamples/data cascadingSamples | |
$ hadoop fs -put cascadingSamples/jars cascadingSamples | |
Run program | |
============ | |
hadoop jar cascadingSamples/jars/logparser-cascading.jar "cascadingSamples/data/syslogs/*/*/*/" "cascadingSamples/Output-LogParser" "cascadingSamples/Output-LogParser/traps" "cascadingSamples/Output-LogParser/reports" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Output files | |
============= | |
$ hadoop fs -ls -R cascadingSamples |grep 'part*' | awk '{print $8}' | |
cascadingSamples/Output-LogParser/part-00000 | |
cascadingSamples/Output-LogParser/part-00001 | |
cascadingSamples/Output-LogParser/part-00002 | |
cascadingSamples/Output-LogParser/part-00003 | |
cascadingSamples/Output-LogParser/part-00004 | |
cascadingSamples/Output-LogParser/part-00005 | |
cascadingSamples/Output-LogParser/part-00006 | |
cascadingSamples/Output-LogParser/part-00007 | |
cascadingSamples/Output-LogParser/part-00008 | |
cascadingSamples/Output-LogParser/part-00009 | |
cascadingSamples/Output-LogParser/part-00010 | |
cascadingSamples/Output-LogParser/reports/part-00000 | |
cascadingSamples/Output-LogParser/traps/part-m-00001-00006 | |
cascadingSamples/Output-LogParser/traps/part-m-00002-00006 | |
Parsed log | |
=========== | |
$ hadoop fs -cat cascadingSamples/Output-LogParser/part-00003 | less | |
May 3 11:51:50 cdh-dn02 init tty (/dev/tty6) main process (1208) killed by TERM signal | |
May 3 11:52:26 cdh-dn02 kernel nf_conntrack version 0.5.0 (7972 buckets, 31888 max) | |
May 3 11:52:51 cdh-dn02 kernel hrtimer: interrupt took 6222750 ns | |
May 3 11:52:53 cdh-dn02 ntpd_initres host name not found: 0.rhel.pool.ntp.org | |
Report | |
=========== | |
$ hadoop fs -cat cascadingSamples/Output-LogParser/reports/part-00000 | less | |
console-kit-daemon 7 | |
gnome-session 11 | |
init 166 | |
kernel 810 | |
login 2 | |
networkmanager 7 | |
nm-dispatcher.action 4 | |
ntpd_initres 4133 | |
polkit-agent-helper-1 8 | |
pulseaudio 18 | |
spice-vdagent 15 | |
sshd 6 | |
sudo 8 | |
udevd 6 | |
Records that failed parsing | |
============================ | |
$ hadoop fs -cat cascadingSamples/Output-LogParser/traps/part* | |
May 7 00:40:53 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization. | |
May 7 00:42:13 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization. | |
May 7 00:43:38 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization. | |
May 7 00:45:01 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization. | |
May 7 00:47:18 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization. | |
May 7 00:47:41 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization. | |
Record count of input [syslogs] | |
=============================== | |
$ hadoop fs -cat cascadingSamples/data/syslogs/*/*/*/messages | wc -l | |
5207 | |
Record count of output [parsed logs + records that failed parsing] | |
=================================================================== | |
$ echo $((`hadoop fs -cat cascadingSamples/Output-LogParser/part* | wc -l`+`hadoop fs -cat cascadingSamples/Output-LogParser/traps/part* | wc -l`)) | |
5207 | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Code and data download | |
======================= | |
Github: | |
https://github.com/airawat/LogParser/tree/master/cascadingSamples | |
Directories relevant to this post: | |
=================================== | |
$ tree -if --noreport LogParser | |
LogParser | |
LogParser/cascadingSamples | |
LogParser/cascadingSamples/jars | |
LogParser/cascadingSamples/jars/logparser-cascading.jar | |
LogParser/cascadingSamples/src | |
LogParser/cascadingSamples/src/main | |
LogParser/cascadingSamples/src/main/java | |
LogParser/cascadingSamples/src/main/java/logparser | |
LogParser/cascadingSamples/src/main/java/logparser/LogParser.java | |
LogParser/data | |
LogParser/data/syslogs | |
LogParser/data/syslogs/cdh-dev01 | |
LogParser/data/syslogs/cdh-dev01/2013 | |
LogParser/data/syslogs/cdh-dev01/2013/04 | |
LogParser/data/syslogs/cdh-dev01/2013/04/messages | |
LogParser/data/syslogs/cdh-dev01/2013/05 | |
LogParser/data/syslogs/cdh-dev01/2013/05/messages | |
LogParser/data/syslogs/cdh-dn01 | |
LogParser/data/syslogs/cdh-dn01/2013 | |
LogParser/data/syslogs/cdh-dn01/2013/05 | |
LogParser/data/syslogs/cdh-dn01/2013/05/messages | |
LogParser/data/syslogs/cdh-dn02 | |
LogParser/data/syslogs/cdh-dn02/2013 | |
LogParser/data/syslogs/cdh-dn02/2013/04 | |
LogParser/data/syslogs/cdh-dn02/2013/04/messages | |
LogParser/data/syslogs/cdh-dn02/2013/05 | |
LogParser/data/syslogs/cdh-dn02/2013/05/messages | |
LogParser/data/syslogs/cdh-dn03 | |
LogParser/data/syslogs/cdh-dn03/2013 | |
LogParser/data/syslogs/cdh-dn03/2013/04 | |
LogParser/data/syslogs/cdh-dn03/2013/04/messages | |
LogParser/data/syslogs/cdh-dn03/2013/05 | |
LogParser/data/syslogs/cdh-dn03/2013/05/messages | |
LogParser/data/syslogs/cdh-jt01 | |
LogParser/data/syslogs/cdh-jt01/2013 | |
LogParser/data/syslogs/cdh-jt01/2013/04 | |
LogParser/data/syslogs/cdh-jt01/2013/04/messages | |
LogParser/data/syslogs/cdh-jt01/2013/05 | |
LogParser/data/syslogs/cdh-jt01/2013/05/messages | |
LogParser/data/syslogs/cdh-nn01 | |
LogParser/data/syslogs/cdh-nn01/2013 | |
LogParser/data/syslogs/cdh-nn01/2013/05 | |
LogParser/data/syslogs/cdh-nn01/2013/05/messages | |
LogParser/data/syslogs/cdh-vms | |
LogParser/data/syslogs/cdh-vms/2013 | |
LogParser/data/syslogs/cdh-vms/2013/05 | |
LogParser/data/syslogs/cdh-vms/2013/05/messages |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment