// Source: GitHub gist jamiequint/a79d5b0672fe8231096d (created January 4, 2016 19:11).
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

// Hostname of the target domain. It is used verbatim as the HTTPS host and
// must have the form <name>.<region>.<service>.amazonaws.com, because
// buildRequest() extracts the signing region and service from it.
var endpoint = 'YOUR_ENDPOINT';
exports.handler = function(input, context) { | |
var records = input.Records; | |
records.forEach(function(record) { | |
var elasticsearchData = transform(record); | |
if (elasticsearchData) { | |
// post documents to the Amazon Elasticsearch Service | |
post(elasticsearchData, function(error, success, statusCode, failedItems) { | |
console.log('Response: ' + JSON.stringify({ | |
"statusCode": statusCode | |
})); | |
if (error) { | |
console.log('Error: ' + JSON.stringify(error, null, 2)); | |
if (failedItems && failedItems.length > 0) { | |
console.log("Failed Items: " + | |
JSON.stringify(failedItems, null, 2)); | |
} | |
context.fail(JSON.stringify(error)); | |
} else { | |
console.log('Success: ' + JSON.stringify(success)); | |
context.succeed('Success'); | |
} | |
}); | |
} else { | |
console.log('Failed to log record:' + record); | |
context.succeed('Success'); | |
} | |
}); | |
}; | |
/**
 * Converts one Kinesis record into Elasticsearch bulk-API lines.
 *
 * The base64 payload is decoded and matched against a log line of the form
 *   <date> <time> <machine> <log_group>/<process>:<build> : <message>
 * and emitted as an `index` action line followed by the document line.
 *
 * @param {Object} record - Event record; reads `record.kinesis.data`
 *     (base64 string) and `record.eventID` (used as the document id).
 * @returns {?string} Two newline-terminated JSON lines (action + source),
 *     or null when the payload is missing, does not match the expected
 *     format, or carries a timestamp that cannot be parsed.
 */
function transform(record) {
    var data = record && record.kinesis && record.kinesis.data;
    if (typeof data !== 'string') {
        return null;
    }

    // Decode the base64 payload as UTF-8 so non-ASCII characters in the
    // log message are preserved ('ascii' decoding would corrupt them).
    var message = Buffer.from(data, 'base64').toString('utf8');

    var re = /(\d+-\d+-\d+\s\d+:\d+:\d+)\s(\S+)\s(\S+)\/(\w+):(\S+)\s:\s+(.*)/;
    var match = re.exec(message);
    if (!match) {
        return null;
    }

    // NOTE(review): "YYYY-MM-DD HH:MM:SS" is not ISO-8601, so the Date
    // constructor interprets it in the runtime's local time zone. Confirm
    // that matches the zone the log producer writes in.
    var timestamp = new Date(match[1]);
    if (isNaN(timestamp.getTime())) {
        // The regex accepts any digits (e.g. month 99); without this
        // guard toISOString() below would throw a RangeError.
        return null;
    }

    // index name format: cwl-YYYY.MM.DD
    var indexName = [
        'cwl-' + timestamp.getUTCFullYear(),              // year
        ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
        ('0' + timestamp.getUTCDate()).slice(-2)          // day
    ].join('.');

    var indexType = 'interstate-test';
    var indexId = record.eventID;

    var source = {};
    source['@id'] = indexId;
    source['@machine'] = match[2];
    source['@log_group'] = match[3];
    source['@process'] = match[4];
    source['@build'] = match[5];
    source['@message'] = match[6];
    source['@timestamp'] = timestamp.toISOString();

    var action = { "index": {} };
    action.index._index = indexName;
    action.index._type = indexType;
    action.index._id = indexId;

    // One action line plus one document line, each terminated by '\n'.
    return [
        JSON.stringify(action),
        JSON.stringify(source)
    ].join('\n') + '\n';
}
/**
 * Sends a bulk payload to the configured endpoint and reports the outcome.
 *
 * @param {string} body - Newline-delimited bulk request body.
 * @param {function} callback - Called as
 *     `callback(error, success, statusCode, failedItems)`.
 *     `error` is null only for a 200 response whose body parses as JSON and
 *     does not report `errors: true`; otherwise it is
 *     `{ statusCode, responseBody }`, or the Error object when the request
 *     itself fails (then it is the only argument).
 *     `success` and `failedItems` are set only for 2xx responses that
 *     contain an `items` array.
 */
function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';

        response.on('data', function(chunk) {
            responseBody += chunk;
        });

        response.on('end', function() {
            // Error responses are not guaranteed to be JSON; a parse
            // failure must reach the callback instead of throwing here.
            var info = null;
            try {
                info = JSON.parse(responseBody);
            } catch (parseError) {
                info = null;
            }

            var failedItems;
            var success;
            var isSuccessStatus =
                response.statusCode >= 200 && response.statusCode < 300;

            if (isSuccessStatus && info && Array.isArray(info.items)) {
                failedItems = info.items.filter(function(x) {
                    return Boolean(x && x.index && x.index.status >= 300);
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            var hasError = response.statusCode !== 200 ||
                info === null ||
                info.errors === true;
            var error = hasError ? {
                "statusCode": response.statusCode,
                "responseBody": responseBody
            } : null;

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });

    request.end(requestParams.body);
}
/**
 * Builds the options object for a signed `POST /_bulk` HTTPS request.
 *
 * The request is signed with AWS Signature Version 4 using the credentials
 * in `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and, when set,
 * `AWS_SESSION_TOKEN`. Region and service are taken from the endpoint
 * hostname.
 *
 * @param {string} endpoint - Hostname shaped like
 *     <name>.<region>.<service>.amazonaws.com.
 * @param {string} body - Request payload; it is hashed into the signature.
 * @returns {Object} `{ host, method, path, body, headers }` where `headers`
 *     includes the computed `Authorization` header.
 * @throws {Error} If `endpoint` does not match the expected hostname shape.
 */
function buildRequest(endpoint, body) {
    var endpointParts = typeof endpoint === 'string'
        ? endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/)
        : null;
    if (!endpointParts) {
        // Fail with a clear message rather than a TypeError on null[2].
        throw new Error('Invalid endpoint "' + endpoint +
            '": expected <name>.<region>.<service>.amazonaws.com');
    }

    var region = endpointParts[2];
    var service = endpointParts[3];

    // Compact ISO-8601 timestamp: YYYYMMDDTHHMMSSZ.
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.slice(0, 8);

    // Derive the signing key: date -> region -> service -> "aws4_request".
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Date': datetime
        }
    };

    // Only send (and sign) the session token when one exists; an undefined
    // header value would be signed as the literal string "undefined".
    if (process.env.AWS_SESSION_TOKEN) {
        request.headers['X-Amz-Security-Token'] = process.env.AWS_SESSION_TOKEN;
    }

    // Header names sorted by their lowercase form, shared by both the
    // canonical header list and the signed-header list.
    var sortedNames = Object.keys(request.headers).sort(function(a, b) {
        var la = a.toLowerCase();
        var lb = b.toLowerCase();
        if (la < lb) { return -1; }
        if (la > lb) { return 1; }
        return 0;
    });

    var canonicalHeaders = sortedNames
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = sortedNames
        .map(function(k) { return k.toLowerCase(); })
        .join(';');

    // The empty strings stand for the empty query string and the blank
    // line that terminates the canonical header block.
    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex')
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ].join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}
/**
 * Computes an HMAC-SHA256 of `str` (interpreted as UTF-8) under `key`.
 *
 * @param {string|Buffer} key - HMAC key.
 * @param {string} str - Message to authenticate.
 * @param {string} [encoding] - Digest encoding such as 'hex'; when omitted
 *     the raw digest is returned as a Buffer.
 * @returns {string|Buffer} The digest in the requested form.
 */
function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
/**
 * Computes the SHA-256 digest of `str` (interpreted as UTF-8).
 *
 * @param {string} str - Input to hash.
 * @param {string} [encoding] - Digest encoding such as 'hex'; when omitted
 *     the raw digest is returned as a Buffer.
 * @returns {string|Buffer} The digest in the requested form.
 */
function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment