Skip to content

Instantly share code, notes, and snippets.

@datapimp
Created February 13, 2011 07:52
Show Gist options
  • Select an option

  • Save datapimp/824530 to your computer and use it in GitHub Desktop.

Select an option

Save datapimp/824530 to your computer and use it in GitHub Desktop.
(function() {
var Observable, Queue, Worker, eyes, util, _;
// Borrowed from Object.prototype so the own-property check does not depend on
// the inspected object providing its own `hasOwnProperty`.
var __hasProp = Object.prototype.hasOwnProperty;

/**
 * Makes `child` inherit from `parent`.
 *
 * Own enumerable properties of `parent` are copied onto `child`, the child's
 * prototype is replaced by a fresh object that delegates to
 * `parent.prototype`, and `child.__super__` is pointed at `parent.prototype`.
 *
 * @param {Function} child - Constructor that should become the subclass.
 * @param {Function} parent - Constructor to inherit from.
 * @returns {Function} The same `child` that was passed in.
 */
var __extends = function(child, parent) {
  var name;
  for (name in parent) {
    if (!__hasProp.call(parent, name)) {
      continue;
    }
    child[name] = parent[name];
  }

  // A surrogate constructor links the prototypes without running `parent`
  // itself, and records `child` as the `constructor` of the new prototype.
  function Surrogate() {
    this.constructor = child;
  }
  Surrogate.prototype = parent.prototype;

  child.prototype = new Surrogate();
  child.__super__ = parent.prototype;
  return child;
};
// Module dependencies. An already-defined global `_` takes precedence over
// loading underscore.
_ = global._ || require("underscore")._;
util = require("util");
eyes = require("eyes");
// Base class for Queue. `require` is called as a plain function: putting `new`
// in front of it would invoke `require` itself as a constructor.
Observable = require("events").EventEmitter;
Worker = require("./worker").Worker;
module.exports = Queue = (function() {
__extends(Queue, Observable);
Queue.prototype.available = function() {
return _.select(this.workers, function(worker) {
return worker.state !== "busy";
});
};
Queue.prototype.shift = function() {
return this.items.shift();
};
Queue.prototype.push = function(message) {
return this.items.push(message);
};
Queue.prototype.self = function() {
return this;
};
Queue.prototype.new_worker = function() {
var worker;
worker = new this.worker_class({
queue: this.self
});
return this.workers.push(worker);
};
Queue.prototype.spawn_workers = function() {
var num, _ref, _results;
_results = [];
for (num = 1, _ref = this.options.number_of_workers; (1 <= _ref ? num <= _ref : num >= _ref); (1 <= _ref ? num += 1 : num -= 1)) {
_results.push(this.new_worker({
queue: this.self
}));
}
return _results;
};
function Queue(name, options) {
var _base;
this.name = name;
this.options = options;
this.options || (this.options = {});
this.workers = [];
this.worker_class || (this.worker_class = this.options.worker_class || Worker);
this.items = [];
this.options || (this.options = {});
(_base = this.options).number_of_workers || (_base.number_of_workers = 2);
this.spawn_workers();
this.redis = global.redis || require("redis").createClient();
this.redis.subscribe(this.name, function(__, data) {
this.push(data);
return this.emit("push");
});
this.on("push", function() {
return this.available().first().perform(this.shift());
});
}
return Queue;
})();
// Also expose the constructor as a named `Queue` property of the exports object.
module.exports.Queue = Queue;
}).call(this);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment