Skip to content

Instantly share code, notes, and snippets.

@datapimp
Created February 13, 2011 06:24
Show Gist options
  • Select an option

  • Save datapimp/824493 to your computer and use it in GitHub Desktop.

Select an option

Save datapimp/824493 to your computer and use it in GitHub Desktop.
(function() {
var Observable, Queue, Worker, util, _;
var __hasProp = Object.prototype.hasOwnProperty, __extends = function(child, parent) {
for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; }
function ctor() { this.constructor = child; }
ctor.prototype = parent.prototype;
child.prototype = new ctor;
child.__super__ = parent.prototype;
return child;
};
_ = global._ || require("underscore")._;
util = require("util");
Observable = new require("events").EventEmitter;
Worker = require("./worker").Worker;
module.exports = Queue = (function() {
__extends(Queue, Observable);
Queue.prototype.available = function() {
return _.select(this.workers, function(worker) {
return worker.state !== "busy";
});
};
Queue.prototype.shift = function() {
return this.items.shift();
};
Queue.prototype.push = function(message) {
return this.items.push(message);
};
Queue.prototype.self = function() {
return this;
};
Queue.prototype.new_worker = function() {
var worker;
worker = new this.worker_class({
queue: this.self
});
return this.workers.push(worker);
};
function Queue(name, options) {
var num, _base, _ref;
this.name = name;
this.options = options;
this.options || (this.options = {});
this.workers = [];
this.worker_class || (this.worker_class = this.options.worker_class || Worker);
this.items = [];
this.options || (this.options = {});
(_base = this.options).number_of_workers || (_base.number_of_workers = 2);
for (num = 1, _ref = this.options.number_of_workers; (1 <= _ref ? num <= _ref : num >= _ref); (1 <= _ref ? num += 1 : num -= 1)) {
this.new_worker();
}
this.redis = global.redis || require("redis").createClient();
this.redis.subscribe(this.name, function(__, data) {
this.push(data);
return this.emit("push");
});
this.on("push", function() {
return this.available().first().perform(this.shift());
});
}
return Queue;
})();
module.exports.Queue = Queue;
}).call(this);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment