streaming chunked file upload to s3 in typescript
import { S3Client } from "@aws-sdk/client-s3";
import { streamingWrite, Upload } from "./writer";

const sleep = (ms: number) => {
    return new Promise(resolve => setTimeout(resolve, ms));
};

// Skipped because it requires a real S3 bucket; it is still useful for running locally.
describe.skip("streamingWriter", () => {
    afterAll(async () => {
        await new Promise<void>(resolve => setTimeout(() => resolve(), 500)); // avoid jest open handle error
    });

    it("can write a stream to S3", async () => {
        const client = new S3Client({ region: "eu-central-1" });

        let upload: Upload | undefined;
        try {
            upload = streamingWrite(client, "telekom-mapping-uploads-test", "/lucas/test/data.csv");
        } catch (err) {
            console.error(err);
            throw err;
        }

        expect(upload).toBeDefined();
        if (upload) {
            expect(upload.stream.write("header-a,header-b,header-c\n")).toBeTruthy();
            expect(upload.stream.write("a,b,c\n")).toBeTruthy();
            expect(upload.stream.write("1,2,3\n")).toBeTruthy();
            await sleep(5000);
            upload.end();
        }
    });
});
writer.ts
import { Upload as UploadCommand } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";
import { PassThrough, Writable } from "stream";
import pino from "pino";

const logger = pino({
    name: process.env.SERVICE_NAME || "api",
    level: process.env.LOG_LEVEL || "info",
    formatters: {
        // log the level as its label (e.g. "info") instead of a numeric value
        level: label => {
            return { level: label };
        },
    },
});
/**
 * Upload is a Writable stream that uploads data to S3.
 *
 * To finish the upload you must call `end()` on the stream.
 *
 * If you need to cancel the upload, call `abort()`.
 *
 * Example:
 *
 ```
 const resp = await pg.query("select * from table_foo")
 const upload = streamingWrite(client, bucket, object)
 try {
     for (const row of resp.rows) {
         upload.stream.write(asCSV(row))
     }
 } catch (err) {
     await upload.abort()
     throw err
 }
 upload.end()
 ```
 */
export interface Upload {
    stream: Writable;
    abort: () => Promise<void>;
    end: (cb?: () => void) => void;
}
/**
 * streamingWrite creates a writable stream whose contents are uploaded to S3
 * as a chunked (multipart) upload, and returns it together with `abort` and
 * `end` helpers.
 *
 * Example:
 *
 ```
 const resp = await pg.query("select * from table_foo")
 const upload = streamingWrite(client, bucket, object)
 try {
     for (const row of resp.rows) {
         upload.stream.write(asCSV(row))
     }
 } catch (err) {
     await upload.abort()
     throw err
 }
 upload.end()
 ```
 */
export const streamingWrite = (client: S3Client, bucket: string, object: string): Upload => {
    const stream = new PassThrough();

    // Keys should not start with a slash, otherwise S3 creates a
    // folder named `/` in the bucket.
    let key = object;
    if (object.startsWith("/")) {
        key = object.substring(1);
    }

    const uploads = new UploadCommand({
        client: client,
        params: {
            Bucket: bucket,
            Key: key,
            Body: stream,
        },
        queueSize: 4, // optional concurrency configuration
        partSize: 1024 * 1024 * 5, // optional size of each part, in bytes, at least 5MB
        leavePartsOnError: false, // optional manually handle dropped parts
    });

    uploads
        .done()
        .then(value => {
            logger.info({ value }, "streamingWrite done uploading to s3");
        })
        .catch(err => {
            logger.error({ err }, "streamingWrite error uploading to s3");
            const code = err.Code || "unknown";
            const requestId = err.RequestId || "unknown";
            // Destroy the stream with an error so pending writers are notified of the failure.
            stream.destroy(Error(`streamingWrite error uploading to s3: ${code}, ${requestId}`));
        });

    uploads.on("httpUploadProgress", progress => {
        logger.debug({ progress }, "streamingWrite progress");
    });

    // Wrap `abort` and `end` in closures so they keep the correct `this`
    // when called through the returned object.
    return {
        stream,
        abort: () => uploads.abort(),
        end: (cb?: () => void) => {
            stream.end(cb);
        },
    };
};
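
For reference, a minimal sketch of how a caller might use streamingWrite outside of Jest. The bucket name and key are placeholders, and it assumes AWS credentials are resolved from the environment, the same way the test above does.

```
import { S3Client } from "@aws-sdk/client-s3";
import { streamingWrite } from "./writer";

const main = async () => {
    const client = new S3Client({ region: "eu-central-1" });

    // Placeholder bucket and key; streamingWrite strips a leading slash if one is given.
    const upload = streamingWrite(client, "example-bucket", "exports/data.csv");

    try {
        upload.stream.write("header-a,header-b,header-c\n");
        upload.stream.write("a,b,c\n");
        upload.stream.write("1,2,3\n");
        // end() closes the PassThrough stream, which lets the multipart upload complete.
        upload.end();
    } catch (err) {
        // abort() cancels the multipart upload so no incomplete parts are left behind.
        await upload.abort();
        throw err;
    }
};

main().catch(err => console.error(err));
```

Note that streamingWrite handles the upload's done() promise internally and only logs the result, so a caller that needs to block until the object is fully written would have to extend the returned Upload to expose that promise.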