Skip to content

Instantly share code, notes, and snippets.

@ptgamr
Last active September 17, 2019 13:38
Show Gist options
  • Save ptgamr/4d1d07b770321a138c91 to your computer and use it in GitHub Desktop.
Save ptgamr/4d1d07b770321a138c91 to your computer and use it in GitHub Desktop.
Flowjs, ResumableJs NodeJS backend
'use strict';
const fs = Promise.promisifyAll(require('fs'));
const path = require('path');
const crypto = require('crypto');
const CronJob = require('cron').CronJob;
module.exports = class FlowUploader {
constructor(tempDir, uploadDir, maxFileSize, fileParameterName) {
this.tempDir = tempDir || './tmp';
this.uploadDir = uploadDir || './uploads';
this.maxFileSize = maxFileSize;
this.fileParameterName = fileParameterName || 'file';
try {
fs.mkdirSync(this.tempDir);
} catch (e) {}
//run clean up job five minutes after midnight, every day
new CronJob('5 0 * * *', () => { this._cleanUnfinishedChunks(); }, null, true, 'Europe/Zurich');
}
chunkExists(req) {
let chunkNumber = req.query.flowChunkNumber,
chunkSize = req.query.flowChunkSize,
totalSize = req.query.flowTotalSize,
identifier = req.query.flowIdentifier,
fileName = req.query.flowFilename;
let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName);
if (validation !== 'VALID') {
return Promise.reject(validation);
}
let chunkFilename = this._getChunkFilename(chunkNumber, identifier);
return fs.statAsync(chunkFilename);
}
saveChunk(req) {
let fields = req.body,
files = req.files;
let chunkNumber = Number(fields.flowChunkNumber),
chunkSize = Number(fields.flowChunkSize),
totalSize = Number(fields.flowTotalSize),
identifier = fields.flowIdentifier,
fileName = fields.flowFilename;
if (!files[this.fileParameterName] || !files[this.fileParameterName].size) {
return Promise.reject('INVALID_FLOW_REQUEST');
}
let validation = this._isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, files[this.fileParameterName].size);
if (validation !== 'VALID') {
return Promise.reject(validation);
}
let chunkFilename = this._getChunkFilename(chunkNumber, identifier);
return fs.renameAsync(files[this.fileParameterName].path, chunkFilename)
.then(() => {
let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize);
if (chunkNumber !== numberOfChunks) {
return 'PARTLY_DONE';
}
let chunkFileNames = [];
for (let i = 1; i <= numberOfChunks; i++) {
chunkFileNames.push(this._getChunkFilename(i, identifier));
}
return Promise.map(
chunkFileNames,
chunkFileName => fs.statAsync(chunkFileName),
{concurency: 2}
).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName))
.then(filename => filename, () => 'ERROR_VERIFY_CHUNK');
});
}
download(fileName) {
let downloadPath = this._getDownloadPath(fileName);
return fs.statAsync(downloadPath).then(() => {
return fs.createReadStream(downloadPath);
});
}
_isValidRequest(chunkNumber, chunkSize, totalSize, identifier, fileName, fileSize) {
identifier = this._cleanIdentifier(identifier);
if (!chunkNumber || !chunkSize || !totalSize || !identifier || !fileName) {
return 'INVALID_FLOW_REQUEST';
}
let numberOfChunks = this._getNumberOfChunks(totalSize, chunkSize);
if (chunkNumber > numberOfChunks) {
return 'INVALID_CHUNK_NUMBER';
}
if (this.maxFileSize && totalSize > this.maxFileSize) {
return 'INVALID_FILE_SIZE';
}
if (typeof fileSize !== 'undefined') {
if (chunkNumber < numberOfChunks && fileSize !== chunkSize) {
console.log('>>>>.', typeof fileSize, typeof chunkSize);
return 'INVALID_FILESIZE_CHUNKSIZE_MISMATCH';
}
if (numberOfChunks > 1 && chunkNumber === numberOfChunks && fileSize !== ((totalSize % chunkSize) + parseInt(chunkSize))) {
return 'INVALID_LAST_CHUNK';
}
if (numberOfChunks === 1 && fileSize !== totalSize) {
return 'INVALID_SINGLE_CHUNK';
}
}
return 'VALID';
}
_getNumberOfChunks(totalSize, chunkSize) {
return Math.max(Math.floor(totalSize/chunkSize), 1);
}
_cleanIdentifier(identifier) {
return identifier.replace(/[^0-9A-Za-z_-]/g, '');
}
_getChunkFilename(chunkNumber, identifier) {
identifier = this._cleanIdentifier(identifier);
let hash = crypto.createHash('sha1').update(identifier).digest('hex');
return path.resolve(this.tempDir, `./${identifier}-${hash}.${chunkNumber}`);
}
_getDownloadPath(fileName) {
return path.resolve(this.uploadDir, `./${fileName}`);
}
_writeToUploadDir(numberOfChunks, identifier, fileName) {
let hash = crypto.createHash('sha1').update(identifier).digest('hex');
let writeDir = path.resolve(this.uploadDir, `./${identifier}-${hash}${path.extname(fileName)}`);
let writableStream = fs.createWriteStream(writeDir);
let chunkFileNames = [];
for (let i = 1; i <= numberOfChunks; i++) {
chunkFileNames.push(this._getChunkFilename(i, identifier));
}
return Promise.each(
chunkFileNames,
chunkFileName => {
return new Promise(resolve => {
let sourceStream = fs.createReadStream(chunkFileName);
sourceStream.pipe(writableStream, {
end: false
});
sourceStream.on('end', function() {
fs.unlink(chunkFileName);
resolve();
});
});
}
).then(() => {
writableStream.end();
return path.basename(writeDir);
});
}
_cleanUnfinishedChunks() {
let now = new Date().getTime();
let oneDay = 24 * 60 * 60 * 1000;
fs.readdirAsync(this.tempDir)
.map(fileName => {
let filePath = path.resolve(this.tempDir, `./${fileName}`);
return fs.statAsync(filePath).then(stat => {
return {
filePath: filePath,
stat: stat
};
});
}, {concurency: 2})
.filter(fileStat => {
let modifiedTime = fileStat.stat.ctime.getTime();
return (now - modifiedTime) >= oneDay;
})
.each(fileStat => fs.unlinkAsync(fileStat.filePath));
}
};
// Flow.js chunk status check: answer 200 when chunkExists() resolves,
// and 204 when it rejects.
router.get('/flow', (req, res) => {
    const respondWith = statusCode => () => res.status(statusCode).send();

    return uploader
        .chunkExists(req)
        .then(respondWith(200), respondWith(204));
});
// Flow.js chunk upload: the value saveChunk() resolves with is sent back with 200,
// a rejection reason is sent back with 400.
router.post('/flow', multipartMiddleware, (req, res) => {
    const onSaved = status => res.status(200).send(status);
    const onFailed = err => res.status(400).send(err);

    return uploader
        .saveChunk(req)
        .then(onSaved, onFailed);
});
@PanzerKadaver
Copy link

Hey pal,

I'm trying to use your code to upload a file, but every time I try I get a 400 error. I've added some console.log calls, and it appears the "renameAsync" function is never called. Any thoughts?

@tpiros
Copy link

tpiros commented Dec 12, 2015

@ptgamr, can you please provide a full example?

Thanks

@LaurensRietveld
Copy link

LaurensRietveld commented Jun 28, 2016

Are you sure the _cleanUnfinishedChunks method works? You're calling 'map' on a 'Promise' it seems. I'm assuming you intended to call fs.readdirSync instead of fs.readdirAsync? Oops, I see this is perfectly valid bluebird functionality ;)

and: concurency (line 78) is spelled wrong: should be concurrency

@BransonGitomeh
Copy link

BransonGitomeh commented Jul 3, 2016


(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileProgress FlowFile FlowChunk
(index):166 progress
(index):166 fileSuccess FlowFile_lastProgressCallback: 1467522223840_prevProgress: 1_prevUploadedSize: 11230731averageSpeed: 0bytes: nullchunks: Array[10]currentSpeed: 0error: falsefile: FileflowObj: Flowname: "Jess....: Object ERROR_VERIFY_CHUNK FlowChunk
(index):166 complete

I'm getting a specific ERROR_VERIFY_CHUNK on the FlowFile_lastProgressCallback call. In the upload dir I see that the file pieces are still present and not combined in any way... Is there anything I'm doing wrong? I used this code on the server with no changes.

@BransonGitomeh
Copy link

@LaurensRietveld is this working for you?

//...
                    let chunkFileNames = [];
                    for (let i = 1; i <= numberOfChunks; i++) {
                        chunkFileNames.push(this._getChunkFilename(i, identifier));
                    }

                    return Promise.map(
                        chunkFileNames,
                        chunkFileName => fs.statAsync(chunkFileName),
                        {concurrency: 2}
                    ).then(() => this._writeToUploadDir(numberOfChunks, identifier, fileName))
                    .then(filename => filename, () => 'ERROR_VERIFY_CHUNK');
//...

mine is throwing that error

@BransonGitomeh
Copy link

Aaaaaaaaaaaaaah... totally OK, the uploads dir was missing. Fixed, and it's all working like magic.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment