Created
November 7, 2022 20:09
-
-
Save weeksie/a93adcdba5f77b2897928188b38a3c30 to your computer and use it in GitHub Desktop.
A simple
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import stream from 'stream'; | |
import { | |
S3Client, | |
CreateMultipartUploadCommand, | |
UploadPartCommand, | |
CreateMultipartUploadCommandOutput, | |
CompleteMultipartUploadCommand, | |
CompletedPart, | |
DeleteObjectCommand, | |
AbortMultipartUploadCommand, | |
} from '@aws-sdk/client-s3'; | |
import { | |
assertEnv, | |
} from '@melty/app'; | |
// Credentials are read from the environment once, at module load.
// NOTE(review): assertEnv presumably fails when a listed variable is missing — confirm in @melty/app.
const { S3_ACCESS_KEY, S3_SECRET_KEY } = assertEnv(
  process.env,
  'S3_ACCESS_KEY',
  'S3_SECRET_KEY',
);

// Module-level client shared by the helpers in this file.
const credentials = {
  accessKeyId: S3_ACCESS_KEY,
  secretAccessKey: S3_SECRET_KEY,
};
const client = new S3Client({ region: 'us-west-2', credentials });

// Public URL prefix under which uploaded objects are addressed.
const AWS_PREFIX = 'https://melty-uploads.s3.us-west-2.amazonaws.com/';

// Buffered bytes required before a part is sent (5 MiB).
const MIN_CHUNK_SIZE = 5 * 1024 * 1024;

// Name of the bucket every command in this file targets.
const Bucket = 'melty-uploads';
/** Constructor options for `S3WritableStream`. */
interface S3WritableStreamProps {
  /** Name of the file being uploaded; used verbatim as the S3 object key. */
  fileName: string;
}
/**
 * Writable stream that sends everything written to it to S3 as a multipart
 * upload.
 *
 * Incoming chunks are buffered until at least `MIN_CHUNK_SIZE` bytes are held
 * and are then sent as a single part. Whatever is left when the stream ends is
 * sent as the final part, after which the multipart upload is completed.
 *
 * Events emitted in addition to the standard `Writable` events:
 * - `start` — on the first write, before the multipart upload is created.
 * - `progress` `(received: number, uploaded: number)` — byte counts; one of the
 *   two arguments is always 0. `received` is reported for each chunk written,
 *   `uploaded` for each part S3 has acknowledged.
 * - `done` `(url: string)` — after the multipart upload has been completed.
 *
 * Failures (creating the upload, uploading a part, completing the upload) are
 * reported through the stream's normal error path instead of being left as
 * unhandled promise rejections.
 */
export class S3WritableStream extends stream.Writable {
  /** Number of bytes currently held in `chunks`. */
  buffered: number;
  /** Declared for compatibility; never assigned by this class. */
  encoding?: BufferEncoding;
  /** One promise per part sent so far, in part-number order. */
  s3Parts: Promise<CompletedPart>[];
  client: S3Client;
  /** Chunks received since the last part was sent. */
  chunks: Buffer[];
  /** True once the first write has triggered `startUpload`. */
  started: boolean;
  /**
   * The initial upload acknowledgement. If a file is very small we get into a
   * race condition where the upload might not have started yet and we're still
   * waiting for the upload id. This lets the `finishUpload` method wait until
   * this is settled. Undefined until `startUpload` has been called.
   */
  ack?: Promise<CreateMultipartUploadCommandOutput>;
  fileName: string;
  uploadId?: string;

  constructor({ fileName }: S3WritableStreamProps) {
    super({
      //defaultEncoding: 'latin1',
    });
    this.started = false;
    this.s3Parts = [];
    this.chunks = [];
    this.buffered = 0;
    this.fileName = fileName;
    this.client = new S3Client({
      region: 'us-west-2',
      credentials: {
        accessKeyId: S3_ACCESS_KEY,
        secretAccessKey: S3_SECRET_KEY,
      },
    });
  }

  /** Re-emits `e` as an `error` event on this stream. */
  onError = (e: Error) => {
    this.emit('error', e);
  };

  /** S3 object key for this upload. */
  get key(): string {
    return `${this.fileName}`;
  }

  /**
   * Buffers `chunk`, creates the multipart upload on the first call, and sends
   * a part once at least `MIN_CHUNK_SIZE` bytes are buffered.
   *
   * @param chunk Data to upload; strings are converted using `encoding`.
   * @param encoding Encoding of `chunk` when it is a string.
   * @param next Stream callback; receives the error if starting the upload or
   *   sending the part fails.
   */
  _write(
    chunk: Buffer | string,
    encoding: BufferEncoding,
    next: (error?: Error | null) => void,
  ) {
    // Only pass the encoding for strings; for Buffers it is not a real encoding.
    const buffer =
      typeof chunk === 'string' ? Buffer.from(chunk, encoding) : Buffer.from(chunk);
    this.chunks.push(buffer);
    // Count bytes, not characters, so the part-size threshold is accurate.
    this.buffered += buffer.length;

    if (!this.started) {
      this.started = true;
      this.emit('start');
      this.emit('progress', buffer.length, 0);
      // The first chunk stays buffered; it goes out with the next part.
      this.startUpload().then(() => next(), next);
      return;
    }

    this.emit('progress', buffer.length, 0);
    if (this.buffered < MIN_CHUNK_SIZE) {
      next();
      return;
    }
    this.sendPart().then(() => next(), next);
  }

  /**
   * Called by the stream machinery once `end()` has been requested and all
   * pending writes have been processed; completes the multipart upload.
   *
   * @param callback Stream callback; receives the error if completion fails.
   */
  _final(callback: (error?: Error | null) => void) {
    this.finishUpload().then(() => callback(), callback);
  }

  /**
   * Creates the multipart upload and records its upload id.
   *
   * @throws If the request fails or the response carries no upload id.
   */
  async startUpload() {
    const start = new CreateMultipartUploadCommand({
      Key: this.key,
      ACL: 'public-read',
      Bucket,
    });
    this.ack = this.client.send(start);
    this.uploadId = (await this.ack).UploadId;
    if (!this.uploadId) {
      throw new Error('S3WritableStream: S3 did not return an upload id');
    }
  }

  /**
   * Sends any remaining buffered data as the last part, waits for every part
   * to be acknowledged, completes the multipart upload and emits `done` with
   * the object's public URL.
   *
   * @throws If no data was ever written, or if any part or the completion
   *   request fails.
   */
  async finishUpload() {
    if (!this.ack) {
      // Nothing was ever written, so no multipart upload exists to complete.
      throw new Error('S3WritableStream: cannot finish an upload that received no data');
    }
    // If a very small file has been uploaded we could get here before the
    // ack is received. If that's the case we need to wait for the ack
    // to resolve, otherwise there's no uploadId.
    await this.ack;
    if (this.chunks.length) {
      await this.sendPart();
    }
    const Parts = await Promise.all(this.s3Parts);
    const finished = new CompleteMultipartUploadCommand({
      Bucket,
      UploadId: this.uploadId,
      Key: this.key,
      MultipartUpload: { Parts },
    });
    await this.client.send(finished);
    this.emit('done', `${AWS_PREFIX}${this.key}`);
  }

  /**
   * Starts uploading everything currently buffered as the next part and
   * clears the buffer. The upload itself is not awaited here: its promise is
   * tracked in `s3Parts` and awaited by `finishUpload`.
   */
  async sendPart() {
    const body = Buffer.concat(this.chunks);
    const partNumber = this.s3Parts.length + 1;
    const part = new UploadPartCommand({
      Bucket,
      Key: this.key,
      UploadId: this.uploadId,
      PartNumber: partNumber,
      Body: body,
    });
    const upload = this.client.send(part).then((response): CompletedPart => {
      this.emit('progress', 0, body.length);
      return {
        ETag: response.ETag,
        PartNumber: partNumber,
      };
    });
    // A part can fail long before `finishUpload` awaits it. Handle the
    // rejection here so it fails the stream instead of going unhandled.
    void upload.catch((e: unknown) => {
      this.destroy(e instanceof Error ? e : new Error(String(e)));
    });
    this.s3Parts.push(upload);
    this.chunks = [];
    this.buffered = 0;
  }

  /**
   * Aborts the multipart upload if one has been created.
   *
   * @returns The abort request's promise, or `undefined` when there is no
   *   upload id yet.
   */
  cancelUpload() {
    if (!this.uploadId) {
      return;
    }
    return this.client.send(
      new AbortMultipartUploadCommand({
        Bucket,
        Key: this.key,
        UploadId: this.uploadId,
      }),
    );
  }
}
/**
 * Deletes a previously uploaded object from the uploads bucket.
 *
 * @param src Either the object's public URL (beginning with `AWS_PREFIX`) or
 *   its bare key; a single leading slash is ignored.
 * @throws If `src` yields an empty key, or if the delete request fails.
 */
export const remove = async (src: string): Promise<void> => {
  // Strip the URL prefix only when it is actually a prefix, never mid-string.
  const withoutPrefix = src.startsWith(AWS_PREFIX)
    ? src.slice(AWS_PREFIX.length)
    : src;
  const Key = withoutPrefix.replace(/^\//, '');
  if (!Key) {
    // Refuse to send a delete request that names no object.
    throw new Error(`remove: "${src}" does not contain an object key`);
  }
  await client.send(new DeleteObjectCommand({ Bucket, Key }));
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment