Last active
September 17, 2019 13:38
-
-
Save ptgamr/4d1d07b770321a138c91 to your computer and use it in GitHub Desktop.
Flowjs, ResumableJs NodeJS backend
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const fs = Promise.promisifyAll(require('fs')); | |
const path = require('path'); | |
const crypto = require('crypto'); | |
const CronJob = require('cron').CronJob; | |
module.exports = class FlowUploader { | |
constructor(tempDir, uploadDir, maxFileSize, fileParameterName) { | |
this.tempDir = tempDir || './tmp'; | |
this.uploadDir = uploadDir || './uploads'; | |
this.maxFileSize = maxFileSize; | |
this.fileParameterName = fileParameterName || 'file'; | |
try { | |
fs.mkdirSync(this.tempDir); | |
} catch (e) {} | |
//run clean up job five minutes after midnight, every day | |
new CronJob('5 0 * * *', () => { this._cleanUnfinishedChunks(); }, null, true, 'Europe/Zurich'); | |
} | |
chunkExists(req) { | |
let chunkNumber = req.query.flowChunkNumber, | |
chunkSize = req.query.flowChunkSize, | |
totalSize = req.query.flowTotalSize, | |
identifier = req.query.flowIdentifier, | |
fileName = req.query.flowFilename; | |
let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName); | |
if (validation !== 'VALID') { | |
return Promise.reject(validation); | |
} | |
let chunkFilename = this._getChunkFilename(chunkNumber, identifier); | |
return fs.statAsync(chunkFilename); | |
} | |
saveChunk(req) { | |
let fields = req.body, | |
files = req.files; | |
let chunkNumber = Number(fields.flowChunkNumber), | |
chunkSize = Number(fields.flowChunkSize), | |
totalSize = Number(fields.flowTotalSize), | |
identifier = fields.flowIdentifier, | |
fileName = fields.flowFilename; | |
if (!files[this.fileParameterName] || !files[this.fileParameterName].size) { | |
return Promise.reject('INVALID_FLOW_REQUEST'); | |
} | |
let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, files[this.fileParameterName].size); | |
if (validation !== 'VALID') { | |
return Promise.reject(validation); | |
} | |
let chunkFilename = this._getChunkFilename(chunkNumber, identifier); | |
return fs.renameAsync(files[this.fileParameterName].path, chunkFilename) | |
.then(() => { | |
let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize); | |
if (chunkNumber !== numberOfChunks) { | |
return 'PARTLY_DONE'; | |
} | |
let chunkFileNames = []; | |
for (let i = 1; i <= numberOfChunks; i++) { | |
chunkFileNames.push(this._getChunkFilename(i, identifier)); | |
} | |
return Promise.map( | |
chunkFileNames, | |
chunkFileName => fs.statAsync(chunkFileName), | |
{concurency: 2} | |
).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName)) | |
.then(filename => filename, () => 'ERROR_VERIFY_CHUNK'); | |
}); | |
} | |
download(fileName) { | |
let downloadPath = this._getDownloadPath(fileName); | |
return fs.statAsync(downloadPath).then(() => { | |
return fs.createReadStream(downloadPath); | |
}); | |
} | |
_isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, fileSize) { | |
identifier = this._cleanIdentifier(identifier); | |
if (!chunkNumber || !chunkSize || !totalSize || !identifier || !fileName) { | |
return 'INVALID_FLOW_REQUEST'; | |
} | |
let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize); | |
if (chunkNumber > numberOfChunks) { | |
return 'INVALID_CHUNK_NUMBER'; | |
} | |
if (this.maxFileSize && totalSize > this.maxFileSize) { | |
return 'INVALID_FILE_SIZE'; | |
} | |
if (typeof fileSize !== 'undefined') { | |
if (chunkNumber < numberOfChunks && fileSize !== chunkSize) { | |
console.log('>>>>.', typeof fileSize, typeof chunkSize); | |
return 'INVALID_FILESIZE_CHUNKSIZE_MISMATCH'; | |
} | |
if (numberOfChunks > 1 && chunkNumber === numberOfChunks && fileSize !== ((totalSize % chunkSize) + parseInt(chunkSize))) { | |
return 'INVALID_LAST_CHUNK'; | |
} | |
if (numberOfChunks === 1 && fileSize !== totalSize) { | |
return 'INVALID_SINGLE_CHUNK'; | |
} | |
} | |
return 'VALID'; | |
} | |
_getNumberOfChunks(totalSize, chunkSize) { | |
return Math.max(Math.floor(totalSize/chunkSize), 1); | |
} | |
_cleanIdentifier(identifier) { | |
return identifier.replace(/[^0-9A-Za-z_-]/g, ''); | |
} | |
_getChunkFilename(chunkNumber, identifier) { | |
identifier = this._cleanIdentifier(identifier); | |
let hash = crypto.createHash('sha1').update(identifier).digest('hex'); | |
return path.resolve(this.tempDir, `./${identifier}-${hash}.${chunkNumber}`); | |
} | |
_getDownloadPath(fileName) { | |
return path.resolve(this.uploadDir, `./${fileName}`); | |
} | |
_writeToUploadDir(numberOfChunks, identifier, fileName) { | |
let hash = crypto.createHash('sha1').update(identifier).digest('hex'); | |
let writeDir = path.resolve(this.uploadDir, `./${identifier}-${hash}${path.extname(fileName)}`); | |
let writableStream = fs.createWriteStream(writeDir); | |
let chunkFileNames = []; | |
for (let i = 1; i <= numberOfChunks; i++) { | |
chunkFileNames.push(this._getChunkFilename(i, identifier)); | |
} | |
return Promise.each( | |
chunkFileNames, | |
chunkFileName => { | |
return new Promise(resolve => { | |
let sourceStream = fs.createReadStream(chunkFileName); | |
sourceStream.pipe(writableStream, { | |
end: false | |
}); | |
sourceStream.on('end', function() { | |
fs.unlink(chunkFileName); | |
resolve(); | |
}); | |
}); | |
} | |
).then(() => { | |
writableStream.end(); | |
return path.basename(writeDir); | |
}); | |
} | |
_cleanUnfinishedChunks() { | |
let now = new Date().getTime(); | |
let oneDay = 24 * 60 * 60 * 1000; | |
fs.readdirAsync(this.tempDir) | |
.map(fileName => { | |
let filePath = path.resolve(this.tempDir, `./${fileName}`); | |
return fs.statAsync(filePath).then(stat => { | |
return { | |
filePath: filePath, | |
stat: stat | |
}; | |
}); | |
}, {concurency: 2}) | |
.filter(fileStat => { | |
let modifiedTime = fileStat.stat.ctime.getTime(); | |
return (now - modifiedTime) >= oneDay; | |
}) | |
.each(fileStat => fs.unlinkAsync(fileStat.filePath)); | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Flow.js chunk probe: answer 200 when the chunk is already stored, 204 when
// it is not (or when the request does not validate).
router.get('/flow', (req, res) => {
  const respondWith = statusCode => () => res.status(statusCode).send();

  return uploader
    .chunkExists(req)
    .then(respondWith(200), respondWith(204));
});
// Flow.js chunk upload: 200 with the uploader's status on success, 400 with
// the rejection value otherwise.
router.post('/flow', multipartMiddleware, (req, res) => {
  const onSaved = status => res.status(200).send(status);
  const onRejected = err => res.status(400).send(err);

  return uploader.saveChunk(req).then(onSaved, onRejected);
});
Are you sure the `_cleanUnfinishedChunks` method works? You're calling `map` on a `Promise`, it seems. I'm assuming you intended to call `fs.readdirSync` instead of `fs.readdirAsync`?
(Edit: Oops, I see this is perfectly valid Bluebird functionality ;)
Also: `concurency` (line 78) is misspelled; it should be `concurrency`.
(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileSuccess FlowFile_lastProgressCallback: 1467522223840_prevProgress: 1_prevUploadedSize: 11230731averageSpeed: 0bytes: nullchunks: Array[10]currentSpeed: 0error: falsefile: FileflowObj: Flowname: "Jess....: Object ERROR_VERIFY_CHUNK FlowChunk
(index):166 complete
I'm getting a specific ERROR_VERIFY_CHUNK
on the FlowFile_lastProgressCallback
call. In the upload dir I can see that the file's pieces are still present and have not been combined in any way... Is there anything I'm doing wrong? I used this code on the server with no changes.
@LaurensRietveld is this working for you?
//...
let chunkFileNames = [];
for (let i = 1; i <= numberOfChunks; i++) {
chunkFileNames.push(this._getChunkFilename(i, identifier));
}
return Promise.map(
chunkFileNames,
chunkFileName => fs.statAsync(chunkFileName),
{concurrency: 2}
).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName))
.then(filename => filename, () => 'ERROR_VERIFY_CHUNK');
//...
Mine is throwing that error.
Aaaaaaaaaaaaaah... totally OK: I was missing the uploads
dir. Fixed, and it's all working like magic.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@ptgamr, can you please provide a full example?
Thanks!