Created
February 13, 2011 07:50
-
-
Save datapimp/824528 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # author: jonathan soeder | |
| # copyright: jonathan soeder 2011 | |
| # | |
| # the Queue class subscribes to a redis channel waiting for jobs to process | |
| # the Queue creates X number of workers of a specified class | |
| # which shift messages off of the queue and process it | |
| # publishing the results to another channel | |
| _ = global._ or require("underscore")._ | |
| util = require "util" | |
| eyes = require "eyes" | |
| Observable = new require("events").EventEmitter | |
| Worker = require("./worker").Worker | |
| module.exports = class Queue extends Observable | |
| # lists available workers | |
| available: -> | |
| _.select @workers, (worker)-> | |
| worker.state isnt "busy" | |
| # messages get processed in a first in first out | |
| shift: -> | |
| @items.shift() | |
| # add a message | |
| push: (message)-> | |
| @items.push(message) | |
| self: ()-> | |
| this | |
| # create a new worker | |
| # for whatever class | |
| # this queue needs | |
| new_worker: ()-> | |
| worker = new @worker_class | |
| queue: @self | |
| @workers.push worker | |
| spawn_workers: -> | |
| for num in [[email protected]_of_workers] | |
| @new_worker | |
| queue: @self | |
| # create a new queue which listens to a channel | |
| # for messages, which get handled by the workers | |
| # which this queue creates | |
| constructor: (@name, @options)-> | |
| @options ||= {} | |
| # workers are constantly shifting messages | |
| # off of the queue and doing work against them | |
| @workers = [] | |
| # all workers should inherit from the Worker class | |
| # but be configured to perform specific acts | |
| # and add their output to a redis structure | |
| @worker_class ||= @options.worker_class || Worker | |
| # the items that are being processed | |
| @items = [] | |
| # default options | |
| @options ||= {} | |
| # depending on the volume of activity | |
| # we may want to spawn multiple workers | |
| @options.number_of_workers ||= 2 | |
| @spawn_workers() | |
| @redis = global.redis or require("redis").createClient() | |
| # the queue subscribers to a redis channel | |
| # where it acts upon jobs created fr it | |
| @redis.subscribe @name, (__,data)-> | |
| @push data | |
| @emit "push" | |
| # when an item is pushed onto the queue | |
| # then we need to make sure workers are | |
| # out there hustling | |
| @on "push", ()-> @available().first().perform @shift() | |
| module.exports.Queue = Queue |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment