Last active
July 5, 2019 13:02
-
-
Save theburningmonk/34df5521e6ba5eb82f23f74bc6dc3a66 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const co = require('co'); | |
const log = require('./log'); | |
const reqContext = require('./requestContext'); | |
/**
 * Decodes the payload of every Kinesis record in a batch.
 *
 * Assumes the standard Lambda Kinesis event shape, where each record carries
 * its payload as a base64 string in `record.kinesis.data`, and that the
 * payload is UTF-8 encoded JSON - confirm against the stream's producers.
 *
 * @param {Array<Object>} records - the `Records` array of the Kinesis event
 * @returns {Array<*>} the parsed payloads, in the same order as `records`;
 *   an empty array when `records` is missing or empty
 * @throws {SyntaxError} when a record's payload is not valid JSON
 */
function getEvents (records) {
  // a missing batch is treated as empty so callers can always iterate the result
  if (!Array.isArray(records)) {
    return [];
  }

  return records.map((record) => {
    const json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    return JSON.parse(json);
  });
}
/**
 * Makes the context embedded in a record the current request context.
 *
 * Reads `record.__context`, removes that property from the record, stamps the
 * value with the invocation's `awsRequestId` and hands it to
 * `reqContext.replaceAllWith`.
 *
 * @param {Object} record - parsed record; its `__context` property is deleted
 * @param {Object} context - Lambda invocation context (only `awsRequestId` is read)
 * @throws {TypeError} when the record carries no `__context`
 */
function setRequestContext (record, context) {
  const embeddedContext = record.__context;

  // strip the embedded context before anything else so the record no longer
  // carries it, even if the steps below throw
  delete record.__context;

  embeddedContext.awsRequestId = context.awsRequestId;
  reqContext.replaceAllWith(embeddedContext);
}
/**
 * Builds a `(event, context, cb)` handler for Kinesis events around a
 * per-record function.
 *
 * The returned handler logs the raw event, parses `event.Records` with
 * `getEvents` and calls `f(record, context)` for each record in order, waiting
 * for each call to settle before starting the next. On completion it calls
 * `cb(null, 'SUCCESS')`; on any failure it logs the error and calls `cb(err)`.
 *
 * @param {Function} f - invoked as `f(record, context)`; may return a promise
 * @returns {Function} the generator-based handler wrapped with `co.wrap`
 */
function createKinesisHandler (f) {
  // Processes one record under its own request context. A failure to set the
  // context is only logged, so the record is still handed to `f`.
  function* processRecord (record, context) {
    reqContext.clearAll();

    try {
      setRequestContext(record, context);
    } catch (err) {
      log.warn(`couldn't set current request context: ${err}`, err.stack);
    }

    yield Promise.resolve(f(record, context));
  }

  function* handleEvent (event, context, cb) {
    console.log(JSON.stringify(event));

    try {
      const records = getEvents(event.Records);

      // Kinesis forces the correlation IDs to travel inside each record's
      // payload, and one invocation delivers many records. To log with the
      // right correlation IDs the records are processed strictly one at a
      // time, swapping the request context before each one.
      for (const record of records) {
        yield* processRecord(record, context);
      }

      log.info('SUCCESS');
      cb(null, 'SUCCESS');
    } catch (err) {
      log.error("Failed to process request", err);
      cb(err);
    }
  }

  return co.wrap(handleEvent);
}
module.exports = createKinesisHandler; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment