Last active
December 30, 2017 15:35
-
-
Save vrivellino/64ff8b1b01f82ec8935d to your computer and use it in GitHub Desktop.
Lambda: archive kinesis stream to S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
console.log('Loading function'); | |
var AWS = require('aws-sdk'), | |
s3 = new AWS.S3(), | |
s3Bucket = 'archive-bucket', | |
s3Prefix = 'kinesis-archive-test', | |
s3Partitions = 2; | |
exports.handler = function (event, context) { | |
//console.log(JSON.stringify(event, null, 2)); | |
var records = event.Records.length, | |
recordsProcessed = 0, | |
lastError = null, | |
successObj = {message: 'Archival successful'}; | |
/* Only call context.done if we're done processing */ | |
function handlerFinish() { | |
if (recordsProcessed === records) { | |
context.done(lastError, successObj); | |
} | |
} | |
/* loop through kinesis records (they could be batched) */ | |
event.Records.forEach(function (record) { | |
console.log(JSON.stringify(record, null, 2)); | |
// Kinesis data is base64 encoded so decode here | |
var eventPayload = new Buffer(record.kinesis.data, 'base64').toString('ascii'), | |
keyName = s3Prefix + '/' + record.eventID.slice(-5) % s3Partitions + '/'+ record.eventID, | |
s3_params = {Bucket: s3Bucket, Key: keyName, Body: eventPayload}; | |
console.log('Attempt to upload data to ' + s3Bucket +'/' + keyName + ': ' + eventPayload); | |
s3.putObject(s3_params, function (err, data) { | |
if (err) { | |
recordsProcessed++; | |
console.log(err, err.stack); | |
lastError = err; | |
successObj = null; | |
handlerFinish(); | |
} else { | |
recordsProcessed++; | |
console.log('Successfully uploaded data to ' + s3Bucket + '/' + keyName + ': ' + data); | |
handlerFinish(); | |
} | |
}); | |
}); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment