-
-
Save ptgamr/4d1d07b770321a138c91 to your computer and use it in GitHub Desktop.
'use strict'; | |
const fs = Promise.promisifyAll(require('fs')); | |
const path = require('path'); | |
const crypto = require('crypto'); | |
const CronJob = require('cron').CronJob; | |
module.exports = class FlowUploader { | |
constructor(tempDir, uploadDir, maxFileSize, fileParameterName) { | |
this.tempDir = tempDir || './tmp'; | |
this.uploadDir = uploadDir || './uploads'; | |
this.maxFileSize = maxFileSize; | |
this.fileParameterName = fileParameterName || 'file'; | |
try { | |
fs.mkdirSync(this.tempDir); | |
} catch (e) {} | |
//run clean up job five minutes after midnight, every day | |
new CronJob('5 0 * * *', () => { this._cleanUnfinishedChunks(); }, null, true, 'Europe/Zurich'); | |
} | |
chunkExists(req) { | |
let chunkNumber = req.query.flowChunkNumber, | |
chunkSize = req.query.flowChunkSize, | |
totalSize = req.query.flowTotalSize, | |
identifier = req.query.flowIdentifier, | |
fileName = req.query.flowFilename; | |
let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName); | |
if (validation !== 'VALID') { | |
return Promise.reject(validation); | |
} | |
let chunkFilename = this._getChunkFilename(chunkNumber, identifier); | |
return fs.statAsync(chunkFilename); | |
} | |
saveChunk(req) { | |
let fields = req.body, | |
files = req.files; | |
let chunkNumber = Number(fields.flowChunkNumber), | |
chunkSize = Number(fields.flowChunkSize), | |
totalSize = Number(fields.flowTotalSize), | |
identifier = fields.flowIdentifier, | |
fileName = fields.flowFilename; | |
if (!files[this.fileParameterName] || !files[this.fileParameterName].size) { | |
return Promise.reject('INVALID_FLOW_REQUEST'); | |
} | |
let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, files[this.fileParameterName].size); | |
if (validation !== 'VALID') { | |
return Promise.reject(validation); | |
} | |
let chunkFilename = this._getChunkFilename(chunkNumber, identifier); | |
return fs.renameAsync(files[this.fileParameterName].path, chunkFilename) | |
.then(() => { | |
let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize); | |
if (chunkNumber !== numberOfChunks) { | |
return 'PARTLY_DONE'; | |
} | |
let chunkFileNames = []; | |
for (let i = 1; i <= numberOfChunks; i++) { | |
chunkFileNames.push(this._getChunkFilename(i, identifier)); | |
} | |
return Promise.map( | |
chunkFileNames, | |
chunkFileName => fs.statAsync(chunkFileName), | |
{concurency: 2} | |
).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName)) | |
.then(filename => filename, () => 'ERROR_VERIFY_CHUNK'); | |
}); | |
} | |
download(fileName) { | |
let downloadPath = this._getDownloadPath(fileName); | |
return fs.statAsync(downloadPath).then(() => { | |
return fs.createReadStream(downloadPath); | |
}); | |
} | |
_isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, fileSize) { | |
identifier = this._cleanIdentifier(identifier); | |
if (!chunkNumber || !chunkSize || !totalSize || !identifier || !fileName) { | |
return 'INVALID_FLOW_REQUEST'; | |
} | |
let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize); | |
if (chunkNumber > numberOfChunks) { | |
return 'INVALID_CHUNK_NUMBER'; | |
} | |
if (this.maxFileSize && totalSize > this.maxFileSize) { | |
return 'INVALID_FILE_SIZE'; | |
} | |
if (typeof fileSize !== 'undefined') { | |
if (chunkNumber < numberOfChunks && fileSize !== chunkSize) { | |
console.log('>>>>.', typeof fileSize, typeof chunkSize); | |
return 'INVALID_FILESIZE_CHUNKSIZE_MISMATCH'; | |
} | |
if (numberOfChunks > 1 && chunkNumber === numberOfChunks && fileSize !== ((totalSize % chunkSize) + parseInt(chunkSize))) { | |
return 'INVALID_LAST_CHUNK'; | |
} | |
if (numberOfChunks === 1 && fileSize !== totalSize) { | |
return 'INVALID_SINGLE_CHUNK'; | |
} | |
} | |
return 'VALID'; | |
} | |
_getNumberOfChunks(totalSize, chunkSize) { | |
return Math.max(Math.floor(totalSize/chunkSize), 1); | |
} | |
_cleanIdentifier(identifier) { | |
return identifier.replace(/[^0-9A-Za-z_-]/g, ''); | |
} | |
_getChunkFilename(chunkNumber, identifier) { | |
identifier = this._cleanIdentifier(identifier); | |
let hash = crypto.createHash('sha1').update(identifier).digest('hex'); | |
return path.resolve(this.tempDir, `./${identifier}-${hash}.${chunkNumber}`); | |
} | |
_getDownloadPath(fileName) { | |
return path.resolve(this.uploadDir, `./${fileName}`); | |
} | |
_writeToUploadDir(numberOfChunks, identifier, fileName) { | |
let hash = crypto.createHash('sha1').update(identifier).digest('hex'); | |
let writeDir = path.resolve(this.uploadDir, `./${identifier}-${hash}${path.extname(fileName)}`); | |
let writableStream = fs.createWriteStream(writeDir); | |
let chunkFileNames = []; | |
for (let i = 1; i <= numberOfChunks; i++) { | |
chunkFileNames.push(this._getChunkFilename(i, identifier)); | |
} | |
return Promise.each( | |
chunkFileNames, | |
chunkFileName => { | |
return new Promise(resolve => { | |
let sourceStream = fs.createReadStream(chunkFileName); | |
sourceStream.pipe(writableStream, { | |
end: false | |
}); | |
sourceStream.on('end', function() { | |
fs.unlink(chunkFileName); | |
resolve(); | |
}); | |
}); | |
} | |
).then(() => { | |
writableStream.end(); | |
return path.basename(writeDir); | |
}); | |
} | |
_cleanUnfinishedChunks() { | |
let now = new Date().getTime(); | |
let oneDay = 24 * 60 * 60 * 1000; | |
fs.readdirAsync(this.tempDir) | |
.map(fileName => { | |
let filePath = path.resolve(this.tempDir, `./${fileName}`); | |
return fs.statAsync(filePath).then(stat => { | |
return { | |
filePath: filePath, | |
stat: stat | |
}; | |
}); | |
}, {concurency: 2}) | |
.filter(fileStat => { | |
let modifiedTime = fileStat.stat.ctime.getTime(); | |
return (now - modifiedTime) >= oneDay; | |
}) | |
.each(fileStat => fs.unlinkAsync(fileStat.filePath)); | |
} | |
}; |
// Handle status checks on chunks through Flow.js.
// Responds 200 when the chunk is already stored and 204 when it is not (or
// when the request is invalid), so the client knows whether to upload it.
router.get('/flow', function(req, res) {
  // Starting from Promise.resolve() converts a synchronous throw inside
  // chunkExists (e.g. a missing query parameter) into a rejection, so it is
  // answered with 204 like every other failure instead of escaping the handler.
  return Promise.resolve()
    .then(() => uploader.chunkExists(req))
    .then(
      () => res.status(200).send(),
      () => res.status(204).send()
    );
});
// Handle uploads through Flow.js.
// Responds 200 with 'PARTLY_DONE' or the assembled file name, 500 when the
// final verification/assembly failed, and 400 for rejected requests.
router.post('/flow', multipartMiddleware, function(req, res) {
  // Starting from Promise.resolve() converts a synchronous throw inside
  // saveChunk (e.g. req.files being undefined) into a rejection handled below.
  return Promise.resolve()
    .then(() => uploader.saveChunk(req))
    .then(
      status => {
        // saveChunk *resolves* with 'ERROR_VERIFY_CHUNK' when the chunks could
        // not be verified or merged; answering 200 would make the client treat
        // the failed upload as a success.
        const httpStatus = status === 'ERROR_VERIFY_CHUNK' ? 500 : 200;
        return res.status(httpStatus).send(status);
      },
      err => res.status(400).send(err)
    );
});
@ptgamr, can you please provide a full example?
Thanks
Are you sure the _cleanUnfinishedChunks method works? You're calling 'map' on a 'Promise', it seems. I'm assuming you intended to call fs.readdirSync instead of fs.readdirAsync?
Oops, I see this is perfectly valid bluebird functionality ;)
Also: concurency (line 78) is spelled wrong; it should be concurrency.
(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileSuccess FlowFile_lastProgressCallback: 1467522223840_prevProgress: 1_prevUploadedSize: 11230731averageSpeed: 0bytes: nullchunks: Array[10]currentSpeed: 0error: falsefile: FileflowObj: Flowname: "Jess....: Object ERROR_VERIFY_CHUNK FlowChunk
(index):166 complete
I'm getting a specific ERROR_VERIFY_CHUNK
on the FlowFile_lastProgressCallback
call. In the upload dir I see that the file's pieces are still present and not combined in any way... Is there anything I'm doing wrong? I used this code on the server, with no changes.
@LaurensRietveld, is this working for you?
//...
let chunkFileNames = [];
for (let i = 1; i <= numberOfChunks; i++) {
chunkFileNames.push(this._getChunkFilename(i, identifier));
}
return Promise.map(
chunkFileNames,
chunkFileName => fs.statAsync(chunkFileName),
{concurrency: 2}
).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName))
.then(filename => filename, () => 'ERROR_VERIFY_CHUNK');
//...
mine is throwing that error
Aaaaaaaaaaaaaah... totally OK, I was missing the uploads
dir. Fixed, and it's all working like magic.
Hey pal,
I'm trying to use your code to upload a file, but every time I try I get a 400 error. I've added some console.log calls and it appears the "renameAsync" function is never called. Any thoughts?