Skip to content

Instantly share code, notes, and snippets.

@jamiequint
Created January 4, 2016 19:11
Show Gist options
  • Save jamiequint/a79d5b0672fe8231096d to your computer and use it in GitHub Desktop.
Save jamiequint/a79d5b0672fe8231096d to your computer and use it in GitHub Desktop.
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
// Bare hostname of the target domain (no scheme, no path). It is used verbatim
// as the request host and Host header, and buildRequest() parses the signing
// region and service out of it, so it must look like
// <domain>.<region>.<service>.amazonaws.com.
var endpoint = 'YOUR_ENDPOINT';
exports.handler = function(input, context) {
var records = input.Records;
records.forEach(function(record) {
var elasticsearchData = transform(record);
if (elasticsearchData) {
// post documents to the Amazon Elasticsearch Service
post(elasticsearchData, function(error, success, statusCode, failedItems) {
console.log('Response: ' + JSON.stringify({
"statusCode": statusCode
}));
if (error) {
console.log('Error: ' + JSON.stringify(error, null, 2));
if (failedItems && failedItems.length > 0) {
console.log("Failed Items: " +
JSON.stringify(failedItems, null, 2));
}
context.fail(JSON.stringify(error));
} else {
console.log('Success: ' + JSON.stringify(success));
context.succeed('Success');
}
});
} else {
console.log('Failed to log record:' + record);
context.succeed('Success');
}
});
};
/**
 * Converts one Kinesis record into a two-line Elasticsearch bulk "index"
 * request (action line + document line, each newline-terminated).
 *
 * The decoded payload must look like
 *   <date> <time> <machine> <log_group>/<process>:<build> : <message>
 * where <date> <time> is digits in the form N-N-N N:N:N.
 *
 * @param {Object} record - record with a base64 `kinesis.data` payload and an
 *                          `eventID` (used as the document id).
 * @returns {?string} the bulk request body, or null when the payload does not
 *                    match the expected layout or its timestamp is not a
 *                    valid date.
 */
function transform(record) {
  // The payload arrives base64-encoded. Decode it as UTF-8: 'ascii' would
  // corrupt every non-ASCII character in the log text.
  var message = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
  var re = /(\d+-\d+-\d+\s\d+:\d+:\d+)\s(\S+)\s(\S+)\/(\w+):(\S+)\s:\s+(.*)/;
  var match = re.exec(message);
  if (!match) {
    return null;
  }

  // NOTE(review): this is not an ISO-8601 string, so how it is parsed (and in
  // which time zone) is up to the JS engine - confirm the logs are written in
  // the same zone the function runs in.
  var timestamp = new Date(match[1]);
  if (isNaN(timestamp.getTime())) {
    // Digits in the right shape but not a real date (e.g. month 13);
    // toISOString() below would throw a RangeError on it.
    return null;
  }

  // index name format: cwl-YYYY.MM.DD
  var indexName = [
    'cwl-' + timestamp.getUTCFullYear(), // year
    ('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month
    ('0' + timestamp.getUTCDate()).slice(-2) // day
  ].join('.');
  var indexType = 'interstate-test';
  var indexId = record.eventID;

  var source = {};
  source['@id'] = indexId;
  source['@machine'] = match[2];
  source['@log_group'] = match[3];
  source['@process'] = match[4];
  source['@build'] = match[5];
  source['@message'] = match[6];
  source['@timestamp'] = timestamp.toISOString();

  var action = { "index": {} };
  action.index._index = indexName;
  action.index._type = indexType;
  action.index._id = indexId;

  // Bulk format: one JSON object per line, with a trailing newline.
  return [
    JSON.stringify(action),
    JSON.stringify(source)
  ].join('\n') + '\n';
}
/**
 * Sends a bulk request body to the configured endpoint and reports the result.
 *
 * @param {string} body - newline-delimited bulk request body.
 * @param {function(?Object, Object=, number=, Array=)} callback - invoked as
 *        callback(error, success, statusCode, failedItems):
 *        - error: null on success; `{statusCode, responseBody}` when the
 *          status is not 200, the body is not usable JSON, or the response
 *          reports `errors: true`; the raw Error on a transport failure.
 *        - success: `{attemptedItems, successfulItems, failedItems}` counts,
 *          set only for a 2xx response that carries an `items` array.
 *        - failedItems: the response items whose index status is >= 300.
 */
function post(body, callback) {
  var requestParams = buildRequest(endpoint, body);
  var request = https.request(requestParams, function(response) {
    var responseBody = '';
    // Decode as UTF-8 on the stream so a multi-byte character split across
    // two chunks is not corrupted by per-chunk string conversion.
    response.setEncoding('utf8');
    response.on('data', function(chunk) {
      responseBody += chunk;
    });
    response.on('end', function() {
      var statusCode = response.statusCode;
      var info = null;
      try {
        info = JSON.parse(responseBody);
      } catch (parseError) {
        // Non-JSON body (e.g. a plain-text or HTML error, or an empty
        // response). Reported through `error` below with the raw body, rather
        // than throwing from inside this event handler.
        info = null;
      }
      var items = info && Array.isArray(info.items) ? info.items : null;
      var failedItems;
      var success;
      if (statusCode >= 200 && statusCode < 300 && items) {
        failedItems = items.filter(function(x) {
          return x.index && x.index.status >= 300;
        });
        success = {
          "attemptedItems": items.length,
          "successfulItems": items.length - failedItems.length,
          "failedItems": failedItems.length
        };
      }
      var error = statusCode !== 200 || !info || info.errors === true ? {
        "statusCode": statusCode,
        "responseBody": responseBody
      } : null;
      callback(error, success, statusCode, failedItems);
    });
  }).on('error', function(e) {
    callback(e);
  });
  request.end(requestParams.body);
}
/**
 * Builds the options object for a signed `POST /_bulk` request, using AWS
 * Signature Version 4 with the credentials found in the environment
 * (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and, when set, AWS_SESSION_TOKEN).
 *
 * @param {string} endpoint - bare hostname of the form
 *        <domain>.<region>.<service>.amazonaws.com; the signing region and
 *        service are taken from it.
 * @param {string} body - request payload; it is hashed into the signature and
 *        returned as `body` on the result.
 * @returns {Object} `{host, method, path, body, headers}` with the
 *          `Authorization` header filled in.
 * @throws {Error} when `endpoint` does not have the expected shape.
 */
function buildRequest(endpoint, body) {
  var endpointParts = typeof endpoint === 'string'
    ? endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/)
    : null;
  // Fail with a readable message instead of a TypeError on a null match (or a
  // guaranteed signature rejection when region/service come out empty).
  if (!endpointParts || !endpointParts[2] || !endpointParts[3]) {
    throw new Error('Invalid Amazon Elasticsearch endpoint "' + endpoint +
      '": expected <domain>.<region>.<service>.amazonaws.com');
  }
  var region = endpointParts[2];
  var service = endpointParts[3];

  // Timestamp in the compact form YYYYMMDDTHHMMSSZ; the date is its first 8 chars.
  var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
  var date = datetime.slice(0, 8);

  // Signing key derivation: each step keys the next HMAC with the previous result.
  var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
  var kRegion = hmac(kDate, region);
  var kService = hmac(kRegion, service);
  var kSigning = hmac(kService, 'aws4_request');

  var headers = {
    'Content-Type': 'application/json',
    'Host': endpoint,
    'Content-Length': Buffer.byteLength(body),
    'X-Amz-Date': datetime
  };
  // Only send (and sign) the session token when there is one; an undefined
  // header value is not a valid header.
  if (process.env.AWS_SESSION_TOKEN) {
    headers['X-Amz-Security-Token'] = process.env.AWS_SESSION_TOKEN;
  }

  var request = {
    host: endpoint,
    method: 'POST',
    path: '/_bulk',
    body: body,
    headers: headers
  };

  // Sort once by lowercase name so the canonical header list and the
  // signed-header list are guaranteed to agree.
  var headerNames = Object.keys(headers).sort(function(a, b) {
    var x = a.toLowerCase();
    var y = b.toLowerCase();
    return x < y ? -1 : (x > y ? 1 : 0);
  });
  var canonicalHeaders = headerNames
    .map(function(k) { return k.toLowerCase() + ':' + headers[k]; })
    .join('\n');
  var signedHeaders = headerNames
    .map(function(k) { return k.toLowerCase(); })
    .join(';');

  // The empty entries are the (empty) query string and the newline that
  // terminates the header list.
  var canonicalString = [
    request.method,
    request.path, '',
    canonicalHeaders, '',
    signedHeaders,
    hash(request.body, 'hex')
  ].join('\n');

  var credentialString = [ date, region, service, 'aws4_request' ].join('/');
  var stringToSign = [
    'AWS4-HMAC-SHA256',
    datetime,
    credentialString,
    hash(canonicalString, 'hex')
  ].join('\n');

  request.headers.Authorization = [
    'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
    'SignedHeaders=' + signedHeaders,
    'Signature=' + hmac(kSigning, stringToSign, 'hex')
  ].join(', ');
  return request;
}
/**
 * Computes the HMAC-SHA256 of a UTF-8 string.
 *
 * @param {string|Buffer} key - HMAC key.
 * @param {string} str - message to authenticate.
 * @param {string} [encoding] - digest encoding (e.g. 'hex'); when omitted the
 *        raw digest is returned, which lets results be chained as keys.
 * @returns {string|Buffer} the digest.
 */
function hmac(key, str, encoding) {
  var signer = crypto.createHmac('sha256', key);
  signer.update(str, 'utf8');
  return signer.digest(encoding);
}
/**
 * Computes the SHA-256 digest of a UTF-8 string.
 *
 * @param {string} str - text to hash.
 * @param {string} [encoding] - digest encoding (e.g. 'hex'); when omitted the
 *        raw digest is returned.
 * @returns {string|Buffer} the digest.
 */
function hash(str, encoding) {
  var digester = crypto.createHash('sha256');
  digester.update(str, 'utf8');
  return digester.digest(encoding);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment