Gist: amiantos/16bacc9ed742c91151fcf1a41012445e
// Lambda S3 Zipper
// http://amiantos.net/zip-multiple-files-on-aws-s3/
//
// Accepts a bundle of data in the format...
// {
//   "bucket": "your-bucket",
//   "destination_key": "zips/test.zip",
//   "files": [
//     {
//       "uri": "...", (options: S3 file key or URL)
//       "filename": "...", (filename of file inside zip)
//       "type": "..." (options: [file, url])
//     }
//   ]
// }
// Saves zip file at "destination_key" location
"use strict";
const AWS = require("aws-sdk");
const archiver = require("archiver");
const stream = require("stream");
const request = require("request");

const awsOptions = {
  region: "us-east-1",
  httpOptions: {
    timeout: 300000 // Matching Lambda function timeout
  }
};
const s3 = new AWS.S3(awsOptions);

// Returns a PassThrough whose contents are uploaded to S3.
// Calls resolve() when the upload completes and reject() if it fails.
const streamTo = (bucket, key, resolve, reject) => {
  const passthrough = new stream.PassThrough();
  s3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
      ServerSideEncryption: "AES256"
    },
    err => {
      // Throwing here would escape the surrounding promise, so reject instead
      if (err) return reject(err);
      console.log("Zip uploaded");
      resolve();
    }
  ).on("httpUploadProgress", progress => {
    console.log(progress);
  });
  return passthrough;
};

// Kudos to this person on GitHub for this getStream solution
// https://github.com/aws/aws-sdk-js/issues/2087#issuecomment-474722151
// The S3 read stream is only opened once archiver starts consuming this entry,
// so connections for files later in the list do not sit idle and time out.
const getStream = (bucket, key) => {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();
  passThroughStream.on("newListener", event => {
    if (!streamCreated && event == "data") {
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", err => passThroughStream.emit("error", err))
        .pipe(passThroughStream);
      streamCreated = true;
    }
  });
  return passThroughStream;
};

exports.handler = async event => {
  const bucket = event["bucket"];
  const destinationKey = event["destination_key"];
  const files = event["files"];

  await new Promise((resolve, reject) => {
    const zipStream = streamTo(bucket, destinationKey, resolve, reject);
    zipStream.on("error", reject);

    const archive = archiver("zip");
    // Reject instead of throwing: a throw inside an event handler is not
    // caught by the promise and crashes the process
    archive.on("error", reject);
    archive.pipe(zipStream);

    for (const file of files) {
      if (file["type"] == "file") {
        archive.append(getStream(bucket, file["uri"]), {
          name: file["filename"]
        });
      } else if (file["type"] == "url") {
        archive.append(request(file["uri"]), { name: file["filename"] });
      }
    }
    // finalize() returns a promise; route its rejection to reject() too
    archive.finalize().catch(reject);
  });

  // An async handler returns its result instead of calling the callback
  return {
    statusCode: 200,
    body: { final_destination: destinationKey }
  };
};
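The getStream helper above returns a PassThrough placeholder and only opens the real S3 read stream when a consumer adds a "data" listener, which is what pipe() does and what the gist relies on archiver doing when it reaches that entry. Below is a minimal, dependency-free TypeScript sketch of the same idea. lazyStream and its open callback are hypothetical names, with open standing in for s3.getObject(...).createReadStream(). Unlike the gist, it fails the placeholder with destroy(err) rather than emit("error", err), which is generally the recommended way to error a stream in current Node.

```typescript
import { PassThrough, Readable } from "stream";

// Hypothetical helper: returns a placeholder stream and only calls `open()`
// (a stand-in for s3.getObject(...).createReadStream()) once something
// starts consuming the placeholder by adding a "data" listener.
export function lazyStream(open: () => Readable): PassThrough {
  const placeholder = new PassThrough();
  let started = false;

  placeholder.on("newListener", (event: string | symbol) => {
    if (started || event !== "data") return;
    started = true; // set first so a second "data" listener cannot open the source twice

    const source = open();
    // pipe() does not propagate source errors, so fail the placeholder explicitly
    source.on("error", (err: Error) => placeholder.destroy(err));
    source.pipe(placeholder);
  });

  return placeholder;
}
```

Because the source is opened only when the entry is actually read, a long list of large files never has dozens of idle S3 connections waiting their turn.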
Hi,
we tried the solution suggested here, but we are facing the following problem.
Suppose we want to zip these files:
{
  "files": [
    { "fileName": "File1_1GB.bin", "key": "File1_1GB.bin" },
    { "fileName": "File2_1GB.bin", "key": "File2_1GB.bin" },
    { "fileName": "File3_1GB.bin", "key": "File3_1GB.bin" },
    { "fileName": "File4_1GB.bin", "key": "File4_1GB.bin" },
    { "fileName": "File5_1GB.bin", "key": "File5_1GB.bin" }
  ],
  "bucketRegion": "REGION_NAME",
  "originBucketName": "BUCKET_NAME",
  "destBucketName": "DESTBUCKET",
  "zipName": "ZippedFiles.zip"
}
The resulting ZippedFiles.zip correctly contains 5 files, but they are not the correct size:
- File1: 1 GB
- File2: 1 GB
- File3: 1 GB
- File4: 34 KB
- File5: 34 KB
Our configuration uses a 15-minute timeout and 10 GB of memory.
What could be the problem?
Thanks in advance.
Regards.
Hello! Which version of the solution are you using? Have you also added extra logging to catch errors? I ran into this kind of problem whenever a stream didn't finish writing, usually because the server hit an unexpected error. Also, how long was your Lambda running?
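Following the suggestion to add logging, one way to detect an entry that stopped short (like File4 and File5 above) is to count the bytes that actually flow through each file's stream and compare them with the size you expect. This is a hypothetical helper, not part of the gist: checkLength and its label argument are made-up names, and the expected size is assumed to be known up front (for example from the object's ContentLength).

```typescript
import { Transform } from "stream";
import type { TransformCallback } from "stream";

// Hypothetical helper: passes bytes through unchanged, but errors at the end
// of the stream if the total differs from `expectedBytes`.
export function checkLength(expectedBytes: number, label: string): Transform {
  let seen = 0;
  return new Transform({
    transform(chunk: Buffer, _enc: BufferEncoding, cb: TransformCallback) {
      seen += chunk.length; // count every byte that passes through
      cb(null, chunk);
    },
    flush(cb: TransformCallback) {
      if (seen === expectedBytes) {
        cb();
      } else {
        // Surfaces a truncated (or oversized) entry as a stream error
        cb(new Error(`${label}: expected ${expectedBytes} bytes, got ${seen}`));
      }
    },
  });
}
```

To keep the gist's lazy loading intact, such a check would belong inside getStream, between s3Stream and passThroughStream, with its "error" event forwarded to passThroughStream the same way the gist forwards s3Stream's. Calling .pipe() on the stream returned by getStream would attach a "data" listener and start the download immediately.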
The project I am working on has hit a few edge cases and points of learning that I figured I would donate back to the community.
Performance
If anyone is wondering how well this performs on Lambda, my team benchmarked it at roughly 1 GB/min with a small memory footprint, but only when the function was given as much CPU power as possible (on Lambda, CPU is allocated in proportion to the configured memory). For us, zipping performance scaled linearly with the CPU we gave it.
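As a rough back-of-the-envelope illustration, assuming the ~1 GB/min throughput above holds for your files and using Lambda's 15-minute maximum timeout, the most input a single invocation could zip is about:

$$
S_{\max} \approx R \cdot T_{\max} = 1\,\tfrac{\text{GB}}{\text{min}} \times 15\,\text{min} = 15\,\text{GB}
$$

Treat this as an upper bound: any setup time, or lower throughput at smaller memory settings, reduces it.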
Error handling
In Lambda, I'm fairly sure that if the function fails, the infrastructure takes care of cleanup and isolation between handler invocations. My team couldn't use Lambda because we needed to zip files larger than we could finish within Lambda's maximum timeout. We ended up building a small service that runs perpetually on ECS and listens for zip requests arriving via SQS. This forced us to think much harder about error handling, because we didn't have the Lambda infrastructure to help us out.
When we injected failures, such as references to objects that didn't exist, we found unhandled promise rejections, which crash the process by default as of Node 15, and uncaught exceptions, which already crash Node. These were mostly caused by streams emitting errors outside the call stack, where neither promise rejection handling nor a classic try/catch can catch them. Our solution was to proactively handle every error coming from a stream and make sure it could be caught by the promise patterns we wanted to use. If your use case lets you stay on Lambda, you don't need to worry about this too much, but doing it would improve your ability to catch, handle, and log errors. Example implementation below.
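As an alternative to emitting stream errors by hand, Node's stream/promises pipeline (Node 15+) rejects its promise when any stage errors and destroys the other stages, so the failure reaches an ordinary try/catch. This is a swapped-in technique, not what the implementation in this comment uses; readAll is a hypothetical helper that drains a stream into memory purely to show the error path.

```typescript
import { Readable, Writable } from "stream";
import { pipeline } from "stream/promises";

// Hypothetical helper: collects everything `source` produces.
// If `source` errors (e.g. S3 reports NoSuchKey), `pipeline` rejects,
// so the caller's try/catch sees the error instead of the process crashing.
export async function readAll(source: Readable): Promise<Buffer> {
  const chunks: Buffer[] = [];
  const sink = new Writable({
    write(chunk: Buffer, _enc: BufferEncoding, cb: (error?: Error | null) => void) {
      chunks.push(chunk);
      cb();
    },
  });
  await pipeline(source, sink);
  return Buffer.concat(chunks);
}
```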
AWS SDK V3
The original version of this zipper Lambda used v2 of the SDK, which is gradually becoming outdated, so my team decided to rip off the band-aid and upgrade to v3. The major version brought some API changes, but otherwise things stayed the same. One of the things it brought was better TypeScript support, which attracted my team. Example implementation below.
TypeScript
The original version didn't use TypeScript, but my team really liked it and the static type checks it gave us, which became especially helpful for type-checking the event object the Lambda handles. Be aware that the GetObjectCommand requires you to add "dom" to the "lib" array in your tsconfig.json, because it references some browser types; you need to include it for TypeScript to handle this command properly. Example implementation below.
Example implementation
As mentioned before, my team's use case forced us off Lambda, but I tried to re-Lambda-ify the code to keep it consistent with this discussion. Sorry if it doesn't compile, but the main concepts I mention in this post are all in it. While adapting the code for our team, we also broke it out into separate, self-contained classes so we could unit test it more easily. Additionally, we changed the object shape slightly to fit our use case (removed the type key on the file object, renamed uri to key, and camelCased the fields). I also added a few comments explaining some of the complexities discussed above. I'm open to any feedback y'all have! And a big thanks to @amiantos and everyone else who contributed; my team couldn't have done this without your work. Also, if anyone has questions about how my team implemented the SQS consumer, feel free to reach out to me.

// handler.ts
import { S3Client } from '@aws-sdk/client-s3'
import { S3Service } from './s3Service'
import archiver from 'archiver'
import type { Archiver } from 'archiver'

const s3 = new S3Client({})
const s3Service = new S3Service(s3)

interface File {
  fileName: string
  key: string
}

interface ZipEvent {
  bucket: string
  destinationKey: string
  files: File[]
}

const finalizeArchiveSafely = (archive: Archiver): Promise<void> => {
  return new Promise((resolve, reject) => {
    // if we don't reject on error, the archive.finalize() promise will resolve normally
    // and the error will go unchecked, causing the application to crash
    archive.on('error', reject)
    archive.finalize().then(resolve).catch(reject)
  })
}

export default async (event: ZipEvent) => {
  const archive = archiver('zip')

  try {
    for (const file of event.files) {
      const downloadStream = s3Service.createLazyDownloadStreamFrom(event.bucket, file.key)
      archive.append(downloadStream, { name: file.fileName })
    }

    // for whatever reason, if we try to await the promises individually, we get problems handling the errors they produce
    // for example, if we awaited them individually and injected an error (like asking it to zip an object that doesn't exist), both statements would throw:
    // one would be caught by the try/catch and the other would be treated as an unhandled promise rejection (bizarre, I know, but I kid you not)
    // using Promise.all was the only solution that seemed to solve the issue for us
    await Promise.all([
      finalizeArchiveSafely(archive),
      s3Service.uploadTo(event.bucket, event.destinationKey, archive)
    ])
    // with the robustness added, all errors will be caught by this catch block,
    // so there is no need to worry about unhandled promises or unhandled exceptions
  } catch (e) {
    // this makes sure the archive stops archiving if there is an error
    archive.abort()
    throw e
  }
}

// s3Service.ts
import { PassThrough } from 'stream'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import type { S3Client } from '@aws-sdk/client-s3'
import type { Readable } from 'stream'

export class S3Service {
  constructor(private s3: S3Client) {}

  // we need to lazy load the streams because archiver only works on one at a time
  // if we create a stream to an object in s3, the connection will time out when no traffic is going over it
  // this will be the case if multiple streams are opened at the same time and one of them is for a very large object
  public createLazyDownloadStreamFrom(bucket: string, key: string): Readable {
    let streamCreated = false
    // create a dummy stream to pass on
    const stream = new PassThrough()
    // only when someone first connects to this stream do we fetch the object and feed the data through the dummy stream
    stream.on('newListener', async event => {
      if (!streamCreated && event == 'data') {
        // set the flag before awaiting so a second 'data' listener can't start a second download
        streamCreated = true
        await this.initDownloadStream(bucket, key, stream)
      }
    })
    return stream
  }

  public async uploadTo(bucket: string, key: string, stream: Readable): Promise<void> {
    const upload = new Upload({
      client: this.s3,
      params: {
        Bucket: bucket,
        Key: key,
        // we pipe to a passthrough to handle the case that the stream isn't initialized yet
        Body: stream.pipe(new PassThrough()),
        ContentType: 'application/zip'
      }
    })
    await upload.done()
  }

  // if we don't pipe the errors into the stream, we get unhandled exception errors in the console,
  // and they won't be caught by any try/catch or .catch constructs around calls to createLazyDownloadStreamFrom,
  // because initDownloadStream is called from the .on('newListener') callback and thus isn't in the
  // "call stack" of the call to createLazyDownloadStreamFrom
  // for example, it is totally reasonable that the s3 object asked for doesn't exist,
  // in which case s3.send(new GetObjectCommand(/* */)) throws
  private async initDownloadStream(bucket: string, key: string, stream: PassThrough) {
    try {
      const { Body: body } = await this.s3.send(
        new GetObjectCommand({ Bucket: bucket, Key: key })
      )
      // we need to type narrow here since Body can be one of many things
      if (!body) {
        stream.emit(
          'error',
          new Error(`got an undefined body from s3 when getting object ${bucket}/${key}`)
        )
      } else if (!('on' in body)) {
        stream.emit(
          'error',
          new Error(
            `got a ReadableStream<any> (a stream used by browser fetch) or Blob from s3 when getting object ${bucket}/${key} instead of Readable`
          )
        )
      } else {
        body.on('error', err => stream.emit('error', err)).pipe(stream)
      }
    } catch (e) {
      stream.emit('error', e)
    }
  }
}
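The handler.ts comment calls the Promise.all behavior bizarre. A likely explanation: both promises already exist before either is awaited, so when you await them one at a time, the first rejection jumps to catch and the second await never runs, leaving the second promise's rejection with no handler. Promise.all attaches a handler to every promise immediately. The demo below uses timers as hypothetical stand-ins for finalizeArchiveSafely and uploadTo, and counts unhandled rejections through process.on("unhandledRejection") so the difference is observable instead of crashing the process.

```typescript
// Demo only: count unhandled rejections instead of letting them crash the process.
let unhandledCount = 0;
process.on("unhandledRejection", () => {
  unhandledCount += 1;
});

// Hypothetical stand-ins: both operations fail, e.g. because a requested object is missing.
const failAfter = (ms: number, message: string): Promise<void> =>
  new Promise<void>((_resolve, reject) => setTimeout(() => reject(new Error(message)), ms));

export async function awaitSequentially(): Promise<string> {
  const finalize = failAfter(20, "finalize failed");
  const upload = failAfter(10, "upload failed");
  try {
    await finalize; // while we wait here, `upload` rejects with no handler attached
    await upload; // never reached, so that rejection stays unhandled
    return "ok";
  } catch (e) {
    return (e as Error).message;
  }
}

export async function awaitTogether(): Promise<string> {
  const finalize = failAfter(20, "finalize failed");
  const upload = failAfter(10, "upload failed");
  try {
    // Promise.all subscribes to both promises right away, so the
    // later rejection is absorbed instead of going unhandled.
    await Promise.all([finalize, upload]);
    return "ok";
  } catch (e) {
    return (e as Error).message;
  }
}
```

With sequential awaits the catch block sees one error while the other is reported as unhandled; with Promise.all the catch block sees the first failure and nothing leaks.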
@damianobertuna I'm facing a similar issue, where not all files are correctly zipped.
Did you manage to fix it?
Just wanted to leave a shoutout to @amiantos and @RyanClementsHax for his Typescript solution. Works a dream and saved me a ton of time. Thank you both!
Thank you so much! I had an issue where my Lambda would die on larger or multiple files without throwing any errors, using Archiver and SDK v3, and this was the fix. Great work!
Really happy to see how this gist has grown and evolved over the years. Kudos to everyone who has weighed in, offered more code and advice!
I was struggling with this today. Glad to know that I am not alone <3
Hi, I need someone to help me, please. I have created a Lambda function that is triggered by an S3 bucket, zips the uploaded files, and stores them in a destination bucket, but when I upload files, nothing appears in the destination bucket. I have searched a lot but couldn't find a solution. I configured all the permissions and IAM roles, but it still doesn't work.
zipkit.io is another way to solve this problem.
I am going to join in and sing praises for @amiantos - you saved my butt today!
I had to write a Lambda to zip thousands of files. The task would just go dark - no zip, no errors. Your V3 script did the trick - well done!