@LucasRoesler
Last active September 20, 2022 09:08
streaming chunked file upload to s3 in typescript
// Tests for streamingWrite in ./writer.ts
import { S3Client } from "@aws-sdk/client-s3";
import { streamingWrite, Upload } from "./writer";

const sleep = (ms: number) => {
    return new Promise(resolve => setTimeout(resolve, ms));
};

// Skipped because it requires a real S3 bucket; it is still useful for running locally.
describe.skip("streamingWriter", () => {
    afterAll(async () => {
        // give the SDK time to release sockets and avoid a jest open-handle error
        await new Promise<void>(resolve => setTimeout(() => resolve(), 500));
    });

    it("can write a stream to S3", async () => {
        const client = new S3Client({ region: "eu-central-1" });

        let upload: Upload | undefined;
        try {
            upload = streamingWrite(client, "telekom-mapping-uploads-test", "/lucas/test/data.csv");
        } catch (err) {
            console.error(err);
            throw err;
        }

        expect(upload).toBeDefined();
        if (upload) {
            expect(upload.stream.write("header-a,header-b,header-c\n")).toBeTruthy();
            expect(upload.stream.write("a,b,c\n")).toBeTruthy();
            expect(upload.stream.write("1,2,3\n")).toBeTruthy();
            // give the multipart upload time to flush before closing the stream
            await sleep(5000);
            upload.end();
        }
    });
});
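The test above only covers the happy path. A minimal sketch of a cancellation test, assuming the same bucket (the object key here is hypothetical), could sit in the same describe block:

    it("can abort an in-flight upload", async () => {
        const client = new S3Client({ region: "eu-central-1" });
        const upload = streamingWrite(client, "telekom-mapping-uploads-test", "/lucas/test/aborted.csv");
        upload.stream.write("header-a,header-b,header-c\n");
        // abort() cancels the multipart upload, so no object should be created
        await upload.abort();
    });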
// writer.ts: the module imported above as "./writer"
import { Upload as UploadCommand } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";
import { PassThrough, Writable } from "stream";
import pino from "pino";

const logger = pino({
    name: process.env.SERVICE_NAME || "api",
    level: process.env.LOG_LEVEL || "info",
    formatters: {
        level: label => {
            return { level: label };
        },
    },
});
/**
 * Upload is a Writable stream that uploads data to S3.
 *
 * To finish the upload you must call `end()` on the stream.
 *
 * If you need to cancel the upload, call `abort()`.
 *
 * Example:
 *
 * ```
 * const resp = await pg.query("select * from table_foo");
 * const upload = streamingWrite(client, bucket, object);
 * try {
 *     for (const result of resp.rows) {
 *         upload.stream.write(asCSV(result));
 *     }
 * } catch (err) {
 *     await upload.abort();
 *     throw err;
 * }
 * upload.end();
 * ```
 */
export interface Upload {
    stream: Writable;
    abort: () => Promise<void>;
    end: (cb?: () => void) => void;
}
/**
 * streamingWrite starts a multipart upload to S3 and returns a writable
 * stream that feeds it.
 *
 * Example:
 *
 * ```
 * const resp = await pg.query("select * from table_foo");
 * const upload = streamingWrite(client, bucket, object);
 * try {
 *     for (const result of resp.rows) {
 *         upload.stream.write(asCSV(result));
 *     }
 * } catch (err) {
 *     await upload.abort();
 *     throw err;
 * }
 * upload.end();
 * ```
 */
export const streamingWrite = (client: S3Client, bucket: string, object: string): Upload => {
    const stream = new PassThrough();

    // Keys should not start with a slash, otherwise S3 creates a
    // folder named `/` in the bucket.
    let key = object;
    if (object.startsWith("/")) {
        key = object.substring(1);
    }

    const upload = new UploadCommand({
        client: client,
        params: {
            Bucket: bucket,
            Key: key,
            Body: stream,
        },
        queueSize: 4, // optional concurrency configuration
        partSize: 1024 * 1024 * 5, // optional size of each part, in bytes, at least 5MB
        leavePartsOnError: false, // optional manually handle dropped parts
    });

    upload
        .done()
        .then(value => {
            logger.info({ value }, "streamingWrite done uploading to s3");
        })
        .catch(err => {
            // surface the failure to the producer by destroying the stream
            logger.error({ err }, "streamingWrite error uploading to s3");
            const code = err.Code || "unknown";
            const requestId = err.RequestId || "unknown";
            stream.destroy(Error(`streamingWrite error uploading to s3: ${code}, ${requestId}`));
        });

    upload.on("httpUploadProgress", progress => {
        logger.debug({ progress }, "streamingWrite progress");
    });

    // wrap the methods so `this` is preserved when callers destructure them
    return {
        stream,
        abort: () => upload.abort(),
        end: (cb?: () => void) => {
            stream.end(cb);
        },
    };
};
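For callers, a minimal usage sketch, assuming a local data.csv and a hypothetical bucket name (my-example-bucket). pipeline ends upload.stream once the file is fully read; success or failure of the S3 upload itself is surfaced through the logger and stream.destroy above:

import { createReadStream } from "fs";
import { pipeline } from "stream/promises";
import { S3Client } from "@aws-sdk/client-s3";
import { streamingWrite } from "./writer";

const main = async () => {
    const client = new S3Client({ region: "eu-central-1" });
    const upload = streamingWrite(client, "my-example-bucket", "exports/data.csv");
    try {
        // pipeline writes the file into upload.stream and calls end() when done
        await pipeline(createReadStream("data.csv"), upload.stream);
    } catch (err) {
        // cancel the multipart upload if reading or writing fails
        await upload.abort();
        throw err;
    }
};

main().catch(err => console.error(err));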