Created
April 10, 2021 03:32
-
-
Save kbanman/0aa36ffe415cdc6c44293bc3ddb6448e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Usage example: stream `stream` into S3 through a multipart upload and
// return a promise that settles once the upload has finished or failed.
const uploader = new S3UploadStream(s3, {
  Bucket: bucketName,
  ContentType: contentType,
  Key: key,
})
return new Promise<void>((resolve, reject) => {
  // Listeners are registered before piping so that no event can be missed.
  // 'uploadFinished' carries the S3 completion output and the total number
  // of bytes uploaded; this example only needs to know that it happened.
  uploader.on('uploadFinished', (result: CompleteMultipartUploadCommandOutput, fileSizeBytes: number) => {
    resolve()
  })
  uploader.on('error', err => {
    reject(err)
  })
  // pipe() does not forward errors from the source to the destination, so a
  // failing source would otherwise leave this promise pending forever.
  // Destroying the uploader with the error surfaces it through the 'error'
  // listener above.
  stream.on('error', (err: Error) => {
    uploader.destroy(err)
  })
  stream.pipe(uploader)
})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
CompleteMultipartUploadCommand, | |
CompleteMultipartUploadCommandOutput, | |
CreateMultipartUploadCommand, | |
CreateMultipartUploadCommandInput, | |
S3Client, | |
UploadPartCommand, | |
} from '@aws-sdk/client-s3' | |
import {Writable} from 'stream' | |
// S3 requires all but the last part to be at least 5MB
const BUFFER_SIZE = 5242880

/**
 * Writable stream that sends everything written to it to S3 as a multipart
 * upload.
 *
 * Incoming chunks are buffered until at least `BUFFER_SIZE` bytes are
 * available, then uploaded as one part. Parts are uploaded one at a time: the
 * write callback is withheld until the part has been sent, which applies
 * backpressure to the producer. When the stream ends, the remaining bytes are
 * uploaded as the last part and the multipart upload is completed.
 *
 * Events (in addition to the standard `Writable` events):
 * - `uploadFinished(output, totalBytes)`: emitted after the multipart upload
 *   has been completed, before the stream's `finish` event.
 *
 * Failures are reported through the stream's `error` event.
 */
export class S3UploadStream extends Writable {
  /** Chunks received since the last part was cut. */
  private buffer: Buffer[] = []
  /** Total byte length of the chunks currently held in `buffer`. */
  private bufferLength = 0
  /** Number of bytes handed to `uploadPart` so far. */
  private totalLength = 0
  private started = false
  /** Settles once the multipart upload has been created (or failed to). */
  private promiseQueue: Promise<void> = Promise.resolve()
  private uploadId: string | undefined
  private uploadedParts: { ETag: string, PartNumber: number }[] = []
  private numParts = 0

  /**
   * @param s3 Client used for every S3 request.
   * @param options Input of the CreateMultipartUpload request; its `Bucket`
   *   and `Key` are reused for the part and completion requests.
   */
  constructor(private s3: S3Client,
              private options: CreateMultipartUploadCommandInput) {
    super()
  }

  /**
   * Creates the multipart upload and remembers its upload id.
   *
   * @throws Error when the CreateMultipartUpload request fails.
   */
  async startUpload(): Promise<void> {
    try {
      const {UploadId} = await this.s3.send(new CreateMultipartUploadCommand(this.options))
      this.uploadId = UploadId
    } catch (err) {
      throw new Error(`Failed to create S3 MPU: ${err}`)
    }
  }

  /** Starts the multipart upload the first time it is called; later calls are no-ops. */
  ensureUploadStarted(): void {
    if (this.started) {
      return
    }
    this.started = true
    this.promiseQueue = this.startUpload()
    // The promise is only awaited once a part is ready to be sent. Mark it as
    // handled here so an early failure is not reported as an unhandled
    // rejection; whoever awaits `promiseQueue` later still sees the error.
    this.promiseQueue.catch(() => undefined)
  }

  /** `Writable` hook: buffers the chunk and uploads a part once enough bytes are queued. */
  _write(chunk: any, encoding: BufferEncoding, callback: Callback): void {
    this.ensureUploadStarted()
    // Normalise to a Buffer so the length is counted in bytes and
    // Buffer.concat accepts the chunk.
    const data: Buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)
    this.buffer.push(data)
    this.bufferLength += data.length
    if (this.bufferLength < BUFFER_SIZE) {
      callback()
      return
    }
    // Two-argument then(): the callback is invoked exactly once, even if it throws.
    this.emitPart().then(() => callback(), err => callback(err))
  }

  /** `Writable` hook: uploads the remaining bytes and completes the multipart upload. */
  _final(callback: Callback): void {
    // Covers a stream that is ended without a single write.
    this.ensureUploadStarted()
    // A completion request needs at least one part, so a stream that never
    // produced one uploads a single zero-length part.
    // NOTE(review): relies on S3 accepting an empty final part — confirm
    // against the target S3 implementation.
    this.emitPart(this.numParts === 0)
      .then(() => this.completeUpload())
      .then(output => {
        this.emit('uploadFinished', output, this.totalLength)
      })
      .then(() => callback(), err => callback(err))
  }

  /**
   * Uploads one part and records its ETag for the completion request.
   *
   * @param part 1-based part number.
   * @param chunk Content of the part.
   * @throws Error when the upload could not be created, the request fails, or
   *   the response carries no ETag.
   */
  async uploadPart(part: number, chunk: Buffer): Promise<void> {
    await this.promiseQueue
    const {ETag} = await this.s3.send(new UploadPartCommand({
      Bucket: this.options.Bucket,
      Key: this.options.Key,
      UploadId: this.uploadId,
      PartNumber: part,
      Body: chunk,
    }))
    if (!ETag) {
      throw new Error(`UploadPart returned no ETag for part ${part}`)
    }
    this.uploadedParts.push({
      ETag,
      PartNumber: part,
    })
  }

  /**
   * Cuts the buffered chunks into one part and uploads it.
   *
   * Always returns a promise, so callers can chain on it even when there is
   * nothing to upload.
   *
   * @param allowEmpty Upload a zero-length part when nothing is buffered.
   */
  private async emitPart(allowEmpty = false): Promise<void> {
    if (!this.bufferLength && !allowEmpty) {
      return
    }
    this.totalLength += this.bufferLength
    const partNumber = ++this.numParts
    const partContent = Buffer.concat(this.buffer, this.bufferLength)
    // Reset before the first await so later writes start a fresh part.
    this.buffer = []
    this.bufferLength = 0
    await this.uploadPart(partNumber, partContent)
  }

  /**
   * Completes the multipart upload from the recorded parts.
   *
   * @returns The output of the CompleteMultipartUpload request.
   * @throws Error when the completion request fails.
   */
  private async completeUpload(): Promise<CompleteMultipartUploadCommandOutput> {
    await this.promiseQueue
    try {
      return await this.s3.send(new CompleteMultipartUploadCommand({
        Bucket: this.options.Bucket,
        Key: this.options.Key,
        MultipartUpload: {
          Parts: this.uploadedParts,
        },
        UploadId: this.uploadId,
      }))
    } catch (err) {
      throw new Error(`CompleteMultipartUpload failed: ${err}`)
    }
  }
}

/** Completion callback handed to the `Writable` hooks. */
type Callback = (error?: (Error | null)) => void
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment