Created
August 10, 2014 17:54
-
-
Save myndzi/27ae4173bd6dd27aa4fe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

// NOTE: `Promise` is bluebird here and shadows the built-in Promise for the
// whole module; the code below uses bluebird-only helpers on it.
var Promise = require('bluebird'),
    Deque = require('double-ended-queue');

// The module's sole export is the Queue constructor (a hoisted function
// declaration, so it can be exported before its definition appears).
module.exports = Queue;
/**
 * A de-duplicating work queue drained by one or more concurrent workers.
 *
 * @param {Object} [opts]
 * @param {number|string} [opts.delay] - Pause applied after each handled item;
 *     parsed with parseInt (base 10). Falls back to `Queue.defaults.delay`
 *     when it does not parse to a number.
 * @param {number|string} [opts.threads] - Number of concurrent workers.
 * @param {number|string} [opts.numThreads] - Alias consulted when
 *     `opts.threads` is falsy. Values that do not parse, or are below 1,
 *     fall back to `Queue.defaults.numThreads`.
 */
function Queue(opts) {
    // Tolerate being called without `new`; in strict mode `this` would
    // otherwise be undefined and the assignments below would throw.
    if (!(this instanceof Queue)) {
        return new Queue(opts);
    }

    var options = opts || { },
        delay = parseInt(options.delay, 10),
        numThreads = parseInt(options.threads || options.numThreads, 10);

    this.delay = isNaN(delay) ? Queue.defaults.delay : delay;
    this.numThreads = isNaN(numThreads) || numThreads < 1 ? Queue.defaults.numThreads : numThreads;

    this._running = false;

    // Prototype-less dictionary: a plain `{}` would make keys such as
    // "toString" or "constructor" look as if they had already been seen.
    this._seen = Object.create(null);

    // Items waiting for a worker.
    this._queue = new Deque(100);
    // Idle workers waiting for an item (at most one per worker).
    this._deferred = new Deque(this.numThreads);
}
/**
 * Fallback settings used by the Queue constructor when an option is missing
 * or does not parse to a usable number.
 *
 * - delay: pause after each handled item (handed to bluebird's `.delay()`,
 *   which takes milliseconds).
 * - numThreads: number of concurrent workers.
 */
Queue.defaults = {
    delay: 1000,
    numThreads: 1
};
/**
 * Adds an item to the queue unless an item with the same key was pushed
 * before. Items are used as property keys of the seen-set, so two items that
 * convert to the same property key count as duplicates.
 *
 * If a worker is idle (parked in `_deferred`), the item is handed to it
 * directly; otherwise it is appended to the pending queue.
 *
 * @param {*} item - The work item.
 */
Queue.prototype.push = function (item) {
    var seen = this._seen;

    // Own-property check: the `in` operator would also match inherited
    // members ("toString", "constructor", ...) and silently drop such items.
    if (Object.prototype.hasOwnProperty.call(seen, item)) { return; }

    // defineProperty always creates an own data property, even for the key
    // "__proto__", which plain assignment on an ordinary object would not.
    Object.defineProperty(seen, item, {
        value: 1,
        writable: true,
        enumerable: true,
        configurable: true
    });

    if (this._deferred.length) {
        this._deferred.shift().resolve(item);
    } else {
        this._queue.push(item);
    }
};
/**
 * Starts `numThreads` concurrent workers that feed queued items to `fn`.
 *
 * Each worker repeatedly takes one item, runs the handler on it, waits
 * `this.delay`, and then looks for the next item. When the queue is empty a
 * worker parks itself in `_deferred` until `push()` hands it an item or
 * `done()` wakes it up. After `done()`, workers finish whatever is still
 * queued and then stop.
 *
 * `undefined` is treated as "no item": it is what the deque returns when
 * empty and what `done()` resolves parked workers with, so it is never
 * passed to the handler.
 *
 * @param {Function} fn - Handler called with one item; may return a promise.
 * @returns {Promise} Settles once every worker has stopped. Rejects right
 *     away if the queue is already running or `fn` is not a function; a
 *     throwing/rejecting handler propagates through the returned promise.
 */
Queue.prototype.process = function (fn) {
    if (this._running) {
        return Promise.reject(new Error('Queue is already running'));
    }
    if (typeof fn !== 'function') {
        return Promise.reject(new Error('Queue.process(): Must supply a handler'));
    }

    var self = this,
        queue = self._queue,
        delay = self.delay,
        // Promise.method turns synchronous returns/throws of fn into promises.
        handle = Promise.method(fn);

    self._running = true;

    // Handle one item, pause, then look for more work.
    function run(item) {
        return handle(item)
            .delay(delay)
            .then(next);
    }

    // One step of a worker loop. All state is local to the call so that
    // concurrent workers never share an item or a parked resolver.
    function next() {
        var item = queue.shift();

        // Compare against undefined rather than testing truthiness, so that
        // falsy items (0, '', false, null) are processed instead of lost.
        if (item !== undefined) {
            return run(item);
        }
        if (!self._running) {
            return;
        }

        // Park until push() or done() calls resolve(). Only `.resolve` is
        // used by those methods, so a bare resolver object is sufficient.
        return new Promise(function (resolve) {
            self._deferred.push({ resolve: resolve });
        }).then(function (woken) {
            // Woken without an item (done() was called): do not invoke the
            // handler; just re-check the queue and the running flag.
            return woken === undefined ? next() : run(woken);
        });
    }

    return Promise.map(new Array(self.numThreads), next);
};
/**
 * Stops the queue: clears the running flag and resolves every parked
 * deferred (calling `resolve()` with no value) so that idle workers wake up.
 * Items still sitting in the pending queue are left untouched.
 */
Queue.prototype.done = function () {
    var waiting = this._deferred;

    // Clear the flag first so that a woken worker sees the queue as stopped.
    this._running = false;

    while (waiting.length) {
        waiting.shift().resolve();
    }
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment