Skip to content

Instantly share code, notes, and snippets.

@myndzi
Created August 10, 2014 17:54
Show Gist options
  • Save myndzi/27ae4173bd6dd27aa4fe to your computer and use it in GitHub Desktop.
Save myndzi/27ae4173bd6dd27aa4fe to your computer and use it in GitHub Desktop.
'use strict';
var Promise = require('bluebird'),
Deque = require('double-ended-queue');
module.exports = Queue;
/**
 * De-duplicating work queue that is drained by a fixed number of workers.
 *
 * @constructor
 * @param {Object} [opts]
 * @param {number|string} [opts.delay] - Pause applied after each handled item
 *     (handed to bluebird's `.delay()` by `process()`). Parsed with
 *     `parseInt(..., 10)`; falls back to `Queue.defaults.delay` when the
 *     result is NaN.
 * @param {number|string} [opts.threads] - Number of concurrent workers.
 *     `opts.numThreads` is used when `opts.threads` is falsy. Non-numeric
 *     values and values below 1 fall back to `Queue.defaults.numThreads`.
 */
function Queue(opts) {
    opts = opts || { };

    var delay = parseInt(opts.delay, 10),
        numThreads = parseInt(opts.threads || opts.numThreads, 10);

    this.delay = isNaN(delay) ? Queue.defaults.delay : delay;
    this.numThreads = isNaN(numThreads) || numThreads < 1 ? Queue.defaults.numThreads : numThreads;

    // True between a successful process() call and done().
    this._running = false;

    // Keys of every item ever pushed. Created without a prototype so that
    // items named like Object.prototype members ("constructor", "toString",
    // "__proto__", ...) are neither reported as already seen nor impossible
    // to record.
    this._seen = Object.create(null);

    // Items waiting for a worker.
    this._queue = new Deque(100);

    // Idle workers waiting for an item; at most one entry per worker.
    this._deferred = new Deque(this.numThreads);
}
/**
 * Fallback settings used by the constructor when an option is missing or
 * invalid. May be overridden by assigning to these properties.
 */
Queue.defaults = { };
// Pause after each handled item.
Queue.defaults.delay = 1000;
// Number of concurrent workers.
Queue.defaults.numThreads = 1;
/**
 * Add an item to the queue unless an item with the same key was pushed
 * before. If a worker is idle the item is handed to it directly, otherwise
 * it is appended to the backlog.
 *
 * Items are de-duplicated by their property-key form, so values that
 * stringify identically (e.g. `1` and `'1'`, or any two plain objects)
 * count as the same item.
 *
 * @param {*} item - The work item.
 * @returns {undefined}
 */
Queue.prototype.push = function (item) {
    // Own-property test instead of `in`: `in` also matches names inherited
    // from Object.prototype, which made items such as "constructor" or
    // "toString" look like duplicates and get dropped silently.
    if (Object.prototype.hasOwnProperty.call(this._seen, item)) { return; }
    this._seen[item] = 1;

    if (this._deferred.length) {
        // A worker is parked waiting for work: wake it with this item.
        this._deferred.shift().resolve(item);
    } else {
        this._queue.push(item);
    }
};
/**
 * Start draining the queue with `numThreads` concurrent workers.
 *
 * Each worker repeatedly takes the next item, passes it to `fn`, waits
 * `delay`, and continues. When the backlog is empty a worker parks itself in
 * `_deferred` until `push()` hands it an item or `done()` releases it.
 *
 * @param {Function} fn - Handler invoked with one item at a time. It may
 *     return a value or a promise; synchronous throws are turned into
 *     rejections by `Promise.method`.
 * @returns {Promise} Resolves once every worker has stopped (after
 *     `done()`); rejects if the queue is already running, if `fn` is not a
 *     function, or if a handler call fails.
 */
Queue.prototype.process = function (fn) {
    if (this._running) {
        return Promise.reject(new Error('Queue is already running'));
    }
    if (typeof fn !== 'function') {
        return Promise.reject(new Error('Queue.process(): Must supply a handler'));
    }

    var self = this,
        queue = self._queue,
        delay = self.delay,
        handle = Promise.method(fn);

    self._running = true;

    // Handle one item, pause, then loop.
    function work(item) {
        return handle(item)
            .delay(delay)
            .then(next);
    }

    function next() {
        // Test the length rather than the shifted value so that falsy items
        // (0, '', false, ...) are processed instead of being mistaken for an
        // empty queue and lost.
        if (queue.length > 0) {
            return work(queue.shift());
        }
        if (!self._running) { return; }

        // Park until push() or done() calls `.resolve()` on this entry.
        return new Promise(function (resolve) {
            self._deferred.push({ resolve: resolve });
        }).then(function (item) {
            // done() wakes parked workers with no value after clearing
            // `_running`; that is a stop signal, not an item to handle.
            if (item === undefined && !self._running) { return; }
            return work(item);
        });
    }

    // The array only fixes the number of workers; its (empty) slots are
    // ignored by `next`.
    return Promise.map(new Array(self.numThreads), next);
};
/**
 * Signal that no more work is coming: clear the running flag and release
 * every worker currently parked in `_deferred` by resolving its entry with
 * no value.
 *
 * @returns {undefined}
 */
Queue.prototype.done = function () {
    var waiting = this._deferred,
        parked;

    this._running = false;

    // Drain the deque front to back; shift() yields a falsy value once empty.
    for (parked = waiting.shift(); parked; parked = waiting.shift()) {
        parked.resolve();
    }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment