Created: February 5, 2024 22:50
-
-
Save lancejpollard/9158415049896d0b96192d4c34761436 to your computer and use it in GitHub Desktop.
AWS Lambda file uploader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import _ from 'lodash' | |
import { ReadStream } from 'fs' | |
import busboy, { FileInfo } from 'busboy' | |
import DEBUG from 'debug' | |
import kink from './kink.js' | |
import { Event } from './router.js' | |
const debug = DEBUG('load-file') | |
/**
 * Argument bundle passed to the caller-supplied {@link HandleFile} hook
 * for each uploaded file.
 */
export type HandleFileInput = {
  // The originating request event; the hook may read headers/context from it.
  event: Event
  // The raw busboy file stream for this part — the hook is expected to consume it.
  stream: ReadStream
  // Metadata record for the file (name/encoding/mimeType); the hook's
  // resolved value is later attached to it as `buffer` or `path`.
  file: LoadFile
}
/**
 * Metadata for one uploaded file, as stored into `event.json` under the
 * file's field name once its handler completes.
 */
export type LoadFile = {
  // Original client-supplied filename (busboy `info.filename`).
  name: string
  // Transfer encoding reported by busboy (e.g. "7bit").
  encoding: string
  // MIME type reported by busboy (e.g. "image/png").
  mimeType: string
  // Set when the handler resolves with a Buffer (file held in memory).
  buffer?: Buffer
  // Set when the handler resolves with a string — presumably a storage
  // location such as an S3 key or filesystem path; confirm against callers.
  path?: string
}
/**
 * Declares one expected file field on the multipart form.
 */
export type LoadFileLink = {
  // Form field name for the file input; also used as the lodash path at
  // which the resulting LoadFile is written into `event.json`.
  path: string
  // Minimum number of files for this field. NOTE(review): declared but never
  // read by loadFiles below — minimum counts are not enforced in this file.
  min?: number
  // Maximum number of files accepted for this field; exceeding it aborts
  // the upload with `too_many_files`.
  max?: number
}
/**
 * Caller-supplied async handler invoked once per uploaded file.
 * Resolves with either a Buffer (file contents kept in memory) or a string
 * (stored location); the result is recorded on the file's LoadFile entry.
 */
export type HandleFile = (
  props: HandleFileInput,
) => Promise<string | Buffer>
/**
 * Parse a multipart/form-data request body with busboy, delegating each
 * uploaded file stream to the caller-supplied `hook` and merging both text
 * fields and file metadata into `event.json` (mutated in place).
 *
 * Coordination model: each file stream gets custom 'clear' and 'abort'
 * events layered on top of the normal stream events. 'clear' marks a file
 * as fully handled and decrements the in-flight counter; 'abort' is emitted
 * toward the hook so it can cancel its own work. The returned promise
 * settles only when busboy has finished reading AND no file is still being
 * processed.
 *
 * @param event  Request wrapper; `event.headers` must carry the multipart
 *               Content-Type and `event.body` the raw body string.
 * @param link   Declared file fields; each entry's `path` is both the
 *               accepted field name and the destination path in `event.json`.
 * @param hook   Per-file async handler; its resolved Buffer/string is stored
 *               on the file record as `buffer`/`path` respectively.
 * @param halt   Forwarded as busboy's `limits` option — presumably a
 *               busboy.Limits object; the `{}` type is too loose. TODO(review):
 *               tighten to busboy.Limits.
 * @returns Promise resolving to undefined on success, rejecting with the
 *          first error encountered.
 */
export default function loadFiles(
  event: Event,
  {
    link,
    hook,
    halt,
  }: {
    hook: HandleFile
    link: Array<LoadFileLink>
    halt?: {}
  },
) {
  return new Promise((res, rej) => {
    let bb: busboy.Busboy
    // Index the per-field upload config by field name for O(1) lookup.
    const fieldMap = link.reduce<Record<string, LoadFileLink>>(
      (m, x) => {
        m[x.path] = x
        return m
      },
      {},
    )
    // Ensure a destination object exists before _.set writes into it.
    event.json ??= {}
    bb = busboy({
      headers: event.headers,
      limits: halt,
    })
    // First error seen; once set, subsequent abort() calls are no-ops.
    // NOTE(review): implicitly `any` — would be `Error | undefined` under strict.
    let uploadError
    // Number of file streams currently inside the `hook` pipeline.
    let processing = 0
    // handle text field data
    bb.on(
      'field',
      (
        fieldName: string,
        value: string,
        fieldNameTruncated: boolean,
        valueTruncated: boolean,
      ) => {
        if (fieldName == null) {
          return abort(kink('missing_field_name'))
        }
        // Truncation flags mean busboy hit a configured size limit mid-value.
        if (fieldNameTruncated) {
          return abort(kink('field_name_truncated'))
        }
        if (valueTruncated) {
          return abort(
            kink('field_value_truncated', { name: fieldName }),
          )
        }
        // Field names are treated as lodash paths (e.g. "a.b[0]"), allowing
        // nested form structures.
        _.set(event.json, fieldName, value)
      },
    )
    // Guards finalize() against running twice.
    let isDone = false
    // True once busboy has emitted 'finish' (whole body consumed).
    let readFinished = false
    // handle files
    // Per-field bookkeeping, keyed by field name.
    const files: Record<string, LoadFile> = {}
    // How many files have arrived per field (for the `max` check).
    const sizes: Record<string, number> = link.reduce<
      Record<string, number>
    >((m, x) => {
      m[x.path] = 0
      return m
    }, {})
    const streams: Record<string, ReadStream> = {}
    // Tracks which streams already received an 'abort' to avoid double-emitting.
    const aborted: Record<string, boolean> = {}
    bb.on(
      'file',
      async (fieldName: string, stream: ReadStream, info: FileInfo) => {
        // Reject any file field that was not declared in `link`.
        if (!fieldMap[fieldName]) {
          return abort(
            kink('invalid_file_field_name', {
              name: fieldName,
            }),
          )
        }
        sizes[fieldName]++
        const field = fieldMap[fieldName]
        const size = sizes[fieldName]
        // Enforce the per-field maximum file count, when configured.
        if (field?.max && size && size > field.max) {
          return abort(kink('too_many_files'))
        }
        processing++
        const file: LoadFile = (files[fieldName] = {
          name: info.filename,
          encoding: info.encoding,
          mimeType: info.mimeType,
        })
        streams[fieldName] = stream
        // 'clear' is a custom (non-standard) stream event emitted below after
        // the hook resolves; it marks this file done and re-checks completion.
        const handleClear = () => {
          debug('handle upload stream clear')
          stream.off('clear', handleClear)
          processing--
          // Defer so any same-tick events settle before we decide to finalize.
          process.nextTick(finish)
        }
        const handleError = err => {
          debug('handle upload stream error')
          // stream.off('error', handleError)
          // handled in custom handler
          // Re-broadcast as the custom 'abort' event so the hook can cancel.
          aborted[fieldName] = true
          stream.emit('abort', err)
          abort(err)
        }
        // busboy emits 'limit' on the stream when fileSize limit is exceeded.
        const handleLimit = () => {
          debug('handle upload stream limit')
          stream.off('limit', handleLimit)
          const err = kink('file_size_limit_reached')
          aborted[fieldName] = true
          stream.emit('abort', err)
          abort(err)
        }
        // emit in custom handler.
        stream.on('clear', handleClear)
        stream.on('error', handleError)
        stream.on('limit', handleLimit)
        try {
          debug('before upload handling')
          // The hook must consume `stream`; its result classifies the file.
          const data = await hook({ event, file, stream })
          debug('after upload handling')
          if (data instanceof Buffer) {
            file.buffer = data
          } else if (typeof data === 'string') {
            file.path = data
          }
          _.set(event.json, fieldName, file)
          stream.emit('clear')
        } catch (e) {
          debug('error uploading')
          // NOTE(review): `processing` is not decremented on this path;
          // unless the hook's abort handling ultimately emits 'clear',
          // finish() can never fire and the promise may hang. Verify the
          // hook contract.
          return abort(e)
        }
      },
    )
    bb.on('error', err => {
      abort(err)
    })
    // The three limit events below are intentionally ignored (aborting on
    // them is commented out); limits are surfaced via truncation flags and
    // per-stream 'limit' events instead.
    bb.on('partsLimit', () => {
      // abort(kink('LIMIT_PART_COUNT'))
    })
    bb.on('filesLimit', () => {
      // abort(kink('LIMIT_FILE_COUNT'))
    })
    bb.on('fieldsLimit', () => {
      // abort(kink('LIMIT_FIELD_COUNT'))
    })
    bb.on('finish', () => {
      readFinished = true
      finish()
    })
    // Feed the whole body synchronously and signal end-of-input.
    // NOTE(review): assumes event.body is a binary-encoded string — if this
    // is an API Gateway event the body may be base64; confirm upstream decoding.
    bb.write(event.body, 'binary')
    bb.end()
    // Record the first error and broadcast 'abort' to every stream that has
    // not been aborted yet, so in-flight hooks can cancel.
    function abort(err) {
      if (uploadError) {
        return
      }
      debug('stream abort')
      uploadError = err
      for (const fieldName in streams) {
        const stream = streams[fieldName]
        if (stream && !aborted[fieldName]) {
          aborted[fieldName] = true
          stream.emit('abort', err)
        }
      }
    }
    // Settle the outer promise exactly once and detach all busboy listeners.
    function finalize(err: Error | null) {
      if (isDone) {
        return
      }
      isDone = true
      // debug('upload finalize')
      bb.removeAllListeners()
      if (err) {
        return rej(err)
      }
      res(undefined)
    }
    // Completion check: done once the body is fully read (or errored) and no
    // file handler is still in flight.
    function finish() {
      debug('upload finish')
      if ((uploadError || readFinished) && !processing) {
        finalize(uploadError)
      }
    }
  })
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.