Last active
August 29, 2015 14:07
-
-
Save nulltask/8d4b1f9b8a3867d008fa to your computer and use it in GitHub Desktop.
simple queue library
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Module dependencies. | |
| */ | |
| var assert = require('assert'); | |
| var Emitter = require('events').EventEmitter; | |
| var Batch = require('batch'); | |
| var Backoff = require('backo'); | |
| /** | |
| * Expose `Queue`. | |
| */ | |
| module.exports = Queue; | |
| /** | |
| * @param {Object} opts | |
| */ | |
| function Queue(opts) { | |
| opts = opts || {}; | |
| this.fns = []; | |
| this.state('paused'); | |
| this.dequeue = opts.dequeue; | |
| this.backoff = new Backoff({ min: 10, max: 5000 }); | |
| } | |
| /** | |
| * Inherit from `EventEmitter`. | |
| */ | |
| Queue.prototype.__proto__ = Emitter.prototype; | |
| /** | |
| * @param {Function} fn | |
| * @return {Queue} | |
| */ | |
| Queue.prototype.use = function(fn) { | |
| this.fns.push(fn); | |
| return this; | |
| }; | |
| /** | |
| * @param {String} state | |
| * @param {Queue|String} | |
| */ | |
| Queue.prototype.state = function(state) { | |
| if (!arguments.length) return this.currentState; | |
| var prev = this.currentState; | |
| if (prev == state) return this; | |
| return this.emit('state', this.currentState = state, prev); | |
| }; | |
| /** | |
| * @return {Queue} | |
| */ | |
| Queue.prototype.next = | |
| Queue.prototype.resume = function() { | |
| var self = this; | |
| assert(this.dequeue, 'dequeue function required.'); | |
| this.pause(); | |
| this.state('dequeueing'); | |
| this.timer = setTimeout(function() { | |
| var callee = arguments.callee; | |
| self.dequeue(function(err, result) { | |
| if (err) console.error(err); | |
| if (result) return self.process(result); | |
| setTimeout(callee, self.backoff.duration()); | |
| }) | |
| }, this.backoff.duration()); | |
| return this; | |
| }; | |
| /** | |
| * @return {Queue} | |
| */ | |
| Queue.prototype.pause = function() { | |
| clearTimeout(this.timer); | |
| this.backoff.reset(); | |
| this.state('paused'); | |
| return this; | |
| }; | |
| /** | |
| * @param {Object} context | |
| * @return {Queue} | |
| */ | |
| Queue.prototype.process = function(context) { | |
| var self = this; | |
| this.pause(); | |
| this.state('processing'); | |
| this.emit('process', context); | |
| this.createBatch(context).end(function() { | |
| self.emit('done', context); | |
| self.resume(); | |
| }); | |
| return this; | |
| }; | |
| /** | |
| * @param {Object} context | |
| * @return {Batch} | |
| */ | |
| Queue.prototype.createBatch = function(context) { | |
| var batch = new Batch(); | |
| batch.concurrency(1); | |
| this.fns.forEach(function(fn) { | |
| batch.push(fn.bind(context)); | |
| }, this); | |
| return batch; | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "dependencies": { | |
| "backo": "^1.0.1", | |
| "batch": "^0.5.1" | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| $ node test.js | |
| state: paused -> dequeueing | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| state: dequeueing -> paused | |
| state: paused -> processing | |
| process { id: 8019976 } | |
| step 1: id = 8019976 | |
| step 2: id = 8019976 | |
| step 3: id = 8019976 | |
| step 4: id = 8019976 | |
| done { id: 8019976, | |
| foo: 24.12279276177287, | |
| bar: 931.7935067228973, | |
| baz: 328.60247092321515 } | |
| state: processing -> paused | |
| state: paused -> dequeueing | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| state: dequeueing -> paused | |
| state: paused -> processing | |
| process { id: 10388833 } | |
| step 1: id = 10388833 | |
| step 2: id = 10388833 | |
| step 3: id = 10388833 | |
| step 4: id = 10388833 | |
| done { id: 10388833, | |
| foo: 896.6641423758119, | |
| bar: 782.1176988072693, | |
| baz: 838.39006209746 } | |
| state: processing -> paused | |
| state: paused -> dequeueing | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| state: dequeueing -> paused | |
| state: paused -> processing | |
| process { id: 14793871 } | |
| step 1: id = 14793871 | |
| step 2: id = 14793871 | |
| step 3: id = 14793871 | |
| step 4: id = 14793871 | |
| done { id: 14793871, | |
| foo: 366.2224696017802, | |
| bar: 421.3729575276375, | |
| baz: 1423.6292650457472 } | |
| state: processing -> paused | |
| state: paused -> dequeueing | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... | |
| queue is empty. retrying... | |
| dequeueing... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| var Queue = require('./'); | |
| function dequeue(callback) { | |
| // dequeue from redis. | |
| console.log('dequeueing...'); | |
| setTimeout(function() { | |
| if (Math.random() < 0.3) { | |
| callback(null, { id: (Math.random() * 0xffffff) | 0 }); | |
| } else { | |
| console.log('queue is empty. retrying...'); | |
| callback(); | |
| } | |
| }, Math.random() * 500); | |
| } | |
| var queue = new Queue({ dequeue: dequeue }); | |
| queue.use(function(done) { | |
| // prepare assets | |
| console.log('step 1: id = %s', this.id); | |
| this.foo = Math.random() * 1000; | |
| setTimeout(done, this.foo); | |
| }); | |
| queue.use(function(done) { | |
| // generate frames | |
| console.log('step 2: id = %s', this.id); | |
| this.bar = Math.random() * 2000; | |
| setTimeout(done, this.bar); | |
| }); | |
| queue.use(function(done) { | |
| // generate movie | |
| console.log('step 3: id = %s', this.id); | |
| this.baz = Math.random() * 3000; | |
| setTimeout(done, this.baz); | |
| }); | |
| queue.use(function(done) { | |
| // post results | |
| console.log('step 4: id = %s', this.id); | |
| setTimeout(done, this.baz); | |
| }); | |
| queue.on('process', function(context) { | |
| console.log('process', context); | |
| }); | |
| queue.on('done', function(context) { | |
| console.log('done', context); | |
| }); | |
| queue.on('state', function(curr, prev) { | |
| console.log('state: %s -> %s', prev, curr); | |
| }); | |
| queue.next(); // start queue |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment