@amiantos
Last active October 30, 2024 20:12
Zip Multiple Files from S3 using AWS Lambda Function
// Lambda S3 Zipper
// http://amiantos.net/zip-multiple-files-on-aws-s3/
//
// Accepts a bundle of data in the format...
// {
//     "bucket": "your-bucket",
//     "destination_key": "zips/test.zip",
//     "files": [
//         {
//             "uri": "...", (options: S3 file key or URL)
//             "filename": "...", (filename of file inside zip)
//             "type": "..." (options: [file, url])
//         }
//     ]
// }
// Saves zip file at "destination_key" location

"use strict";

const AWS = require("aws-sdk");
const awsOptions = {
  region: "us-east-1",
  httpOptions: {
    timeout: 300000 // Matching Lambda function timeout
  }
};
const s3 = new AWS.S3(awsOptions);
const archiver = require("archiver");
const stream = require("stream");
const request = require("request");

const streamTo = (bucket, key, resolve) => {
  var passthrough = new stream.PassThrough();
  s3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
      ServerSideEncryption: "AES256"
    },
    (err, data) => {
      if (err) {
        // Throwing inside this callback cannot be caught by the caller;
        // emit on the stream so the handler's "error" listener rejects.
        passthrough.emit("error", err);
        return;
      }
      console.log("Zip uploaded");
      resolve();
    }
  ).on("httpUploadProgress", (progress) => {
    console.log(progress);
  });
  return passthrough;
};

// Kudos to this person on GitHub for this getStream solution
// https://github.com/aws/aws-sdk-js/issues/2087#issuecomment-474722151
const getStream = (bucket, key) => {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();

  // Only open the S3 read stream once something starts consuming data
  passThroughStream.on("newListener", event => {
    if (!streamCreated && event == "data") {
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", err => passThroughStream.emit("error", err))
        .pipe(passThroughStream);

      streamCreated = true;
    }
  });

  return passThroughStream;
};

exports.handler = async (event, context, callback) => {
  var bucket = event["bucket"];
  var destinationKey = event["destination_key"];
  var files = event["files"];

  await new Promise((resolve, reject) => {
    // The promise resolves only from the s3.upload callback inside streamTo
    var zipStream = streamTo(bucket, destinationKey, resolve);
    zipStream.on("error", reject);

    var archive = archiver("zip");
    // Reject rather than throw: an exception thrown inside an event
    // listener would not be caught by this promise
    archive.on("error", reject);
    archive.pipe(zipStream);

    for (const file of files) {
      if (file["type"] == "file") {
        archive.append(getStream(bucket, file["uri"]), {
          name: file["filename"]
        });
      } else if (file["type"] == "url") {
        archive.append(request(file["uri"]), { name: file["filename"] });
      }
    }
    archive.finalize();
  });

  callback(null, {
    statusCode: 200,
    body: { final_destination: destinationKey }
  });
};
@amiantos
Author

@daniele-pelagatti huh! interesting 🤔 if the error was common enough for you, I'm surprised it hasn't shown up on my side. We've served up quite a few zips generated this way and haven't heard any complaints... I do feel like the description of the error that happens (file doesn't complete until the next task begins, basically) is familiar to me and I did run into it while building this, but it's been so long now I don't remember how I fixed it at the time.

@daniele-pelagatti

It was pretty consistent, yes. But that could have been because of many contributing factors:

  • I create a zip with compression level 0 (I want fast compression in order to process as many files as possible before running into the 15-minute timeout for an async lambda invocation). This may have allowed archiver to finish the archive before the upload was complete.
  • I use an async return value while you invoke the lambda callback: maybe this contributes somehow?
  • The problem was consistent only above a certain threshold of files compressed, not sure why.
  • I use the node 12 runtime in my lambda; maybe an older runtime didn't trigger the problem?
  • The zip creation/upload was the last thing I did in my handler: like I mentioned, if you give it a few seconds after the archive process is done, the upload will complete and the zip will be created in S3. This wasn't my case, so the lambda would not finish the s3.upload in time.

Nevertheless: the whole purpose of this lambda is to upload something to S3 (coincidentally a streaming zip), so I find it fitting to declare the process "done" only when s3.upload is done.
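
The timing issue described above can be reproduced without AWS. The following is a minimal sketch using only Node's stream module; `fakeUpload` is a hypothetical stand-in for s3.upload (it is not an AWS API) that drains the body slowly and calls back only once everything has been consumed. It illustrates that "the producer has finished writing" and "the upload has finished" are separate events, and that only the second one is safe to treat as done.

```javascript
const { PassThrough, Writable } = require("stream");

// Hypothetical stand-in for s3.upload: consumes `body` slowly and calls
// `done` only after the last chunk has been "sent".
function fakeUpload(body, done) {
  const sink = new Writable({
    write(chunk, _enc, cb) {
      setTimeout(cb, 5); // simulate network latency per chunk
    }
  });
  sink.on("finish", () => done(null));
  sink.on("error", done);
  body.pipe(sink);
}

// Resolves with the order in which the two "done" signals fired.
function zipAndUpload(chunks) {
  return new Promise((resolve, reject) => {
    const events = [];
    const body = new PassThrough();
    fakeUpload(body, err => {
      if (err) return reject(err);
      events.push("upload-complete");
      resolve(events); // resolve here, not when the producer finishes
    });
    for (const chunk of chunks) body.write(chunk);
    // This callback fires when the producer side has flushed its writes,
    // which can be well before the upload itself has completed.
    body.end(() => events.push("producer-finished"));
  });
}
```

Resolving from the upload callback means the handler cannot return while parts are still in flight, regardless of compression level or archive size.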

@jonafd1

jonafd1 commented Jan 13, 2021

@amiantos - Thank you! I really appreciate the thorough response.

I ran into this article:

https://www.serverless.com/blog/common-node8-mistakes-in-lambda

It suggests removing the callback and replacing it with a simple return.

My IDE integration now works :). Admittedly, I do need to learn the ins-and-outs of deployment a bit more.

Hello @949mac. May I ask if you made any other code changes besides changing the callback to a return? I tried it on my end, changing nothing besides that, and it did not work.
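
For reference, the change that article describes looks roughly like the sketch below. `doWork` is a hypothetical placeholder for the zip-and-upload step, and whether this alone resolves the problem reported above depends on the rest of the handler.

```javascript
// Hypothetical async work standing in for the zip-and-upload step.
const doWork = async key => ({ final_destination: key });

// An async handler signals completion through its returned promise,
// so no callback parameter is needed.
const handler = async event => {
  const body = await doWork(event.destination_key);
  return { statusCode: 200, body };
};

// Failures are reported by throwing, which rejects the returned promise.
const failingHandler = async () => {
  throw new Error("zip failed");
};

module.exports = { handler, failingHandler };
```

The important part is that everything the handler starts is awaited before the `return`; a return that happens while an upload is still running has the same effect as calling the callback too early.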

@hamsteven

@amiantos thanks for the reply. When I have some free time I'm going to try out some more ideas with python. this node script helped me big time so huge thanks. I'll post back if I get anywhere with python

Did you ever get anywhere doing this with Python? I'm currently attempting it.

@amiantos
Author

Pinging @WLS-JD just in case it helps them see your question.

@RyanClementsHax

RyanClementsHax commented Jun 17, 2021

Hello! Excellent work @amiantos for creating this and thanks to all that have contributed. I can't describe how many headaches this solved for me and my team. I figured I would contribute what I have found regarding the above issue.

I have also recreated this issue locally (haven't tried deploying the lambda yet). The lag between when the handler resolves and when the final part uploads seems to scale with the size of the final zip. Resolving the promise from the callback passed into s3.upload, rather than from the zipStream close and end events, fixed the issue for me and got everything synced. Below is the original gist with the fix.

// Lambda S3 Zipper
// http://amiantos.net/zip-multiple-files-on-aws-s3/
//
// Accepts a bundle of data in the format...
// {
//     "bucket": "your-bucket",
//     "destination_key": "zips/test.zip",
//     "files": [
//         {
//             "uri": "...", (options: S3 file key or URL)
//             "filename": "...", (filename of file inside zip)
//             "type": "..." (options: [file, url])
//         }
//     ]
// }
// Saves zip file at "destination_key" location

"use strict";

const AWS = require("aws-sdk");
const awsOptions = {
  region: "us-east-1",
  httpOptions: {
    timeout: 300000 // Matching Lambda function timeout
  }
};
const s3 = new AWS.S3(awsOptions);
const archiver = require("archiver");
const stream = require("stream");
const request = require("request");

// take in a resolve function ----v
const streamTo = (bucket, key, resolve) => {
  var passthrough = new stream.PassThrough();
  s3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
      ServerSideEncryption: "AES256"
    },
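    (err, data) => {
      if (err) {
        console.error('Error while uploading zip')
        // `reject` is not in scope inside streamTo; emitting on the stream
        // triggers the handler's zipStream.on("error", reject) listener instead
        passthrough.emit('error', err)
        return
      }
      console.log('Zip uploaded')
      // at this point, we know that the zip has for sure finished uploading and it is safe to resolve the promise
      resolve()
    }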
    // add this if you want to see the progress of the upload
    // this is how I found out that the zip wasn't done uploading for sure before the lambda finished
    ).on("httpUploadProgress", progress => {
      console.log(progress)
    });
  return passthrough;
};

// Kudos to this person on GitHub for this getStream solution
// https://github.com/aws/aws-sdk-js/issues/2087#issuecomment-474722151
const getStream = (bucket, key) => {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();

  passThroughStream.on("newListener", event => {
    if (!streamCreated && event == "data") {
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", err => passThroughStream.emit("error", err))
        .pipe(passThroughStream);

      streamCreated = true;
    }
  });

  return passThroughStream;
};

exports.handler = async (event, context, callback) => {
  var bucket = event["bucket"];
  var destinationKey = event["destination_key"];
  var files = event["files"];

  await new Promise(async (resolve, reject) => {
    // pass the resolve function here -----------------v
    var zipStream = streamTo(bucket, destinationKey, resolve);
    // make sure these don't resolve the promise
    // zipStream.on("close", resolve);
    // zipStream.on("end", resolve);
    zipStream.on("error", reject);

    var archive = archiver("zip");
    // reject rather than throw: an exception thrown inside an event listener
    // would not be caught by this promise
    archive.on("error", reject);
    archive.pipe(zipStream);

    for (const file of files) {
      if (file["type"] == "file") {
        archive.append(getStream(bucket, file["uri"]), {
          name: file["filename"]
        });
      } else if (file["type"] == "url") {
        archive.append(request(file["uri"]), { name: file["filename"] });
      }
    }
    archive.finalize();
  }).catch(err => {
    throw new Error(err);
  });

  callback(null, {
    statusCode: 200,
    body: { final_destination: destinationKey }
  });
};

@amiantos
Author

@RyanClementsHax thanks! while we weren't running into this issue in production (how? why?) I went ahead and implemented your changes on our side since they make sense to me, and I also updated the gist itself with your changes. Thanks again for the extra legwork on diagnosing this issue.

@RyanClementsHax

no problem!

@DDynamic

@amiantos with this script, do you think there is an easy way to introduce concurrent stream processing? It appears that the read streams are created and processed sequentially. I'm testing this out with zipping over 10,000 small files (~50 KB each).

@amiantos
Author

@DDynamic I'm no expert but I assume you can't add multiple files to a zip file at the same time. I did a little googling, looks like this assumption is correct, the zip algorithm needs to process one stream at a time.
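
Since the archive has to be written one entry at a time, one option for many small files is to overlap only the downloads and still append in order. The following is a rough sketch under the assumption that the files are small enough to hold in memory. `mapWithConcurrency` is an illustrative helper and `fetchObject` is a hypothetical placeholder for whatever returns an object's bytes; neither is part of archiver or the AWS SDK.

```javascript
// Runs `fn` over `items` with at most `limit` calls in flight,
// returning results in the original order.
async function mapWithConcurrency(items, limit, fn) {
  const results = new Array(items.length);
  let next = 0;
  async function worker() {
    // Each worker pulls the next unclaimed index until none are left.
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index], index);
    }
  }
  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Hypothetical downloader: resolves with the object's bytes after a delay.
const fetchObject = key =>
  new Promise(resolve =>
    setTimeout(() => resolve(Buffer.from(`data:${key}`)), 5)
  );
```

The resulting buffers could then be appended sequentially (archiver's append accepts a Buffer as a source), so the zip is still written one entry at a time while the network waits overlap.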

@RyanClementsHax

RyanClementsHax commented Jul 6, 2021

The project I am working on has hit a few edge cases and points of learning that I figured I would donate back to the community.

Performance

If anyone is wondering how well this performs on lambda, my team benchmarked it at being able to handle roughly 1Gig/min with little memory footprint, but only when the lambda is given as much CPU power as possible. For us, zipping performance scaled linearly with how much CPU we gave it.
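
As a back-of-the-envelope check, that throughput figure can be combined with the 15-minute Lambda timeout mentioned earlier in this thread. The sketch below is only an estimate: the 1 GB/min default comes from the benchmark above, and the 0.8 headroom factor is an arbitrary illustrative choice.

```javascript
// Estimated minutes to zip `totalGB` at a measured throughput.
const estimateMinutes = (totalGB, gbPerMinute = 1) => totalGB / gbPerMinute;

// True when the estimate fits in the timeout with some headroom.
// The 0.8 headroom factor is an arbitrary illustrative choice.
const fitsInTimeout = (
  totalGB,
  timeoutMinutes = 15,
  gbPerMinute = 1,
  headroom = 0.8
) => estimateMinutes(totalGB, gbPerMinute) <= timeoutMinutes * headroom;

console.log(fitsInTimeout(5)); // about 5 minutes against a 12-minute budget
console.log(fitsInTimeout(20)); // about 20 minutes against a 12-minute budget
```

Measuring the throughput of your own function at its configured memory size would give a more reliable input than the default used here.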

Error handling

In a lambda (I'm pretty sure) if the lambda fails, the infrastructure takes care of the cleanup and isolation between handler invocations. My team couldn't use lambda because we needed to zip files larger than we could zip in the timeout provided by lambda. We ended up building a small service to run perpetually on ECS that listened for zip requests to come in via SQS. This forced us to think harder about how errors were handled because we didn't have the lambda infrastructure to help us out here.

Some of the problems we found when we injected failures, like references to objects that didn't exist, were unhandled promise rejections, which crash the process by default in newer Node versions (15 and later), and uncaught exceptions, which already cause Node to crash. This was mostly caused by streams emitting errors outside the call stack, where they cannot be caught by promise rejections or the classic try/catch. The solution we found was to proactively handle any error coming from a stream and make sure it could be caught by the promise patterns we wanted to use. If your use case allows you to stay on lambda, you don't need to worry about this too much, but you would gain a better ability to catch/handle/log errors should you do this. Example implementation below.
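
The general idea of turning stream errors into something a promise can catch can be sketched with Node's built-in pipeline helper. This is not the implementation described in this post, just an illustration: `failingSource` is a hypothetical stream that simulates a missing object, and `stream/promises` assumes Node 15 or later.

```javascript
const { Readable, Writable } = require("stream");
const { pipeline } = require("stream/promises");

// Hypothetical source that fails on first read, like a missing S3 object would.
function failingSource() {
  return new Readable({
    read() {
      this.destroy(new Error("NoSuchKey (simulated)"));
    }
  });
}

// A sink that simply discards whatever it receives.
function discardSink() {
  return new Writable({
    write(_chunk, _enc, cb) {
      cb();
    }
  });
}

// pipeline() rejects if any stream in the chain errors, so the failure
// surfaces in an ordinary try/catch instead of as an uncaught exception.
async function copySafely(source, destination) {
  try {
    await pipeline(source, destination);
    return "ok";
  } catch (err) {
    return `caught: ${err.message}`;
  }
}
```

Without a wrapper like this, an "error" event with no listener is thrown as an uncaught exception from wherever the stream happens to be running.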

AWS SDK V3

The original version of this zipper lambda used v2 of the SDK, which is slowly becoming out of date, so my team decided to rip off the band-aid and upgrade to v3. The new major version brought some API changes, but otherwise it remained the same. One of the things it brought was better TypeScript support, which attracted my team. Example implementation below.

Typescript

The original version didn't use this, but my team really liked TypeScript and the static type checks it gave us, which became especially helpful when type checking the object the lambda handled. Do know that GetObjectCommand requires you to add dom to the lib array in your tsconfig.json, because it references some browser types that must be available for TypeScript to type-check this command properly. Example implementation below.

Example implementation

As mentioned before, my team's use case forced us off of lambda, but I tried to re-lambda-ify the code to make it consistent with this discussion. Sorry if it doesn't compile, but the main concepts I mention in this post are all in it. Also as part of modifying the code to fit our team, we broke the code out into separate contained classes so we could more easily unit test it. Additionally, we changed the object shape slightly to fit our use case (removed the type key on the file object, renamed uri to key, and camelCased the fields). I also added a few comments explaining some of the complexities discussed previously. I'm open to any feedback y'all got for me! And a big thanks to @amiantos, and everyone else who contributed. My team couldn't have done this without your work! Also if anyone has any questions on how my team implemented that SQS consumer, feel free to reach out to me.

// handler.ts
import { S3Client } from '@aws-sdk/client-s3'
import { S3Service } from './s3Service'
import archiver from 'archiver'
import type { Archiver } from 'archiver'

const s3 = new S3Client({})
const s3Service = new S3Service(s3)

interface File {
    fileName: string
    key: string
}

interface ZipEvent {
    bucket: string
    destinationKey: string
    files: File[]
}

const finalizeArchiveSafely = (archive: Archiver): Promise<void> => {
    return new Promise((resolve, reject) => {
        // if we dont reject on error, the archive.finalize() promise will resolve normally
        // and the error will go unchecked causing the application to crash
        archive.on('error', reject)
        archive.finalize().then(resolve).catch(reject)
    })
}

export default async (event: ZipEvent) => {
    const archive = archiver('zip')

    try {
        for (const file of event.files) {
            const downloadStream = s3Service.createLazyDownloadStreamFrom(event.bucket, file.key)
            archive.append(
                downloadStream,
                {
                    name: file.fileName
                }
            )
        }

        // for whatever reason, if we try to await the promises individually, we get problems with trying to handle the errors produced by them
        // for example, if we tried to await them individually and injected some error like told it to zip an object that didn't exist, both statements would throw
        // one would be caught by the try catch and the other would be considered an unhandled promise rejection (bizarre, I know, but I kid you not)
        // using Promise.all was the only solution that seemed to solve the issue for us
        await Promise.all([
            finalizeArchiveSafely(archive),
            s3Service.uploadTo(event.bucket, event.destinationKey, archive)
        ])
    // with the robustness added, all errors will be caught by this catch block
    // so no need to worry about unhandled promises or unhandled exceptions
    } catch (e) {
        // this makes sure that the archive stops archiving if there is an error
        archive.abort()
        throw e
    }
}
// s3Service.ts
import { PassThrough } from 'stream'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

import type { S3Client } from '@aws-sdk/client-s3'
import type { Readable } from 'stream'

export class S3Service {
    constructor(private s3: S3Client) {}

    // we need to lazy load the streams because archiver only works on one at a time
    // if we create a stream to an object in s3, the connection will time out when no traffic is going over it
    // this will be the case if multiple streams are opened at the same time and one of them is for a very large object
    public createLazyDownloadStreamFrom(bucket: string, key: string): Readable {
        let streamCreated = false
        // create a dummy stream to pass on
        const stream = new PassThrough()
        // only when someone first connects to this stream do we fetch the object and feed the data through the dummy stream
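        stream.on('newListener', async event => {
            if (!streamCreated && event == 'data') {
                // set the flag before awaiting so that a second 'data' listener
                // added while the request is in flight doesn't start another download
                streamCreated = true
                await this.initDownloadStream(bucket, key, stream)
            }
        })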

        return stream
    }

    public async uploadTo(
        bucket: string,
        key: string,
        stream: Readable
    ): Promise<void> {
        const upload = new Upload({
            client: this.s3,
            params: {
                Bucket: bucket,
                Key: key,
                // we pipe to a passthrough to handle the case that the stream isn't initialized yet
                Body: stream.pipe(new PassThrough()),
                ContentType: 'application/zip'
            }
        })
        await upload.done()
    }


    // if we don't pipe the errors into the stream, we will get unhandled exception errors in the console and won't be caught by any try/catch or .catch constructs that call createLazyDownloadStreamFrom since this initDownloadStream function is called in the callback function of .on('newListener') and thus isn't in the "call stack" of the call to createLazyDownloadStreamFrom
    // for example, it is totally reasonable that the s3 object asked for doesn't exist
    // in which case s3.send(new GetObjectCommand(/* */)) throws
    private async initDownloadStream(
        bucket: string,
        key: string,
        stream: PassThrough
    ) {
        try {
            const { Body: body } = await this.s3.send(
                new GetObjectCommand({ Bucket: bucket, Key: key })
            )
            // we need to type narrow here since Body can be one of many things
            if (!body) {
                stream.emit(
                    'error',
                    new Error(
                        `got an undefined body from s3 when getting object ${bucket}/${key}`
                    )
                )
            } else if (!('on' in body)) {
                stream.emit(
                    'error',
                    new Error(
                        `got a ReadableStream<any> (a stream used by browser fetch) or Blob from s3 when getting object ${bucket}/${key} instead of Readable`
                    )
                )
            } else {
                body.on('error', err => stream.emit('error', err)).pipe(stream)
            }
        } catch (e) {
            stream.emit('error', e)
        }
    }
}
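
The lazy-stream trick used in createLazyDownloadStreamFrom can be tried without S3. The sketch below is a plain JavaScript illustration of the same idea; `lazyStream` and its `open` callback are hypothetical names, and the source passed in the usage example is just an in-memory stream.

```javascript
const { PassThrough, Readable } = require("stream");

// Returns a stream immediately but calls `open()` only when the first
// "data" listener is attached to it.
function lazyStream(open) {
  let opened = false;
  const out = new PassThrough();
  out.on("newListener", event => {
    if (!opened && event === "data") {
      opened = true; // set first so a second listener cannot reopen the source
      open()
        .on("error", err => out.emit("error", err))
        .pipe(out);
    }
  });
  return out;
}

// Usage: nothing is opened until a consumer starts listening for data.
let opens = 0;
const lazy = lazyStream(() => {
  opens++;
  return Readable.from(["hello ", "world"]);
});
console.log(opens); // still 0 here, no consumer has attached yet
```

Deferring the open this way means a queued entry does not hold an idle connection while earlier entries are still being archived.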

@damianobertuna

Hi,

we tried with the solution suggested here but we are facing the following problem.
Suppose we want to zip these files:

{
"files": [
    {
      "fileName": "File1_1GB.bin",
      "key": "File1_1GB.bin"
    },
    {
      "fileName": "File2_1GB.bin",
      "key": "File2_1GB.bin"
    },
    {
      "fileName": "File3_1GB.bin",
      "key": "File3_1GB.bin"
    },
    {
      "fileName": "File4_1GB.bin",
      "key": "File4_1GB.bin"
    },
    {
      "fileName": "File5_1GB.bin",
      "key": "File5_1GB.bin"
    }
],
  "bucketRegion": "REGION_NAME",
  "originBucketName": "BUCKET_NAME",
  "destBucketName": "DESTBUCKET",
  "zipName": "ZippedFiles.zip"
}

In the ZippedFiles.zip created we have correctly 5 files but they are not of the correct size, like:

  • File1 1GB;
  • File2 1GB;
  • File3 1GB;
  • File4 34KB;
  • File5 34KB;

Our configuration is a 15-minute timeout and 10 GB of memory.

What can be the problem?

Thanks in advance.

Regards.

@RyanClementsHax

RyanClementsHax commented Jul 30, 2021

Hello! Which version of the solution are you using? Also, have you added additional logging to catch errors? I saw this kind of problem whenever a stream did not finish writing, which was normally caused by the server hitting an unexpected error. Moreover, how long was your lambda running for?
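
One way to add that kind of check is to count the bytes that pass through each entry's stream and fail loudly when the total is short. This is only a sketch: `expectLength` is a hypothetical helper, and the expected size is assumed to come from somewhere trustworthy, such as the ContentLength returned with the S3 object.

```javascript
const { Transform } = require("stream");

// Pass-through that errors at the end of the stream if the number of bytes
// seen does not match `expectedBytes`.
function expectLength(expectedBytes, label) {
  let seen = 0;
  return new Transform({
    transform(chunk, _enc, cb) {
      seen += chunk.length; // chunks arrive as Buffers, so length is in bytes
      cb(null, chunk);
    },
    flush(cb) {
      cb(
        seen === expectedBytes
          ? null
          : new Error(`${label}: expected ${expectedBytes} bytes, got ${seen}`)
      );
    }
  });
}
```

Piping each download through a check like this before appending it to the archive would turn a silently truncated entry into an error that names the affected file.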

@Trigno97
Copy link

The project I am working on has hit a few edge cases and points of learning that I figured I would donate back to the community.

Performance

If anyone is wondering how well this performs on lambda, my team benchmarked it at being able to handle roughly 1Gig/min with little memory footprint, but only when the lambda is given as much CPU power as possible. For us, zipping performance scaled linearly with how much CPU we gave it.

Error handling

In a lambda (I'm pretty sure) if the lambda fails, the infrastructure takes care of the clean up and isolation between hander invocations. My team couldn't use lambda because we needed to zip files larger than we could zip in the timeout provided by lambda. We ended up building a small service to run perpetually on ECS that listened for zip requests to come in via SQS. This forced us to consider how errors were handled more because we didn't have the lambda infrastructure to help us out here.

Some of the problems we found when we inserted failures like references to objects that didn't exist, were unhandled promise rejections, which will cause node to crash in future node versions, and uncaught exceptions which already causes node to crash. This was mostly caused by streams not throwing errors within the call stack so they can be caught by promise rejections or the classic try/catch. The solution we found was to be proactive to handle any error coming from a stream and make sure they could be caught by the promise patterns we wanted to use. If you're use case allows you to stay on lambda, you don't need to worry about this too much, but you would gain a better ability to catch/handle/log errors should you do this. Example implementation below.

AWS SDK V3

The original version of this zipper lambda used the v2 version of the sdk which is slowly becoming out of date, so my team decided to rip off the band aid and upgrade to this version. The major version brought some api changes, but otherwise it remained the same. One of the things it brought was better typescript support which attracted my team. Example implementation below.

Typescript

The original version didn't use this, but my team really liked typescript and the static typing checks it gave us, which became especially helpful when type checking the object the lambda handled. Do know that the GetObjectCommand requires that you add dom to your lib array in your tsconfig.json because it references some browser types and thus need to include it if you want typescript to work properly for this command. Example implementation below.

Example implementation

As mentioned before, my team's use case forced us off of lambda, but I tried to re-lambda-ify the code to make it consistent with this discussion. Sorry if it doesn't compile, but the main concepts I mention in this post are all in it. Also as part of modifying the code to fit our team, we broke the code out into separate contained classes so we could more easily unit test it. Additionally, we changed the object shape slightly to fit our use case (removed the type key on the file object, renamed uri to key, and camelCased the fields). I also added a few comments explaining some of the complexities discussed previously. I'm open to any feedback y'all got for me! And a big thanks to @amiantos, and everyone else who contributed. My team couldn't have done this without your work! Also if anyone has any questions on how my team implemented that SQS consumer, feel free to reach out to me.

// handler.ts
import { S3Client } from '@aws-sdk/client-s3'
import { S3Service } from './s3Service'
import archiver from 'archiver'
import type { Archiver } from 'archiver'

const s3 = new S3Client({})
const s3Service = new S3Service(s3)

interface File {
    fileName: string
    key: string
}

interface ZipEvent {
    bucket: string
    destinationKey: string
    files: File[]
}

const finalizeArchiveSafely = (archive: Archiver): Promise<void> => {
    return new Promise((resolve, reject) => {
        // if we dont reject on error, the archive.finalize() promise will resolve normally
        // and the error will go unchecked causing the application to crash
        archive.on('error', reject)
        archive.finalize().then(resolve).catch(reject)
    })
}

export default async (event: ZipEvent) => {
    const archive = archiver('zip')

    try {
        for (const file of event.files) {
            const downloadStream = s3Service.createLazyDownloadStreamFrom(event.bucket, file.key)
            archive.append(
                downloadStream,
                {
                    name: file.fileName
                }
            )
        }

        // for whatever reason, if we try to await the promises individually, we get problems with trying to handle the errors produced by them
        // for example, if we tried to await them individually and injected some error like told it to zip an object that didn't exist, both statements would throw
        // one would be caught by the try catch and the other would be considered an unhandled promise rejection (bizarre, I know, but I kid you not)
        // using Promise.all was the only solution that seemed to solve the issue for us
        await Promise.all([
            finalizeArchiveSafely(archive),
            s3Service.uploadTo(event.bucket, event.destinationKey, archive)
        ])
    // with the robustness added, all errors will be caught by this catch block
    // so no need to worry about unhandled promises or unhandled exceptions
    } catch (e) {
        // this makes sure that the archive stops archiving if there is an error
        archive.abort()
        throw e
    }
}
// s3Service.ts
import { PassThrough } from 'stream'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

import type { S3Client } from '@aws-sdk/client-s3'
import type { Readable } from 'stream'

export class S3Service {
    constructor(private s3: S3Client) {}

    // we need to lazy load the streams because archiver only works on one at a time
    // if we create a stream to an object in s3, the connection will time out when no traffic is going over it
    // this will be the case if multiple streams are opened at the same time and one of them is for a very large object
    public createLazyDownloadStreamFrom(bucket: string, key: string): Readable {
        let streamCreated = false
        // create a dummy stream to pass on
        const stream = new PassThrough()
        // only when someone first connects to this stream do we fetch the object and feed the data through the dummy stream
        stream.on('newListener', async event => {
            if (!streamCreated && event == 'data') {
                await this.initDownloadStream(bucket, key, stream)
                streamCreated = true
            }
        })

        return stream
    }

    public async uploadTo(
        bucket: string,
        key: string,
        stream: Readable
    ): Promise<void> {
        const upload = new Upload({
            client: this.s3,
            params: {
                Bucket: bucket,
                Key: key,
                // we pipe to a passthrough to handle the case that the stream isn't initialized yet
                Body: stream.pipe(new PassThrough()),
                ContentType: 'application/zip'
            }
        })
        await upload.done()
    }


    // initDownloadStream is called from the .on('newListener') callback, so it isn't in the "call stack" of
    // the call to createLazyDownloadStreamFrom. if we don't pipe the errors into the stream, they surface as
    // unhandled exceptions in the console and won't be caught by any try/catch or .catch around that call
    // for example, it is totally reasonable that the s3 object asked for doesn't exist
    // in which case s3.send(new GetObjectCommand(/* */)) throws
    private async initDownloadStream(
        bucket: string,
        key: string,
        stream: PassThrough
    ) {
        try {
            const { Body: body } = await this.s3.send(
                new GetObjectCommand({ Bucket: bucket, Key: key })
            )
            // we need to type narrow here since Body can be one of many things
            if (!body) {
                stream.emit(
                    'error',
                    new Error(
                        `got an undefined body from s3 when getting object ${bucket}/${key}`
                    )
                )
            } else if (!('on' in body)) {
                stream.emit(
                    'error',
                    new Error(
                        `got a ReadableStream<any> (a stream used by browser fetch) or Blob from s3 when getting object ${bucket}/${key} instead of Readable`
                    )
                )
            } else {
                body.on('error', err => stream.emit('error', err)).pipe(stream)
            }
        } catch (e) {
            stream.emit('error', e)
        }
    }
}
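
The lazy-stream idea in createLazyDownloadStreamFrom can be tried without S3. The following is a minimal sketch, not the class above: `createLazyStream` and its `open` parameter are hypothetical names, with `open` standing in for the GetObject call. It assumes the consumer attaches a 'data' listener, as the comments above describe, and it forwards errors with `destroy(err)` rather than `emit('error', err)`.

```typescript
import { PassThrough, Readable } from 'stream'

// `open` is a hypothetical stand-in for the S3 GetObject call; it is not
// invoked until something attaches a 'data' listener to the returned stream.
function createLazyStream(open: () => Promise<Readable>): PassThrough {
    let started = false
    const out = new PassThrough()
    out.on('newListener', event => {
        if (started || event !== 'data') return
        // Set synchronously, before any async work, so that a second
        // 'data' listener cannot trigger a second open().
        started = true
        open()
            .then(source => {
                // Forward source errors so callers see them on `out`.
                source.on('error', err => out.destroy(err))
                source.pipe(out)
            })
            .catch(err => out.destroy(err))
    })
    return out
}
```

Because nothing is fetched until the first 'data' listener is attached, many of these streams can be created up front without holding idle connections open.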

@dakebusi

@damianobertuna I'm facing a similar issue, where not all files are correctly zipped.

Did you manage to fix it?

@mikehadlow

Just wanted to leave a shoutout to @amiantos and @RyanClementsHax for his Typescript solution. Works a dream and saved me a ton of time. Thank you both!

@Limesior

The project I am working on has hit a few edge cases and points of learning that I figured I would donate back to the community.

Performance

If anyone is wondering how well this performs on lambda, my team benchmarked it at roughly 1 GB/min with a small memory footprint, but only when the lambda is given as much CPU power as possible. For us, zipping performance scaled linearly with how much CPU we gave it.
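
As a rough back-of-the-envelope check, that figure can be turned into an estimate of whether a job fits inside Lambda's 15-minute maximum timeout. This is only a sketch: the function names and the 0.8 safety factor are illustrative assumptions, and the 1 GB/min default is the benchmark quoted above, not a guarantee.

```typescript
// Lambda's hard upper limit on function timeout (15 minutes).
const LAMBDA_MAX_TIMEOUT_SECONDS = 900

// Estimated seconds to zip `totalBytes`, assuming throughput stays constant.
// The 1 GB/min default is the benchmark figure from the comment above.
function estimateZipSeconds(totalBytes: number, gigabytesPerMinute = 1): number {
    const bytesPerSecond = (gigabytesPerMinute * 1024 ** 3) / 60
    return totalBytes / bytesPerSecond
}

// True if the estimate fits within the timeout with some headroom.
// `safetyFactor` is an arbitrary illustrative margin.
function fitsInLambda(
    totalBytes: number,
    gigabytesPerMinute = 1,
    safetyFactor = 0.8
): boolean {
    const budget = LAMBDA_MAX_TIMEOUT_SECONDS * safetyFactor
    return estimateZipSeconds(totalBytes, gigabytesPerMinute) <= budget
}
```

Under these assumptions, a 5 GiB job (about 300 seconds) fits comfortably, while a 20 GiB job (about 1200 seconds) does not and would need something like the ECS approach described below.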

Error handling

In a lambda (I'm pretty sure) if the lambda fails, the infrastructure takes care of the cleanup and isolation between handler invocations. My team couldn't use lambda because we needed to zip files larger than we could zip in the timeout provided by lambda. We ended up building a small service to run perpetually on ECS that listened for zip requests to come in via SQS. This forced us to consider how errors were handled more because we didn't have the lambda infrastructure to help us out here.

Some of the problems we found when we injected failures, such as references to objects that didn't exist, were unhandled promise rejections, which terminate the process by default in Node 15 and later, and uncaught exceptions, which already cause Node to crash. This was mostly caused by streams not throwing errors within the call stack, where they could be caught by promise rejections or the classic try/catch. The solution we found was to proactively handle any error coming from a stream and make sure it could be caught by the promise patterns we wanted to use. If your use case allows you to stay on lambda, you don't need to worry about this too much, but you would gain a better ability to catch/handle/log errors should you do this. Example implementation below.
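
To illustrate the general point that stream errors must be routed into a promise before try/catch can see them, here is a small sketch with no AWS dependency. It uses Node's built-in `pipeline` from `stream/promises` rather than the manual error forwarding used in this post's implementation; `failingSource`, `discardSink` and `copySafely` are hypothetical names made up for the demonstration.

```typescript
import { PassThrough, Readable, Writable } from 'stream'
import { pipeline } from 'stream/promises'

// Hypothetical stand-in for a download that fails, e.g. a missing object.
function failingSource(): Readable {
    return new Readable({
        read() {
            this.destroy(new Error('NoSuchKey (simulated)'))
        }
    })
}

// Hypothetical sink that simply discards whatever it receives.
function discardSink(): Writable {
    return new Writable({
        write(_chunk, _encoding, callback) {
            callback()
        }
    })
}

// pipeline() rejects if any stream in the chain errors, so the failure
// lands in this catch block instead of becoming an uncaught exception.
async function copySafely(source: Readable, sink: Writable): Promise<string> {
    try {
        await pipeline(source, new PassThrough(), sink)
        return 'ok'
    } catch (e) {
        return `caught: ${(e as Error).message}`
    }
}
```

Calling `copySafely(failingSource(), discardSink())` resolves with the caught message rather than crashing the process, which is the behaviour a long-running service needs.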

AWS SDK V3

The original version of this zipper lambda used v2 of the SDK, which is slowly becoming out of date, so my team decided to rip off the band-aid and upgrade to v3. The new major version brought some API changes, but otherwise it remained the same. It also brought better TypeScript support, which attracted my team. Example implementation below.

Typescript

The original version didn't use TypeScript, but my team really liked it and the static type checks it gave us, which became especially helpful when type checking the object the lambda handled. Do know that GetObjectCommand requires you to add dom to the lib array in your tsconfig.json, because it references some browser types that TypeScript needs in order to type check this command. Example implementation below.

Example implementation

As mentioned before, my team's use case forced us off of lambda, but I tried to re-lambda-ify the code to make it consistent with this discussion. Sorry if it doesn't compile, but the main concepts I mention in this post are all in it. Also as part of modifying the code to fit our team, we broke the code out into separate contained classes so we could more easily unit test it. Additionally, we changed the object shape slightly to fit our use case (removed the type key on the file object, renamed uri to key, and camelCased the fields). I also added a few comments explaining some of the complexities discussed previously. I'm open to any feedback y'all got for me! And a big thanks to @amiantos, and everyone else who contributed. My team couldn't have done this without your work! Also if anyone has any questions on how my team implemented that SQS consumer, feel free to reach out to me.

// handler.ts
import { S3Client } from '@aws-sdk/client-s3'
import { S3Service } from './s3Service'
import archiver from 'archiver'
import type { Archiver } from 'archiver'

const s3 = new S3Client({})
const s3Service = new S3Service(s3)

interface File {
    fileName: string
    key: string
}

interface ZipEvent {
    bucket: string
    destinationKey: string
    files: File[]
}

const finalizeArchiveSafely = (archive: Archiver): Promise<void> => {
    return new Promise((resolve, reject) => {
        // if we don't reject on error, the archive.finalize() promise will resolve normally
        // and the error will go unchecked, causing the application to crash
        archive.on('error', reject)
        archive.finalize().then(resolve).catch(reject)
    })
}

export default async (event: ZipEvent) => {
    const archive = archiver('zip')

    try {
        for (const file of event.files) {
            const downloadStream = s3Service.createLazyDownloadStreamFrom(event.bucket, file.key)
            archive.append(
                downloadStream,
                {
                    name: file.fileName
                }
            )
        }

        // for whatever reason, if we try to await the promises individually, we get problems with trying to handle the errors produced by them
        // for example, if we tried to await them individually and injected some error like told it to zip an object that didn't exist, both statements would throw
        // one would be caught by the try catch and the other would be considered an unhandled promise rejection (bizarre, I know, but I kid you not)
        // using Promise.all was the only solution that seemed to solve the issue for us
        await Promise.all([
            finalizeArchiveSafely(archive),
            s3Service.uploadTo(event.bucket, event.destinationKey, archive)
        ])
    // with the robustness added, all errors will be caught by this catch block
    // so no need to worry about unhandled promises or unhandled exceptions
    } catch (e) {
        // this makes sure that the archive stops archiving if there is an error
        archive.abort()
        throw e
    }
}
// s3Service.ts
import { PassThrough } from 'stream'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

import type { S3Client } from '@aws-sdk/client-s3'
import type { Readable } from 'stream'

export class S3Service {
    constructor(private s3: S3Client) {}

    // we need to lazy load the streams because archiver only works on one at a time
    // if we create a stream to an object in s3, the connection will time out when no traffic is going over it
    // this will be the case if multiple streams are opened at the same time and one of them is for a very large object
    public createLazyDownloadStreamFrom(bucket: string, key: string): Readable {
        let streamCreated = false
        // create a dummy stream to pass on
        const stream = new PassThrough()
        // only when someone first connects to this stream do we fetch the object and feed the data through the dummy stream
        stream.on('newListener', async event => {
            if (!streamCreated && event === 'data') {
                // set the flag before awaiting so a second 'data' listener can't start a second download
                streamCreated = true
                await this.initDownloadStream(bucket, key, stream)
            }
        })

        return stream
    }

    public async uploadTo(
        bucket: string,
        key: string,
        stream: Readable
    ): Promise<void> {
        const upload = new Upload({
            client: this.s3,
            params: {
                Bucket: bucket,
                Key: key,
                // we pipe to a passthrough to handle the case that the stream isn't initialized yet
                Body: stream.pipe(new PassThrough()),
                ContentType: 'application/zip'
            }
        })
        await upload.done()
    }


    // initDownloadStream is called from the .on('newListener') callback, so it isn't in the "call stack" of
    // the call to createLazyDownloadStreamFrom. if we don't pipe the errors into the stream, they surface as
    // unhandled exceptions in the console and won't be caught by any try/catch or .catch around that call
    // for example, it is totally reasonable that the s3 object asked for doesn't exist
    // in which case s3.send(new GetObjectCommand(/* */)) throws
    private async initDownloadStream(
        bucket: string,
        key: string,
        stream: PassThrough
    ) {
        try {
            const { Body: body } = await this.s3.send(
                new GetObjectCommand({ Bucket: bucket, Key: key })
            )
            // we need to type narrow here since Body can be one of many things
            if (!body) {
                stream.emit(
                    'error',
                    new Error(
                        `got an undefined body from s3 when getting object ${bucket}/${key}`
                    )
                )
            } else if (!('on' in body)) {
                stream.emit(
                    'error',
                    new Error(
                        `got a ReadableStream<any> (a stream used by browser fetch) or Blob from s3 when getting object ${bucket}/${key} instead of Readable`
                    )
                )
            } else {
                body.on('error', err => stream.emit('error', err)).pipe(stream)
            }
        } catch (e) {
            stream.emit('error', e)
        }
    }
}

Thank you so much! I had an issue where my lambda would die on larger/multiple files without throwing any errors with Archiver and SDK v3, and this was the fix. Great work!

@amiantos
Author

Really happy to see how this gist has grown and evolved over the years. Kudos to everyone who has weighed in, offered more code and advice!

@julianpoma

I was struggling with this today. Glad to know that I am not alone <3

@Sahar-SE

Sahar-SE commented Oct 9, 2023

Hi, I need someone to help me please. I have created a Lambda function that is triggered by an S3 bucket, zips the uploaded files, and stores them in a destination bucket. But when I upload files, nothing appears in the destination bucket. I have searched a lot but couldn't find any solution. I configured all the permissions and IAM roles, but it still doesn't work.

@pnicholls

pnicholls commented Apr 23, 2024

zipkit.io is another way to solve this problem.

@marc-reed

Really happy to see how this gist has grown and evolved over the years. Kudos to everyone who has weighed in, offered more code and advice!

I am going to join in and sing praises for @amiantos - you saved my butt today!
I had to write a Lambda to zip thousands of files. The task would just go dark - no zip, no errors. Your V3 script did the trick - well done!
