/*
 * Skip to content
 *
 * Instantly share code, notes, and snippets.
 *
 * @theburningmonk
 * Last active July 5, 2019 13:02
 * Show Gist options
 *   • Save theburningmonk/34df5521e6ba5eb82f23f74bc6dc3a66 to your computer and use it in GitHub Desktop.
 */
'use strict';
const co = require('co');
const log = require('./log');
const reqContext = require('./requestContext');
/**
 * Decodes the records of a Kinesis-triggered invocation into parsed JSON
 * payloads, preserving the order of the input.
 *
 * NOTE(review): assumes the standard Kinesis event shape, where each record
 * carries its payload as a base64 string at `record.kinesis.data` - confirm
 * against the event source actually wired to this handler.
 *
 * @param {Array<Object>} records - the `Records` array of the incoming event.
 * @returns {Array<*>} one parsed JSON value per input record.
 * @throws {TypeError} if `records` is not an array or a record has no
 *   `kinesis.data` payload.
 * @throws {SyntaxError} if a decoded payload is not valid JSON.
 */
function getEvents (records) {
  // parse the Kinesis records (base64) as JSON
  return records.map((record) => {
    const json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    return JSON.parse(json);
  });
}
/**
 * Installs the context carried by a record as the current request context.
 *
 * The `__context` property is removed from the record (the record is mutated
 * in place), stamped with the invocation's `awsRequestId`, and then passed to
 * `reqContext.replaceAllWith`.
 *
 * @param {Object} record - a parsed record; its `__context` property is
 *   deleted by this call.
 * @param {Object} context - the invocation context; only `awsRequestId` is read.
 * @throws {TypeError} if the record has no `__context` to stamp. The property
 *   deletion has already happened by then.
 */
function setRequestContext (record, context) {
  const carriedContext = record.__context;

  // strip the context off the record before anything else touches it
  delete record.__context;

  carriedContext.awsRequestId = context.awsRequestId;
  reqContext.replaceAllWith(carriedContext);
}
/**
 * Wraps a per-record function into a handler for Kinesis events.
 *
 * The returned handler logs the raw event, decodes `event.Records` with
 * `getEvents`, and calls `f(record, context)` for one record at a time,
 * waiting for each call to settle before starting the next. Before each call
 * the request context is cleared and re-populated from the record.
 *
 * @param {Function} f - invoked as `f(record, context)` for every record; may
 *   return a plain value or a promise.
 * @returns {Function} a `co.wrap`-ped handler taking `(event, context, cb)`.
 *   `cb` is called exactly once: with `(null, 'SUCCESS')` when every record
 *   was processed, or with `(err)` as soon as decoding or processing fails.
 */
function createKinesisHandler (f) {
  return co.wrap(function* (event, context, cb) {
    console.log(JSON.stringify(event));

    try {
      const records = getEvents(event.Records);

      // the problem Kinesis events present is that we have to include the
      // correlation IDs as part of the payload of a record, but we receive
      // multiple records on an invocation - so to apply the correct set of
      // correlation IDs in the logs we have to force the handling code to
      // process them one at time so that we can swap out the correlation IDs
      for (const record of records) {
        reqContext.clearAll();

        try {
          setRequestContext(record, context);
        } catch (err) {
          // best effort: a record without usable context is still processed,
          // just without correlation IDs in its logs
          log.warn(`couldn't set current request context: ${err}`, err.stack);
        }

        yield Promise.resolve(f(record, context));
      }

      log.info('SUCCESS');
    } catch (err) {
      log.error("Failed to process request", err);
      cb(err);
      return;
    }

    // Signal success outside the try block: if the callback itself throws, the
    // catch above must not run, or `cb` would be invoked a second time (with
    // an error) after the invocation had already been reported as successful.
    cb(null, 'SUCCESS');
  });
}
// The handler factory is this module's only export.
module.exports = createKinesisHandler;
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment