Created
February 13, 2019 14:34
-
-
Save okram999/c124e304adf946ba72700cedd6dcf653 to your computer and use it in GitHub Desktop.
A Node.js Lambda that pushes CloudWatch Logs to Elasticsearch and sends a notification when a log line matches a pattern.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
// AWS SDK; used below to publish SNS notifications
var AWS = require('aws-sdk');

// Host name of the Elasticsearch domain that receives the bulk requests.
// buildRequest() parses the signing region and service out of this name,
// so it must have the form <domain>.<region>.<service>.amazonaws.com.
var endpoint = 'xxxxxxxxxxx.us-east-1.es.amazonaws.com';
exports.handler = function(input, context) { | |
// decode input from base64 | |
var zippedInput = new Buffer(input.awslogs.data, 'base64'); | |
// decompress the input | |
zlib.gunzip(zippedInput, function(error, buffer) { | |
if (error) { context.fail(error); return; } | |
// parse the input from JSON | |
var awslogsData = JSON.parse(buffer.toString('utf8')); | |
//Added to parse match here for filter pattern: | |
// if matched - send sns notification | |
var checkMatch = testMatchAndNotify(awslogsData, "Notify: [ Some Team ]") | |
// transform the input to Elasticsearch documents | |
var elasticsearchBulkData = transform(awslogsData); | |
// skip control messages | |
if (!elasticsearchBulkData) { | |
console.log('Received a control message'); | |
context.succeed('Control message handled successfully'); | |
return; | |
} | |
// post documents to the Amazon Elasticsearch Service | |
post(elasticsearchBulkData, function(error, success, statusCode, failedItems) { | |
console.log('Response: ' + JSON.stringify({ | |
"statusCode": statusCode | |
})); | |
if (error) { | |
console.log('Error: ' + JSON.stringify(error, null, 2)); | |
if (failedItems && failedItems.length > 0) { | |
console.log("Failed Items: " + | |
JSON.stringify(failedItems, null, 2)); | |
} | |
context.fail(JSON.stringify(error)); | |
} else { | |
console.log('Success: ' + JSON.stringify(success)); | |
context.succeed('Success'); | |
} | |
}); | |
}); | |
}; | |
/**
 * Checks every log event for `match_string` and publishes one SNS message
 * per matching event.
 *
 * @param {Object} payload      - decoded CloudWatch Logs payload; `messageType`
 *                                and `logEvents[].message` are read.
 * @param {string} match_string - substring to look for in each log message.
 * @returns {null|Promise} `null` for control messages; otherwise a promise
 *          that resolves once every publish attempt has finished. Publish
 *          failures are logged, not propagated, so the promise does not
 *          reject because of them.
 */
function testMatchAndNotify(payload, match_string) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var sns = null;     // created on the first match and reused afterwards
    var pending = [];   // one promise per publish attempt

    payload.logEvents.forEach(function(logEvent) {
        if (!logEvent.message.includes(match_string)) {
            console.log("Not Sending sns");
            return;
        }

        // trigger the sns
        console.log("Sending sns");
        if (sns === null) {
            AWS.config.update({region: 'us-east-1'});
            sns = new AWS.SNS({apiVersion: '2010-03-31'});
        }

        var params = {
            Message: logEvent.message, /* required */
            TopicArn: 'arn:aws:sns:us-east-1:xxxxxxxxxxxx',
            // Backticks are required: inside ordinary quotes `${...}` is
            // sent literally instead of being replaced by the pattern.
            Subject: `Detected ${match_string} in logs`
        };

        pending.push(
            sns.publish(params).promise()
                .then(function(data) {
                    console.log(`Message ${params.Message} sent to the topic ${params.TopicArn}`);
                    console.log("MessageID is " + data.MessageId);
                })
                .catch(function(err) {
                    console.error(err, err.stack);
                })
        );
    });

    return Promise.all(pending);
}
/**
 * Converts a decoded CloudWatch Logs payload into an Elasticsearch bulk
 * request body (one action line plus one document line per log event).
 *
 * @param {Object} payload - decoded payload; `messageType`, `owner`,
 *        `logGroup`, `logStream` and `logEvents[]` (`id`, `timestamp`,
 *        `message`, `extractedFields`) are read.
 * @returns {string|null} newline-delimited bulk body, or `null` for a
 *          control message. An empty `logEvents` array yields `''`.
 */
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    return payload.logEvents.map(function(logEvent) {
        // `1 *` coerces a numeric string timestamp to a number
        var timestamp = new Date(1 * logEvent.timestamp);

        // index name format: cwl-YYYY.MM.DD (UTC)
        var indexName = 'cwl-' + [
            timestamp.getUTCFullYear(),                        // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),   // month
            ('0' + timestamp.getUTCDate()).slice(-2)           // day
        ].join('.');

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = timestamp.toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = {
            "index": {
                _index: indexName,
                _type: payload.logGroup,
                _id: logEvent.id
            }
        };

        // every line of a bulk body, including the last, ends with '\n'
        return JSON.stringify(action) + '\n' + JSON.stringify(source) + '\n';
    }).join('');
}
/**
 * Builds the Elasticsearch document body for one log event.
 *
 * With `extractedFields`: copies every own, truthy field; numeric-looking
 * values are stored as numbers, and when a value contains trailing JSON the
 * parsed object is additionally stored under `'$' + key`.
 * Without `extractedFields`: returns the JSON embedded in `message`, or an
 * empty object when there is none.
 *
 * @param {string} message           - raw log message.
 * @param {Object} [extractedFields] - field name to extracted value.
 * @returns {Object} document fields.
 */
function buildSource(message, extractedFields) {
    // Declared locally: the bare assignment used before created an implicit
    // global and throws a ReferenceError in strict mode.
    var jsonSubString;

    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}
/**
 * Returns the part of `message` from its first '{' to the end, provided that
 * substring is valid JSON.
 *
 * @param {string} message - text that may end with a JSON object.
 * @returns {string|null} the JSON substring, or `null` when `message` is not
 *          a string, has no '{', or the remainder does not parse as JSON.
 */
function extractJson(message) {
    // Non-string input has no indexOf/substring; treat it as "no JSON".
    if (typeof message !== 'string') return null;

    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;

    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}
/**
 * Reports whether `message` can be parsed by JSON.parse.
 *
 * @param {*} message - candidate JSON text.
 * @returns {boolean} `true` when parsing succeeds, `false` when it throws.
 */
function isValidJson(message) {
    try {
        JSON.parse(message);
        return true;
    } catch (e) {
        return false;
    }
}
/**
 * Reports whether `n` is a finite number or a string that is entirely a
 * finite number.
 *
 * The coercing global `isNaN` / `isFinite` are used on purpose: `parseFloat`
 * rejects values with no leading number, and `isFinite(n)` then rejects
 * strings with trailing garbage (e.g. '12abc') as well as Infinity.
 *
 * @param {*} n - value to test.
 * @returns {boolean}
 */
function isNumeric(n) {
    if (isNaN(parseFloat(n))) {
        return false;
    }
    return isFinite(n);
}
/**
 * Sends a bulk request body to the Elasticsearch endpoint over HTTPS.
 *
 * @param {string}   body     - newline-delimited bulk request body.
 * @param {Function} callback - called as
 *        `callback(error, success, statusCode, failedItems)`.
 *        `error` is `null` only for a 200 response whose body parses as JSON
 *        and does not report `errors: true`; otherwise it is
 *        `{ statusCode, responseBody }`, or the Error object when the request
 *        itself fails (then it is the only argument).
 *        `success` and `failedItems` are set only for a 2xx response that
 *        contains an `items` array.
 */
function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        // Decode as UTF-8 at the stream level so a multi-byte character
        // split across two chunks is not corrupted by string concatenation.
        response.setEncoding('utf8');
        response.on('data', function(chunk) {
            responseBody += chunk;
        });
        response.on('end', function() {
            // A proxy or auth failure can answer with a non-JSON body; treat
            // that as an error result instead of throwing inside the handler.
            var info = null;
            try {
                info = JSON.parse(responseBody);
            } catch (e) {
                info = null;
            }

            var failedItems;
            var success;

            var isSuccessStatus = response.statusCode >= 200 && response.statusCode < 300;
            if (isSuccessStatus && info && Array.isArray(info.items)) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            var failed = response.statusCode !== 200 || info === null || info.errors === true;
            var error = failed ? {
                "statusCode": response.statusCode,
                "responseBody": responseBody
            } : null;

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}
/**
 * Builds the options object for a signed (AWS Signature Version 4)
 * `POST /_bulk` request to the given endpoint.
 *
 * Credentials are read from the environment variables AWS_ACCESS_KEY_ID,
 * AWS_SECRET_ACCESS_KEY and, when set, AWS_SESSION_TOKEN.
 *
 * @param {string} endpoint - host of the form `<domain>.<region>.<service>.amazonaws.com`.
 * @param {string} body     - request body; it is hashed into the signature.
 * @returns {Object} `{ host, method, path, body, headers }` with the
 *          `Authorization` header filled in.
 * @throws {Error} when `endpoint` does not match the expected host pattern.
 */
function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    if (!endpointParts) {
        throw new Error('Unrecognized Elasticsearch endpoint: ' + endpoint);
    }
    var region = endpointParts[2];
    var service = endpointParts[3];

    // ISO timestamp with ':', '-' and milliseconds removed, e.g. 20190213T143400Z
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.slice(0, 8);

    // derive the signing key: secret -> date -> region -> service -> 'aws4_request'
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Date': datetime
        }
    };

    // Only send (and sign) the session token when temporary credentials are
    // in use; an unset variable would otherwise become an `undefined` header.
    if (process.env.AWS_SESSION_TOKEN) {
        request.headers['X-Amz-Security-Token'] = process.env.AWS_SESSION_TOKEN;
    }

    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    // method, path, (empty) query string, headers + blank line, signed header
    // names, payload hash
    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ].join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}
/**
 * Computes the HMAC-SHA256 of `str` (read as UTF-8) under `key`.
 *
 * @param {string|Buffer} key    - HMAC key.
 * @param {string} str           - message to authenticate.
 * @param {string} [encoding]    - digest encoding such as 'hex'; when
 *                                 omitted the digest is returned as a Buffer.
 * @returns {string|Buffer} the digest.
 */
function hmac(key, str, encoding) {
    var signer = crypto.createHmac('sha256', key);
    signer.update(str, 'utf8');
    return signer.digest(encoding);
}
/**
 * Computes the SHA-256 digest of `str` (read as UTF-8).
 *
 * @param {string} str        - text to hash.
 * @param {string} [encoding] - digest encoding such as 'hex'; when omitted
 *                              the digest is returned as a Buffer.
 * @returns {string|Buffer} the digest.
 */
function hash(str, encoding) {
    var hasher = crypto.createHash('sha256');
    hasher.update(str, 'utf8');
    return hasher.digest(encoding);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment