Created
October 13, 2017 00:34
-
-
Save adamjarret/07c5c974755e1dd640cf481d89f0959f to your computer and use it in GitHub Desktop.
AWS Lambda for shipping logs from S3 to CloudWatch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// AWS SDK clients (API versions pinned so SDK upgrades don't change behavior)
const aws = require('aws-sdk');
const s3 = new aws.S3({apiVersion: '2006-03-01'});
const cw = new aws.CloudWatchLogs({apiVersion: '2015-01-28'});
// Error code CloudWatch Logs returns when a log group/stream already exists (see createLogger)
const AlreadyExists = 'ResourceAlreadyExistsException';
// Month abbreviations as they appear in the Amazon "Server Access Log" time format (see parseTime)
const months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'];
// Thanks https://github.com/caolan/async/blob/master/lib/waterfall.js | |
// Minimal waterfall-style async runner (inspired by caolan/async waterfall).
// Each task is invoked as task(next, ...resultsFromPreviousTask); the chain
// stops at the first error unless ignoreError(err, index) returns true, in
// which case the error is discarded and the next task runs. When the chain
// ends, callback(err, ...results) is invoked once.
function cascade(tasks, callback, ignoreError)
{
    const done = callback || (() => undefined);
    const skippable = ignoreError || (() => false);
    if (!tasks || !tasks.length) { return done(); }
    let index = 0;
    function step(err, ...results)
    {
        // index has already advanced past the task that just finished
        const ignored = Boolean(err) && Boolean(skippable(err, index));
        if ((err && !ignored) || index === tasks.length) {
            return done(ignored ? null : err, ...results);
        }
        const task = tasks[index];
        index += 1;
        task(step, ...results);
    }
    step(null);
}
// Thanks http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html (see: Time) | |
// Thanks http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#BasicDistributionFileFormat | |
// Thanks http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html (see: Time)
// Thanks http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#BasicDistributionFileFormat
//
// Parse the timestamp embedded in a log line and return it as epoch
// milliseconds (UTC). Falls back to Date.now() when no recognizable
// timestamp is present.
//
// Supported formats:
//  - Amazon "Server Access Log": strftime [%d/%b/%Y:%H:%M:%S %z]
//    (ex. [06/Feb/2014:00:00:38 +0000]); the offset is not parsed —
//    assumes these logs are always delivered with +0000
//  - ISO 8601 (ex. 2014-05-23T01:13:11Z) or Amazon "CloudFront Access Log":
//    strftime %Y-%m-%d%t%H:%M:%S (ex. 2014-05-23 01:13:11)
function parseTime(dateString)
{
    // Month abbreviations as emitted by the Server Access Log format
    const monthAbbr = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'];

    let dg = dateString.match(/(\d{2})\/([A-Z][a-z]{2})\/(\d{4}):(\d{2}):(\d{2}):(\d{2})/);
    if (dg) {
        const month = monthAbbr.indexOf(dg[2]);
        // Guard: an unknown three-letter token would otherwise yield month
        // index -1 (December of the previous year); fall through instead
        if (month !== -1) {
            return Date.UTC(Number(dg[3]), month, Number(dg[1]),
                Number(dg[4]), Number(dg[5]), Number(dg[6]));
        }
    }

    dg = dateString.match(/(\d{4})-(\d{2})-(\d{2})[T\s](\d{2}):(\d{2}):(\d{2})/);
    if (dg) {
        // Date.UTC months are 0-indexed
        return Date.UTC(Number(dg[1]), Number(dg[2]) - 1, Number(dg[3]),
            Number(dg[4]), Number(dg[5]), Number(dg[6]));
    }

    return Date.now();
}
// Thanks https://github.com/rajatkumar/cloudwatchlogger/blob/master/lib/CloudWatchLogsStream.js | |
exports.handler = (event, context, callback) => { | |
let sequenceToken = null; | |
const eventRecord = event.Records[0]; // only ever one record | |
const {bucket: {name: Bucket}, object} = eventRecord.s3; | |
const Key = decodeURIComponent(object.key.replace(/\+/g, ' ')); | |
const pathComponents = Key.split('/'); | |
const logGroupName = pathComponents.splice(0, 1)[0]; // first path component | |
const logId = pathComponents.join('/'); // all but the first path component (joined with '/') | |
// If logId contains a date in the YYYY-mm-dd format, use that as the stream name | |
const deliveryDate = logId.match(/\d{4}-\d{2}-\d{2}/); | |
const logStreamName = deliveryDate ? deliveryDate[0] : logId; | |
function getObject(cb) | |
{ | |
s3.getObject({Bucket, Key}, cb); | |
} | |
function deleteObject(cb) | |
{ | |
s3.deleteObject({Bucket, Key}, cb); | |
} | |
function parseLogEvents(cb, getObjectResponse) | |
{ | |
cb(null, getObjectResponse.Body.toString().trim().split('\n').reduce((a, line) => { | |
// Ignore lines that begin with # (to accommodate CloudFront format) | |
if(!line.startsWith('#')) { | |
// Append log file source to the message body | |
a.push({message: line.trim() + (` s3://${Bucket}/${Key}`), timestamp: parseTime(line)}); | |
console.log('Parsed: ', a[a.length - 1]); | |
} | |
else { | |
console.log('Ignored: ', line); | |
} | |
return a; | |
}, [])); | |
} | |
function putLogEvents(cb, logEvents) | |
{ | |
if(!logEvents.length) { return cb(new Error('Nothing to log')); } | |
cw.putLogEvents({logStreamName, logGroupName, logEvents, sequenceToken}, cb); | |
} | |
function createStream(cb) | |
{ | |
cw.createLogStream({logGroupName, logStreamName}, cb); | |
} | |
function createGroup(cb) | |
{ | |
cw.createLogGroup({logGroupName}, cb); | |
} | |
function describeStream(cb) | |
{ | |
cw.describeLogStreams({logGroupName, logStreamNamePrefix: logStreamName, limit: 1}, cb); | |
} | |
function setSequenceToken(cb, data) | |
{ | |
// If sequenceToken is missing, the stream may have just been created | |
const valid = (data && data.logStreams && data.logStreams.length); | |
sequenceToken = !valid ? null : data.logStreams[0].uploadSequenceToken; | |
cb(null); | |
} | |
function createLogger(cb) | |
{ | |
// Ignore AlreadyExists exceptions (they just mean there is nothing to do before continuing) | |
cascade([createGroup, createStream], cb, (err) => (err.code === AlreadyExists)); | |
} | |
cascade([ | |
createLogger, | |
describeStream, | |
setSequenceToken, | |
getObject, | |
parseLogEvents, | |
putLogEvents, | |
deleteObject | |
], callback); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Adam! I'm trying to follow this in our environment, but how do I uncompress the files? Basically, our AWS Application Load Balancer forwards the access logs to an S3 bucket, and when I created this function, it sends the gz files and doesn't unarchive the gz file from S3 in the CloudWatch logs.
Screenshot from Cloudwatch logs: https://prnt.sc/11wsm1a
Thanks!