Skip to content

Instantly share code, notes, and snippets.

@vrivellino
Last active December 30, 2017 15:35
Show Gist options
  • Save vrivellino/64ff8b1b01f82ec8935d to your computer and use it in GitHub Desktop.
Save vrivellino/64ff8b1b01f82ec8935d to your computer and use it in GitHub Desktop.
Lambda: archive a Kinesis stream to S3
// Emitted once, when the module is first loaded.
console.log('Loading function');

// AWS SDK; supplies the S3 client constructor used below.
var AWS = require('aws-sdk');

// S3 client created at module load and shared by every call to the handler.
var s3 = new AWS.S3();

// Bucket that receives the archived records.
var s3Bucket = 'archive-bucket';

// Leading path segment of every object key written by the handler.
var s3Prefix = 'kinesis-archive-test';

// Modulus applied to the numeric tail of each event ID when building the
// object key, so keys are spread over this many sub-paths.
var s3Partitions = 2;
exports.handler = function (event, context) {
//console.log(JSON.stringify(event, null, 2));
var records = event.Records.length,
recordsProcessed = 0,
lastError = null,
successObj = {message: 'Archival successful'};
/* Only call context.done if we're done processing */
function handlerFinish() {
if (recordsProcessed === records) {
context.done(lastError, successObj);
}
}
/* loop through kinesis records (they could be batched) */
event.Records.forEach(function (record) {
console.log(JSON.stringify(record, null, 2));
// Kinesis data is base64 encoded so decode here
var eventPayload = new Buffer(record.kinesis.data, 'base64').toString('ascii'),
keyName = s3Prefix + '/' + record.eventID.slice(-5) % s3Partitions + '/'+ record.eventID,
s3_params = {Bucket: s3Bucket, Key: keyName, Body: eventPayload};
console.log('Attempt to upload data to ' + s3Bucket +'/' + keyName + ': ' + eventPayload);
s3.putObject(s3_params, function (err, data) {
if (err) {
recordsProcessed++;
console.log(err, err.stack);
lastError = err;
successObj = null;
handlerFinish();
} else {
recordsProcessed++;
console.log('Successfully uploaded data to ' + s3Bucket + '/' + keyName + ': ' + data);
handlerFinish();
}
});
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment