Created
December 19, 2018 09:08
-
-
Save letrunghieu/f2cf5d2324a68a8ec8c209619467185b to your computer and use it in GitHub Desktop.
Lambda transformation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

console.log('Loading function');

// Third-party helpers used by the transformer further down in this file.
const patterns = require('node-grok').loadDefaultSync(); // default grok pattern collection
const geoip = require('geoip-lite');                     // IP address lookup
const device = require('device');                        // user-agent classification
const countries = require('countryjs');                  // country code -> country name
const moment = require('moment');                        // timestamp parsing

// Grok expression describing one access-log line. The fields are listed one
// per line and separated by a single space in the final expression.
const grokPattern = [
    '%{HTTPDATE:timestamp}',
    '%{IP:clientIP}',
    '%{NOTSPACE:username}',
    '%{WORD:method}',
    '%{NOTSPACE:uri}',
    '%{IP:serverIP}',
    '%{NUMBER:port}',
    '%{NOTSPACE:host}',
    '%{QUOTEDSTRING:userAgent}',
    '%{NOTSPACE:referer}',
    '%{NUMBER:status}',
    '%{NUMBER:bytesSent}',
    '%{NUMBER:requestTime}',
    '%{QUOTEDSTRING:forwardedFor}',
    '(?<forwardedPort>-|%{NUMBER})',
    '(?<forwardedProtocol>-|%{NOTSPACE})',
].join(' ');

// Compiled once at load time and reused for every record.
const pattern = patterns.createPattern(grokPattern);
/**
 * Extracts an IP address from the value of an `X-Forwarded-For` header.
 *
 * The header is a comma-separated list; the right-most entry is returned with
 * surrounding whitespace removed.
 * NOTE(review): per the AWS page linked below each proxy appends the address
 * it received the request from, so the right-most entry is the address seen
 * by the last proxy, not necessarily the originating client - confirm this is
 * the intended choice.
 *
 * @see http://docs.aws.amazon.com/elasticloadbalancing/latest/classic/x-forwarded-headers.html#x-forwarded-for
 * @param {string} strVal  raw header value, e.g. "203.0.113.7, 10.0.0.1" or "-"
 * @return {string|null} the right-most entry, or null when the value is not a
 *                       string or that entry is empty or the "-" placeholder
 */
const getForwardedIp = (strVal) => {
    // A missing header must not crash the caller; report "no address" instead.
    if (typeof strVal !== 'string') {
        return null;
    }

    // Only the last entry matters, so cut it out directly instead of
    // splitting and trimming every entry. lastIndexOf() yields -1 when there
    // is no comma, which makes slice(0) return the whole value.
    const lastEntry = strVal.slice(strVal.lastIndexOf(',') + 1).trim();

    // "-" is the log placeholder for "no value"; an empty entry carries no
    // address either.
    if (lastEntry === '' || lastEntry === '-') {
        return null;
    }
    return lastEntry;
};
/**
 * Builds the enriched JSON document for one successfully parsed log line.
 *
 * @param {string} entry  the raw log line
 * @param {Object} match  the fields captured by the grok pattern
 * @return {Object} the document to deliver for this line
 */
const buildLogDocument = (entry, match) => {
    /* Prepare JSON version from the log data */
    const result = Object.assign({message: entry, tags: []}, match);
    result.tags.push('grok_parsed_success');

    // trim the double quotes from the user agent, referer and forwarded for
    result.userAgent = result.userAgent.slice(1, -1);
    result.referer = result.referer.slice(1, -1);
    result.forwardedFor = result.forwardedFor.slice(1, -1);

    // convert strings to numbers
    result.bytesSent = +result.bytesSent;
    result.requestTime = +result.requestTime;

    // get the original IP: fall back to the directly connected client when
    // the forwarded-for value does not carry an address
    result.originalIP = getForwardedIp(result.forwardedFor) || result.clientIP;

    // geoip lookup
    const resolvedIp = geoip.lookup(result.originalIP);
    result.geoip = resolvedIp;
    if (resolvedIp) {
        result.tags.push('geoip_looked_up_success');
        resolvedIp.countryName = countries.name(resolvedIp.country);
        // the GeoJSON format has the longitude before the latitude in the
        // array, so replace geoip.ll with an explicit geoip.location object
        const [lat, lon] = resolvedIp.ll;
        resolvedIp.location = {lat, lon};
        delete resolvedIp.ll;
    } else {
        result.tags.push('geoip_looked_up_fail');
    }

    // useragent parser: keep the parsed user agent as `browser` and drop the
    // parser object from the device description
    result.device = device(result.userAgent, {parseUserAgent: true});
    result.browser = result.device.parser.useragent;
    delete result.device.parser;

    // add @timestamp field (strict parsing of the access-log date format)
    result['@timestamp'] = moment(result.timestamp, 'DD/MMM/YYYY:HH:mm:ss Z', true).toISOString();
    delete result.timestamp;

    return result;
};

/**
 * Lambda entry point: transforms a batch of base64-encoded access-log lines
 * into base64-encoded JSON documents.
 *
 * Every incoming record produces exactly one outgoing record with the same
 * `recordId`. Lines that match the grok pattern are returned with result
 * 'Ok'; lines that do not match, or whose processing throws, are returned
 * untouched with result 'ProcessingFailed'.
 *
 * @param {Object} event     invocation payload; `event.records` is the batch
 * @param {Object} context   Lambda context (unused)
 * @param {Function} callback  called as callback(null, {records: [...]})
 */
const transform = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        try {
            const entry = Buffer.from(record.data, 'base64').toString('utf8');
            const match = pattern.parseSync(entry);
            if (match) {
                const doc = buildLogDocument(entry, match);
                const payload = Buffer.from(JSON.stringify(doc), 'utf8').toString('base64');
                success++;
                return {
                    recordId: record.recordId,
                    result: 'Ok',
                    data: payload,
                };
            }
        } catch (err) {
            // One bad record must not abort the whole batch: an exception
            // escaping from here would fail the invocation for every record.
            console.error(`Failed to transform record ${record.recordId}:`, err);
        }

        /* Failed event, notify the error and leave the record intact */
        failure++;
        return {
            recordId: record.recordId,
            result: 'ProcessingFailed',
            data: record.data,
        };
    });

    console.log(`Processing completed. Successful records ${success}, Failed records ${failure}.`);
    callback(null, {records: output});
};

exports.handler = transform;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "name": "nginx-access-log-transformer",
  "version": "1.0.0",
  "description": "Kinesis Firehose transformer for Nginx access logs",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "repository": {
    "type": "git",
    "url": "ssh://[email protected]:7999/res/nginx-access-log-transformer.git"
  },
  "author": "Hieu Le <[email protected]>",
  "license": "UNLICENSED",
  "private": true,
  "dependencies": {
    "countryjs": "^1.8.0",
    "device": "^0.3.8",
    "geoip-lite": "^1.2.1",
    "moment": "^2.18.1",
    "node-grok": "^2.0.3",
    "useragent": "^2.1.13"
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment