Created
June 26, 2014 18:24
-
-
Save maruf89/4865fd0e78ce34e28e13 to your computer and use it in GitHub Desktop.
Assembles upload chunks into a single file. Originally built for Flow.js using Nodejs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage: Create a new combiner instance for each new object that needs assembling | |
# And call #add to add each new chunk | |
# | |
# combiner = combiners[identifier] or new Combiner( | |
# numberOfChunks: numberOfChunks | |
# filename: filename | |
# identifier: identifier | |
# directory: @temporaryFolder | |
# callback: -> | |
# # destroy reference as it's no longer needed | |
# combiners[identifier] = null | |
# callback.apply(null, arguments) | |
# ) | |
# combiners[identifier] = combiner | |
# | |
# if combiner.add(files['file'].path, chunkNumber) | |
# res.send(200) | |
###* | |
* Assembles chunks into a final file | |
* @type {[type]} | |
### | |
exports.Combiner = class Combiner extends EventEmitter | |
chunks = null | |
self = null | |
requiredParams = [ | |
'numberOfChunks' | |
'identifier' | |
'directory' | |
] | |
downloadExtension = '.cfdownload' | |
###* | |
* Accepts options | |
* | |
* @param {Object} data has 3 required properties listed below | |
* @options data {Integer} numberOfChunks total number of chunks | |
* @options data {String} identifier file identifier | |
# @options data {String} directory path of the temp files | |
# @options data {Function} callback optional callback when file is combined | |
### | |
constructor: (data = {}, _undefined) -> | |
# Iterate over the required params and throw error if any are missing | |
for param in requiredParams | |
throw new VError('Missing `numberOfChunks` parameter') if data[param] is _undefined | |
self = @ | |
_.each data, (val, key) -> | |
self[key] = val | |
return | |
chunks = new Array(data.numberOfChunks) | |
# Determine the file extension from the passed in filename | |
@fileExtension = path.extname(@filename) | |
@idle = true | |
@downloadName = "#{@directory}/#{@identifier}#{downloadExtension}" | |
@filename = "#{@directory}/#{@identifier}#{@fileExtension}" | |
@currentIndex = 0 | |
# If file exists from previously, remove it | |
fs.exists @filename, (exists) => | |
@idle = not exists | |
if exists | |
fs.unlink @filename, => | |
@emit('resume') | |
# Bind the internal callbacks | |
self.on('idle', @onIdle) | |
self.on('written', @onWritten) | |
self.on('resume', @onResume) | |
self.on('error', @onError) | |
# setup callback if one is passed in | |
if @callback | |
self.on('done', @callback) | |
###* | |
* File has been uploaded and ready to append | |
* | |
* @param {String} file path of the temp file | |
* @param {Integer} index chunk number | |
* @return {Boolean} whether the chunk is NOT the last chunk | |
### | |
add: (filePath, index) -> | |
# index is not 0 based, let's make it so | |
arrIndex = index - 1 | |
# Only write if we're idle, and this chunk is ready | |
if @idle and arrIndex is @currentIndex | |
@write(filePath) | |
else | |
chunks[arrIndex] = filePath | |
return index isnt @numberOfChunks | |
chunkReady: (index = @currentIndex) -> | |
return !!chunks[index] | |
###* | |
* Proceeds to write/append the next file chunk to the final file | |
* | |
* @param {String} file | |
* @return {[type]} [description] | |
### | |
write: (filePath, index = @currentIndex) -> | |
# exit early if no file sent | |
if !filePath | |
@emit('resume') | |
return false | |
writeStream = @getStream() | |
@idle = false | |
self = this | |
isEnd = index is @numberOfChunks | |
readStream = fs.createReadStream filePath, | |
flags: "r" | |
encoding: null | |
fd: null | |
mode: '0666' | |
# When done remove the file and emit it's done writing | |
readStream.on 'end', -> | |
self.emit('written', filePath) | |
readStream.on 'error', (err) -> | |
self.emit('error', err) | |
# append the guy | |
readStream.pipe(writeStream, {end: isEnd}) | |
###* | |
* Returns a new write stream if one doesn't already exist | |
* | |
* @param {String} file the final file path | |
* @return {Stream} write stream for the final file | |
### | |
getStream: -> | |
return @writeStream if @writeStream | |
return @writeStream = fs.createWriteStream @downloadName, | |
flags: 'w' | |
encoding: null | |
mode: '0666' | |
###* | |
* Called after onWritten, and waiting for chunks to finish | |
### | |
onIdle: -> | |
@idle = true | |
###* | |
* Called after file has been appended | |
* Remove the temp file and continue | |
* | |
* @callback | |
* @param {String} filePath the path of the file that was just read from | |
### | |
onWritten: (filePath) -> | |
fs.unlink filePath, => | |
chunks[@currentIndex++] = null | |
if @currentIndex is @numberOfChunks | |
return @onDone() | |
@emit('resume') | |
###* | |
* Called when done with one action, and ready to move on | |
* @return {[type]} [description] | |
### | |
onResume: -> | |
if @chunkReady() | |
@write(chunks[@currentIndex]) | |
else | |
@emit('idle') | |
###* | |
* If any errors are thrown, clean up and throw error | |
* | |
* @param {Error} err | |
### | |
onError: (err) -> | |
message = "error uploading #{@filename}" | |
error = new VError(err, message) | |
console.log(error) | |
emitErr = -> | |
self.emit 'done', | |
filename: null | |
response: 500 | |
success: false | |
message: message | |
error: error | |
# Remove temp download file if exists | |
fs.exists @downloadName, (exists) -> | |
if exists | |
fs.unlink(self.downloadName, emitErr) | |
else emitErr() | |
###* | |
* Called after the last chunk has been appended | |
### | |
onDone: -> | |
fs.rename @downloadName, @filename, => | |
return @emit('done', | |
filename: @filename | |
response: 200 | |
success: true | |
message: 'successfully uploaded!' | |
error: null | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment