-
-
Save technoweenie/1867926 to your computer and use it in GitHub Desktop.
node_modules |
Events = require 'events' | |
class Queue extends Events.EventEmitter | |
# Simple queue that stores pending jobs, or available workers. If a job is | |
# added and there are workers, send it to one of the workers. If a worker | |
# becomes available, and there are pending jobs, send it a job immediately. | |
constructor: -> | |
@workers = [] | |
@jobs = [] | |
@quitting = false | |
# Public: Sends a job to a worker. | |
# | |
# worker - The String ID for the worker. | |
# job - The String job contents. | |
# | |
# Emits ('ready', worker, job). | |
# Returns nothing. | |
perform: (worker, job) -> | |
@emit 'ready', worker, job | |
@quit() if @quitting | |
# Public: Registers a workers as unavailable. | |
# | |
# worker - The String ID for the worker. | |
# | |
# Returns nothing. | |
removeWorker: (worker) -> | |
index = @workers.indexOf worker | |
if index > -1 | |
delete @workers[index] | |
# Public: Registers a worker as available. | |
# | |
# worker - The String ID for the worker. | |
# | |
# Returns nothing. | |
addWorker: (worker) -> | |
if job = @jobs.shift() | |
@perform worker, job | |
else if @workers.indexOf(worker) == -1 | |
@workers.push(worker) | |
# Public: Prepares to process a job. | |
# | |
# job - The String job contents. | |
# | |
# Returns nothing. | |
addJob: (job) -> | |
return if @quitting | |
if worker = @workers.shift() | |
@perform worker, job | |
else | |
@jobs.push job | |
quit: -> | |
@quitting = true | |
if @quitting and @jobs.length == 0 | |
@emit 'empty' | |
exports.create = (args...) -> | |
new Queue args... | |
EMPTY = '' | |
# sets up the queue | |
queue = require('./workers').create() | |
# sends the job to the worker through the zeromq router | |
queue.on 'ready', (worker, job) -> | |
console.log queue.jobs.length, 'jobs / ', queue.workers.length, 'workers' | |
router.send [worker, EMPTY, job] | |
# sets up a timer to add random jobs | |
delay = parseFloat(process.env.RATE or 1) * 1000 | |
timer = setInterval(-> | |
queue.addJob randJob().toString() | |
console.log queue.jobs.length, 'jobs / ', queue.workers.length, 'workers' | |
, delay) | |
# generates random jobs. | |
randJob = -> | |
Math.round(Math.random() * 1000) / 1000 | |
zmq = require 'zmq' | |
router = zmq.socket 'router' | |
# the router receives messages from workers: | |
router.on 'message', (id, _, msg) -> | |
msg = msg.toString() | |
switch msg | |
# the worker is available | |
when 'add' then queue.addWorker(id) | |
# just kidding, it's unavailable! | |
when 'remove' | |
queue.removeWorker(id) | |
router.send [id, EMPTY, 'ok'] | |
else | |
console.log "wtf:" | |
console.log data | |
router.bindSync 'tcp://127.0.0.1:5555' | |
process.on 'SIGQUIT', -> | |
console.log 'closing...' | |
queue.on 'empty', -> | |
router.close() | |
process.exit() | |
clearInterval timer | |
queue.quit() | |
assert = require 'assert' | |
Events = require 'events' | |
Queue = require '..' | |
calls = 0 | |
expectedWorker = null | |
expectedJob = null | |
events = new Events.EventEmitter | |
events.on 'ready', (worker, job) -> | |
calls += 1 | |
assert.equal expectedWorker, worker | |
assert.equal expectedJob, job | |
queue = Queue.create events | |
assert.equal 0, queue.workers.length | |
assert.equal 0, queue.jobs.length | |
# add worker | |
events.emit 'message', '1' | |
assert.deepEqual ['1'], queue.workers | |
assert.equal 0, queue.jobs.length | |
# add worker | |
events.emit 'message', '2' | |
assert.deepEqual ['1', '2'], queue.workers | |
assert.equal 0, queue.jobs.length | |
# add job | |
expectedWorker = '1' | |
expectedJob = 'a' | |
queue.addJob expectedJob | |
assert.deepEqual ['2'], queue.workers | |
assert.equal 0, queue.jobs.length | |
# add worker | |
events.emit 'message', '3' | |
assert.deepEqual ['2', '3'], queue.workers | |
assert.equal 0, queue.jobs.length | |
# add job | |
expectedWorker = '2' | |
expectedJob = 'a' | |
queue.addJob expectedJob | |
assert.deepEqual ['3'], queue.workers | |
assert.equal 0, queue.jobs.length | |
# add job | |
expectedWorker = '3' | |
expectedJob = 'a' | |
queue.addJob expectedJob | |
assert.deepEqual [], queue.workers | |
assert.equal 0, queue.jobs.length | |
# add job | |
queue.addJob 'a' | |
assert.deepEqual [], queue.workers | |
assert.deepEqual ['a'], queue.jobs | |
# add job | |
queue.addJob 'b' | |
assert.deepEqual [], queue.workers | |
assert.deepEqual ['a', 'b'], queue.jobs | |
# add worker | |
expectedWorker = '1' | |
expectedJob = 'a' | |
events.emit 'message', '1' | |
assert.deepEqual [], queue.workers | |
assert.deepEqual ['b'], queue.jobs | |
# add worker | |
expectedWorker = '2' | |
expectedJob = 'b' | |
events.emit 'message', '2' | |
assert.deepEqual [], queue.workers | |
assert.deepEqual [], queue.jobs | |
process.on 'exit', -> | |
assert.equal 5, calls |
class Worker | |
attr_reader :identity | |
attr_accessor :working | |
# Really basic ZeroMQ worker that gets jobs through a REQ socket. It first | |
# sends a message to signal its availability. The received message is the | |
# job. This is the "Simple Pirate Pattern" from the ZeroMQ Guide: | |
# | |
# http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern | |
# | |
# context - A ZeroMQ Context. There should be 1 per app. | |
# id - Optional socket identity. Default: Process.pid | |
# | |
def initialize(context, id = nil) | |
@context = context | |
@identity = (id || Process.pid).to_s | |
@working = true | |
@socket = nil | |
@poller = nil | |
@poller_item = nil | |
@on_connect = nil | |
@on_exit = nil | |
@on_job = nil | |
end | |
# Public: Sets a block to be called when creating the socket. | |
# | |
# worker.on_connect do |context| | |
# context.connect :REQ, 'tcp://127.0.0.1:5555' | |
# end | |
# | |
# Yields a ZMQ::Context. | |
# Returns nothing. | |
def on_connect(&block) | |
@on_connect = block | |
end | |
# Public: Sets a block to be called when the worker is shutting down. This | |
# gives the worker a chance to tell the server it is unavailable. | |
# | |
# worker.on_exit do |socket| | |
# socket.send 'bye' | |
# socket.recv # wait for the server to acknowledge | |
# end | |
# | |
# Yields a ZMQ::Socket. | |
# Returns nothing. | |
def on_exit(&block) | |
@on_exit = block | |
end | |
# Public: Sets a block to be called when a job has been given to the worker. | |
# | |
# worker.on_job do |job| | |
# do_some_work(job) | |
# end | |
# | |
# Yields a String job. | |
# Returns nothing. | |
def on_job(&block) | |
@on_job = block | |
end | |
# Public: Starts the work loop. | |
# | |
# start_msg - Optional String message to send to the socket to indicate | |
# availability of this worker. | |
# | |
# Returns nothing. | |
def perform(start_msg = nil) | |
reconnect start_msg | |
loop do | |
if socket = poll | |
res = @on_job.call(socket.recv_nonblock) | |
socket.send(res) if @working | |
end | |
if !@working && close_socket(@on_exit) | |
puts "#{@identity}> CLOSING..." | |
return | |
end | |
if @working && !socket | |
reconnect start_msg | |
end | |
end | |
end | |
# Public: Gracefully prepares to shut this worker down. This stops the | |
# #perform loop after the current job finishes. | |
# | |
# Returns nothing. | |
def quit | |
puts "#{@identity}> QUITTING..." | |
@working = false | |
end | |
# Internal: Polls the socket and returns a readable socket. If nothing comes | |
# back, try reconnecting. | |
# | |
# Returns a ZMQ::Socket, or nil. | |
def poll | |
@poller.poll 5000 | |
@poller.readables.first | |
rescue Errno::EINTR | |
end | |
# Internal: Closes the socket if created. | |
# | |
# on_exit - Optional Block to call before shutting down. If set, but the | |
# is not readable, wait for the next loop in #perform before trying | |
# to close the socket. | |
# | |
# Returns true if the socket is closed, or false. | |
def close_socket(on_exit = nil) | |
return true if !@socket | |
if on_exit | |
if @socket.events != ZMQ::POLLOUT | |
return false # socket is readable, wait for the job | |
end | |
@on_exit.call @socket | |
end | |
@poller.remove(@poller_item) if @poller_item | |
@socket.close if @socket | |
true | |
end | |
# Internal: Reconnects the socket to the server. Sets up a ZMQ::Poller to | |
# do the work. | |
# | |
# msg - Optional String message to send upon connecting. | |
# | |
# Returns nothing. | |
def reconnect(msg = nil) | |
puts "#{@identity}> CONNECTING..." | |
@poller ||= ZMQ::Poller.new | |
close_socket | |
@socket = @on_connect.call(@context) | |
@socket.identity = @identity | |
@poller_item = ZMQ::Pollitem(@socket, ZMQ::POLLIN) | |
@poller.register @poller_item | |
@socket.send msg if msg | |
end | |
end | |
# Example usage of the worker | |
require 'rbczmq' | |
# setup a context | |
ctx = ZMQ::Context.new | |
# these are hardcoded commands for queue.coffee | |
add_cmd = 'add' | |
remove_cmd = 'remove' | |
# create 5 workers in separate threads | |
workers = [] | |
threads = [] | |
5.times do |i| | |
id = "#{Process.pid}-#{i}" | |
worker = Worker.new ctx, id | |
# connect to the server | |
worker.on_connect do |ctx| | |
ctx.connect :REQ, 'tcp://127.0.0.1:5555' | |
end | |
# tell the server the worker is unavailable | |
worker.on_exit do |socket| | |
socket.send remove_cmd | |
print "Removing... " | |
puts socket.recv | |
end | |
# each job is just a sleep | |
worker.on_job do |job| | |
print "sleeping #{job}s..." | |
sleep job.to_f | |
puts 'done' | |
add_cmd | |
end | |
workers << worker | |
threads << Thread.new { worker.perform(add_cmd) } | |
end | |
# `kill -QUIT {pid}` to gracefully shut the workers down | |
trap :QUIT do | |
workers.each &:quit | |
end | |
threads.each &:join | |
# if the threads return, the workers are done. destroy the context and go | |
# home. | |
ctx.destroy | |
technoweenie
commented
Feb 21, 2012
via email
Very cool. I definitely would like to use/try zeromq along side kestrel someday. The benefit is we won't need dedicated queue server boxes, just sockets and workers.
Thus far, the pusher proxy "just works" which gives me confidence in zeromq, but I need to see where if fails and how before putting tracking data there.
Totally loved reading through this. I like the idea of recreating stuff in a different technology so you can learn. You should definitely submit this to rubyconf brazil. :)
Really sweet that you can just add/remove workers to the router using messages. I've been thinking it could be cool/heretical to give all our app instances the ability to talk to each other. I don't have a great use case for it, but it just seems like all processes should be able to talk to each other really easily, be they an app instance, a worker, or whatever.