Created
February 14, 2017 12:29
-
-
Save kesor/c20f84cf605db24ca0b51612d1ab1367 to your computer and use it in GitHub Desktop.
Lambda function to send events from AWS CloudWatch Logs to Elasticsearch.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var http = require('http');
var zlib = require('zlib');
// Timing trace for the current invocation: element 0 is the start time in
// epoch milliseconds; debug() appends (label, elapsed ms) pairs after it.
var ts = [ 0 ];

var DEBUG = false; // set to `true` to enable timestamps in results

// private IP of the ELK server in "Default VPC"
var endpointHost = '172.31.10.250';
var endpointPort = 9200;

// Socket inactivity timeout for the bulk request.
var request_timeout = 800; // milliseconds
/**
 * Records a timing checkpoint when DEBUG is enabled.
 *
 * Appends two entries to the module-level `ts` array: the checkpoint label
 * and the number of milliseconds elapsed since `ts[0]`. Does nothing when
 * DEBUG is falsy.
 *
 * @param {string} state - label describing the checkpoint that was reached
 */
function debug(state) {
  if (DEBUG) {
    ts.push(state, Date.now() - ts[0]);
  }
}
exports.handler = function(input, context) { | |
ts = [ Date.now() ]; | |
// decode input from base64 | |
var zippedInput = new Buffer(input.awslogs.data, 'base64'); | |
// decompress the input | |
zlib.gunzip(zippedInput, function(error, buffer) { | |
if (error) { context.fail(error); return; } | |
// parse the input from JSON | |
var awslogsData = JSON.parse(buffer.toString('utf8')); | |
debug("parsed gzip"); | |
// transform the input to Elasticsearch documents | |
var elasticsearchBulkData = transform(awslogsData); | |
debug("transformed json"); | |
// skip control messages | |
if (!elasticsearchBulkData) { | |
context.succeed('Control message handled successfully '+JSON.stringify(ts)); | |
return; | |
} | |
// post documents to the Amazon Elasticsearch Service | |
var requestParams = { | |
host: endpointHost, | |
port: endpointPort, | |
method: 'POST', | |
path: '/_bulk', | |
body: elasticsearchBulkData, | |
headers: { | |
'Content-Type': 'application/json', | |
'Host': endpointHost + ':' + endpointPort, | |
'Content-Length': Buffer.byteLength(elasticsearchBulkData) | |
} | |
}; | |
debug("sending request"); | |
var request = http.request(requestParams, function(response) { | |
debug("started response"); | |
var responseBody = ''; | |
response.on('data', function(chunk) { | |
responseBody += chunk; | |
}); | |
response.on('end', function() { | |
debug("finished response"); | |
var info = JSON.parse(responseBody); | |
var failedItems; | |
if (response.statusCode >= 200 && response.statusCode < 299) { | |
failedItems = info.items.filter(function(x) { | |
return x.index.status >= 300; | |
}); | |
} | |
var error = response.statusCode !== 200 || info.errors === true ? { | |
"statusCode": response.statusCode, | |
"responseBody": responseBody | |
} : null; | |
if (error) { | |
console.log('Error: ' + JSON.stringify(error, null, 2)); | |
if (failedItems && failedItems.length > 0) { | |
console.log("Failed Items: " + JSON.stringify(failedItems, null, 2)); | |
} | |
context.fail(JSON.stringify([ error, ts ])); | |
} else { | |
context.succeed('Success '+JSON.stringify(ts)); | |
} | |
}); | |
}).on('error', function(e) { | |
context.fail(e); | |
}); | |
request.on('socket', function(socket) { | |
socket.setTimeout(request_timeout, function() { | |
request.abort(); | |
}); | |
debug("socket established"); | |
}); | |
request.end(requestParams.body); | |
}); | |
}; | |
/**
 * Converts a decoded CloudWatch Logs payload into an Elasticsearch bulk
 * request body.
 *
 * Each log event becomes two newline-terminated JSON lines: an `index`
 * action (index `cwl-YYYY.MM.DD.HH` derived from the event's UTC timestamp,
 * type `cwl`, id taken from the event) followed by the document itself
 * (`@timestamp`, `message`, and `gateway`, which is the log group name with
 * everything up to and including its first underscore removed).
 *
 * @param {Object} payload - parsed payload with `messageType`, `logGroup`
 *   and `logEvents` (each event having `id`, `timestamp`, `message`)
 * @returns {?string} the bulk body, or null for a CONTROL_MESSAGE payload;
 *   an empty string when there are no log events
 */
function transform(payload) {
  if (payload.messageType === 'CONTROL_MESSAGE') {
    return null;
  }

  // Left-pads a number to two digits, e.g. 2 -> "02".
  function pad2(value) {
    return ('0' + value).slice(-2);
  }

  return payload.logEvents.map(function(logEvent) {
    // The timestamp is epoch milliseconds and may arrive as a string.
    var timestamp = new Date(Number(logEvent.timestamp));

    // index name format: cwl-YYYY.MM.DD.HH
    var indexName = [
      'cwl-' + timestamp.getUTCFullYear(),   // year
      pad2(timestamp.getUTCMonth() + 1),     // month (getUTCMonth is 0-based)
      pad2(timestamp.getUTCDate()),          // day
      pad2(timestamp.getUTCHours())          // hour
    ].join('.');

    var action = {
      index: {
        _index: indexName,
        _type: 'cwl',
        _id: logEvent.id
      }
    };

    var source = {
      '@timestamp': timestamp.toISOString(),
      message: logEvent.message,
      gateway: payload.logGroup.replace(/^[^_]+_/, '')
    };

    // The bulk format requires every line, including the last, to end
    // with a newline.
    return JSON.stringify(action) + '\n' + JSON.stringify(source) + '\n';
  }).join('');
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment