@datapimp
Created February 13, 2011 06:23
# author: jonathan soeder
# copyright: jonathan soeder 2011
#
# the Queue class subscribes to a redis channel and waits for jobs to process
# the Queue creates a configurable number of workers of a specified class,
# which shift messages off of the queue and process them,
# publishing the results to another channel
_ = global._ or require("underscore")._
util = require "util"
Observable = require("events").EventEmitter
Worker = require("./worker").Worker

module.exports = class Queue extends Observable
  # lists available workers
  available: ->
    _.select @workers, (worker) ->
      worker.state isnt "busy"

  # messages get processed first in, first out
  shift: ->
    @items.shift()

  # add a message
  push: (message) ->
    @items.push message

  self: ->
    this

  # create a new worker
  # of whatever class
  # this queue needs
  new_worker: ->
    worker = new @worker_class queue: this
    @workers.push worker

  # create a new queue which listens to a channel
  # for messages, which get handled by the workers
  # which this queue creates
  constructor: (@name, @options) ->
    super()

    # default options
    @options ||= {}

    # workers are constantly shifting messages
    # off of the queue and doing work against them
    @workers = []

    # all workers should inherit from the Worker class
    # but be configured to perform specific acts
    # and add their output to a redis structure
    @worker_class ||= @options.worker_class || Worker

    # the items that are being processed
    @items = []

    # depending on the volume of activity
    # we may want to spawn multiple workers
    @options.number_of_workers ||= 2
    @new_worker() for num in [1..@options.number_of_workers]

    @redis = global.redis or require("redis").createClient()

    # when an item is pushed onto the queue
    # then we need to make sure workers are
    # out there hustling; if every worker is busy
    # the item stays queued until the next push
    @on "push", =>
      worker = _.first @available()
      worker.perform @shift() if worker?

    # the queue subscribes to a redis channel
    # where it acts upon jobs created for it;
    # published messages arrive through the client's
    # "message" event, not the subscribe callback
    @redis.on "message", (channel, data) =>
      return unless channel is @name
      @push data
      @emit "push"

    @redis.subscribe @name

module.exports.Queue = Queue
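
The dispatch step (push a message, emit "push", hand the oldest item to the first non-busy worker) can be tried without a redis server. The sketch below is only an illustration under stated assumptions: `StubWorker`, `MiniQueue`, and `receive` are hypothetical names that are not part of this gist, and the real `./worker` module may expose a different interface.

```coffeescript
assert = require "assert"
{EventEmitter} = require "events"

# Hypothetical stand-in for ./worker: it only records what it is asked to perform.
class StubWorker
  constructor: (options = {}) ->
    @queue = options.queue
    @state = "idle"
    @handled = []

  perform: (message) ->
    @handled.push message

# Reduced queue with the same available/shift/dispatch logic and no redis subscription.
class MiniQueue extends EventEmitter
  constructor: (number_of_workers = 2) ->
    super()
    @items = []
    @workers = (new StubWorker(queue: this) for i in [1..number_of_workers])
    # hand the oldest item to the first worker that is not busy
    @on "push", =>
      worker = @available()[0]
      worker.perform @shift() if worker?

  available: ->
    @workers.filter (worker) -> worker.state isnt "busy"

  shift: ->
    @items.shift()

  # stands in for a message arriving on the redis channel
  receive: (message) ->
    @items.push message
    @emit "push"

queue = new MiniQueue 2
queue.workers[0].state = "busy"   # force dispatch to skip the first worker
queue.receive "job-1"
queue.receive "job-2"

handled = queue.workers[1].handled
assert.deepEqual handled, ["job-1", "job-2"]
assert.equal queue.items.length, 0
```

Because the stub never marks itself busy, the second worker receives both jobs in arrival order. A real worker would presumably flip its own `state` while performing, which is what lets `available` spread work across workers.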