Skip to content

Instantly share code, notes, and snippets.

@nulltask
Last active August 29, 2015 14:07
Show Gist options
  • Select an option

  • Save nulltask/8d4b1f9b8a3867d008fa to your computer and use it in GitHub Desktop.

Select an option

Save nulltask/8d4b1f9b8a3867d008fa to your computer and use it in GitHub Desktop.
simple queue library
/**
* Module dependencies.
*/
var assert = require('assert');
var Emitter = require('events').EventEmitter;
var Batch = require('batch');
var Backoff = require('backo');
/**
* Expose `Queue`.
*/
module.exports = Queue;
/**
 * Create a queue that polls a `dequeue` function for jobs and runs the
 * registered step functions against each job it receives.
 *
 * The queue starts in the `paused` state; call `resume()` (alias `next()`)
 * to begin polling.
 *
 * @param {Object} [opts]
 * @param {Function} [opts.dequeue] invoked as `dequeue(callback)`; it reports
 *   back through `callback(err, result)`, where a truthy `result` is a job.
 */
function Queue(opts) {
  opts = opts || {};
  // Run the EventEmitter constructor first so emitter state is initialised on
  // this instance before `state()` emits the first event.
  Emitter.call(this);
  this.fns = [];
  this.state('paused');
  this.dequeue = opts.dequeue;
  this.backoff = new Backoff({ min: 10, max: 5000 });
}
/**
 * Inherit from `EventEmitter`.
 *
 * Uses `Object.setPrototypeOf` rather than the deprecated `__proto__`
 * accessor; the resulting prototype chain is the same.
 */
Object.setPrototypeOf(Queue.prototype, Emitter.prototype);
/**
 * Register a step function.
 *
 * Steps are appended to `this.fns` in registration order.
 *
 * @param {Function} fn step to append
 * @return {Queue} this queue, for chaining
 */
Queue.prototype.use = function(fn) {
  var steps = this.fns;
  steps[steps.length] = fn;
  return this;
};
/**
 * Get or set the current state.
 *
 * Called without arguments, returns the current state. Called with a state,
 * stores it and emits `state` with `(state, previousState)`; nothing is
 * emitted when the state is unchanged.
 *
 * @param {String} [state] new state
 * @return {Queue|String} the current state when used as a getter, otherwise
 *   this queue
 */
Queue.prototype.state = function(state) {
  if (!arguments.length) return this.currentState;
  var prev = this.currentState;
  // Strict comparison: values that are only loosely equal are distinct states.
  if (prev === state) return this;
  // Store before emitting so listeners already observe the new state.
  this.currentState = state;
  this.emit('state', state, prev);
  // `emit()` returns a boolean, so return the queue explicitly for chaining.
  return this;
};
/**
 * Start (or restart) polling `dequeue` for the next job.
 *
 * The first poll is scheduled after `backoff.duration()` milliseconds. A
 * truthy result is handed to `process()`; otherwise another poll is scheduled
 * after the next backoff duration. Errors reported by `dequeue` are logged
 * with `console.error`. `next` is an alias of `resume`.
 *
 * @return {Queue}
 * @throws {AssertionError} when no `dequeue` function was configured
 */
Queue.prototype.next =
Queue.prototype.resume = function() {
  var self = this;
  assert(this.dequeue, 'dequeue function required.');
  // pause() cancels any pending poll and resets the backoff.
  this.pause();
  this.state('dequeueing');

  // A named function replaces `arguments.callee`, which throws in strict mode.
  function poll() {
    self.dequeue(function(err, result) {
      if (err) console.error(err);
      if (result) return self.process(result);
      // The queue left the polling state while this dequeue was in flight:
      // do not schedule another poll.
      if (self.state() !== 'dequeueing') return;
      // Keep the handle so pause() can cancel the retry too.
      self.timer = setTimeout(poll, self.backoff.duration());
    });
  }

  this.timer = setTimeout(poll, this.backoff.duration());
  return this;
};
/**
 * Stop polling: clear the pending timer, reset the backoff and switch to the
 * `paused` state.
 *
 * @return {Queue} this queue, for chaining
 */
Queue.prototype.pause = function() {
  var queue = this;
  clearTimeout(queue.timer);
  queue.backoff.reset();
  queue.state('paused');
  return queue;
};
/**
 * Run the registered steps against one job, then go back to polling.
 *
 * Emits `process` before the batch starts and `done` once it ends, both with
 * the job context. An error passed to the batch's end callback is logged with
 * `console.error` (the same way `resume()` reports dequeue errors); the queue
 * resumes either way.
 *
 * @param {Object} context job that each step is bound to
 * @return {Queue}
 */
Queue.prototype.process = function(context) {
  var self = this;
  this.pause();
  this.state('processing');
  this.emit('process', context);
  this.createBatch(context).end(function(err) {
    // Report a failed step instead of silently dropping its error.
    if (err) console.error(err);
    self.emit('done', context);
    self.resume();
  });
  return this;
};
/**
 * Build the batch holding the registered steps for one job.
 *
 * The batch's concurrency is set to 1 and each step is bound to `context`,
 * so a step sees the job as `this`.
 *
 * @param {Object} context job to bind each step to
 * @return {Batch}
 */
Queue.prototype.createBatch = function(context) {
  var serial = new Batch();
  serial.concurrency(1);
  return this.fns.reduce(function(pending, step) {
    pending.push(step.bind(context));
    return pending;
  }, serial);
};
{
"dependencies": {
"backo": "^1.0.1",
"batch": "^0.5.1"
}
}
$ node test.js
state: paused -> dequeueing
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
state: dequeueing -> paused
state: paused -> processing
process { id: 8019976 }
step 1: id = 8019976
step 2: id = 8019976
step 3: id = 8019976
step 4: id = 8019976
done { id: 8019976,
foo: 24.12279276177287,
bar: 931.7935067228973,
baz: 328.60247092321515 }
state: processing -> paused
state: paused -> dequeueing
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
state: dequeueing -> paused
state: paused -> processing
process { id: 10388833 }
step 1: id = 10388833
step 2: id = 10388833
step 3: id = 10388833
step 4: id = 10388833
done { id: 10388833,
foo: 896.6641423758119,
bar: 782.1176988072693,
baz: 838.39006209746 }
state: processing -> paused
state: paused -> dequeueing
dequeueing...
queue is empty. retrying...
dequeueing...
state: dequeueing -> paused
state: paused -> processing
process { id: 14793871 }
step 1: id = 14793871
step 2: id = 14793871
step 3: id = 14793871
step 4: id = 14793871
done { id: 14793871,
foo: 366.2224696017802,
bar: 421.3729575276375,
baz: 1423.6292650457472 }
state: processing -> paused
state: paused -> dequeueing
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
queue is empty. retrying...
dequeueing...
var Queue = require('./');
/**
 * Simulated job source (stands in for a real dequeue, e.g. from redis).
 *
 * After a random delay below 500 ms it either yields a job with a random
 * integer `id` (when a random roll is under 0.3) or reports an empty queue by
 * calling back with no arguments.
 *
 * @param {Function} callback called as `callback(null, job)` or `callback()`
 */
function dequeue(callback) {
  console.log('dequeueing...');
  var latency = Math.random() * 500;

  function respond() {
    var hasJob = Math.random() < 0.3;
    if (!hasJob) {
      console.log('queue is empty. retrying...');
      callback();
      return;
    }
    var job = { id: (Math.random() * 0xffffff) | 0 };
    callback(null, job);
  }

  setTimeout(respond, latency);
}
// Demo wiring: one queue fed by `dequeue`, four timer-based steps that stash
// their random delays on the job, and listeners that log every event.
var queue = new Queue({ dequeue: dequeue });

queue
  .use(function prepareAssets(done) {
    console.log('step 1: id = %s', this.id);
    var delay = Math.random() * 1000;
    this.foo = delay;
    setTimeout(done, delay);
  })
  .use(function generateFrames(done) {
    console.log('step 2: id = %s', this.id);
    var delay = Math.random() * 2000;
    this.bar = delay;
    setTimeout(done, delay);
  })
  .use(function generateMovie(done) {
    console.log('step 3: id = %s', this.id);
    var delay = Math.random() * 3000;
    this.baz = delay;
    setTimeout(done, delay);
  })
  .use(function postResults(done) {
    console.log('step 4: id = %s', this.id);
    // Reuses the delay stored by step 3.
    setTimeout(done, this.baz);
  });

queue
  .on('process', function onProcess(context) {
    console.log('process', context);
  })
  .on('done', function onDone(context) {
    console.log('done', context);
  })
  .on('state', function onState(curr, prev) {
    console.log('state: %s -> %s', prev, curr);
  });

// Start the queue.
queue.next();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment