Last active
January 6, 2023 18:38
-
-
Save hboylan/68ad1bea3e603b33e338c39bbd8c72d3 to your computer and use it in GitHub Desktop.
AWS Lambda function that reads and writes S3 files line by line for memory-efficient processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Node core modules used for streaming and line splitting
const stream = require('stream')
const readline = require('readline')
// AWS SDK; one S3 client instance is shared by every helper in this file
const AWS = require('aws-sdk')
const S3 = new AWS.S3()
/**
 * Open an S3 object and expose it as a line-by-line reader.
 *
 * @param {string} Bucket - bucket that holds the object
 * @param {string} Key - key of the object to read
 * @returns {readline.Interface} interface that emits one 'line' event per line
 */
function createReadline(Bucket, Key) {
  // stream the object body rather than buffering the whole file in memory
  const input = S3.getObject({ Bucket, Key }).createReadStream()

  return readline.createInterface({
    input,
    terminal: false,
    // Always treat "\r\n" as a single line break. With the default 100 ms
    // window, a "\r" and "\n" that arrive in separate, delayed network chunks
    // would be reported as two line breaks (i.e. a spurious empty line).
    crlfDelay: Infinity
  })
}
/**
 * Start an S3 upload whose body is fed through a writable stream.
 *
 * The upload is started immediately; it completes once the returned
 * stream has been ended and S3.upload() resolves.
 *
 * @param {string} Bucket - bucket to upload into
 * @param {string} Key - key of the object to create
 * @returns {{ writeStream: stream.PassThrough, uploadPromise: Promise<*> }}
 *   writeStream: write the file content here, then call end();
 *   uploadPromise: settles with the result of S3.upload().promise()
 */
function createWriteStream(Bucket, Key) {
  // PassThrough hands whatever is written to it straight to the upload body
  const writeStream = new stream.PassThrough()
  const uploadPromise = S3.upload({ Bucket, Key, Body: writeStream }).promise()

  return { writeStream, uploadPromise }
}
/**
 * Transform a single input line before it is written to the output file.
 *
 * Placeholder implementation: returns the line unchanged.
 *
 * @param {string} line - one line of the input file, without its line break
 * @returns {string} the line to write to the output file
 */
function processLine(line) {
  // do something
  return line
}
// event.inputBucket: source file bucket | |
// event.inputKey: source file key | |
// event.outputBucket: target file bucket | |
// event.outputKey: target file key | |
// event.limit: maximum number of lines to read | |
exports.handler = function execute(event, context, callback) { | |
console.log(JSON.stringify(event, null, 2)) | |
var totalLineCount = 0 | |
// create input stream from S3 | |
const readStream = createReadline(event.inputBucket, event.inputKey) | |
// create output stream to S3 | |
const { writeStream, uploadPromise } = createWriteStream(event.outputBucket, event.outputKey) | |
// read each line | |
readStream.on('line', line => { | |
// close stream on limit | |
if (event.limit && event.limit <= totalLineCount) { | |
return readStream.close() | |
} | |
// process line | |
else { | |
line = processLine(line) | |
writeStream.write(`${line}\n`) | |
} | |
totalLineCount++ | |
}) | |
// clean up on close | |
readStream.on('close', async () => { | |
// end write stream | |
writeStream.end() | |
// wait for upload | |
const uploadResponse = await uploadPromise | |
// return processing insights | |
callback(null, { | |
totalLineCount, | |
uploadResponse | |
}) | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Will this work when the Lambda function's timeout is very short?