Skip to content

Instantly share code, notes, and snippets.

@technoweenie
Created February 20, 2012 04:55
Show Gist options
  • Save technoweenie/1867926 to your computer and use it in GitHub Desktop.
Save technoweenie/1867926 to your computer and use it in GitHub Desktop.
quick kestrel zeromq port attempt
node_modules
Events = require 'events'
class Queue extends Events.EventEmitter
# Simple queue that stores pending jobs, or available workers. If a job is
# added and there are workers, send it to one of the workers. If a worker
# becomes available, and there are pending jobs, send it a job immediately.
constructor: ->
@workers = []
@jobs = []
@quitting = false
# Public: Sends a job to a worker.
#
# worker - The String ID for the worker.
# job - The String job contents.
#
# Emits ('ready', worker, job).
# Returns nothing.
perform: (worker, job) ->
@emit 'ready', worker, job
@quit() if @quitting
# Public: Registers a workers as unavailable.
#
# worker - The String ID for the worker.
#
# Returns nothing.
removeWorker: (worker) ->
index = @workers.indexOf worker
if index > -1
delete @workers[index]
# Public: Registers a worker as available.
#
# worker - The String ID for the worker.
#
# Returns nothing.
addWorker: (worker) ->
if job = @jobs.shift()
@perform worker, job
else if @workers.indexOf(worker) == -1
@workers.push(worker)
# Public: Prepares to process a job.
#
# job - The String job contents.
#
# Returns nothing.
addJob: (job) ->
return if @quitting
if worker = @workers.shift()
@perform worker, job
else
@jobs.push job
quit: ->
@quitting = true
if @quitting and @jobs.length == 0
@emit 'empty'
exports.create = (args...) ->
new Queue args...
EMPTY = ''
# sets up the queue
queue = require('./workers').create()
# sends the job to the worker through the zeromq router
queue.on 'ready', (worker, job) ->
console.log queue.jobs.length, 'jobs / ', queue.workers.length, 'workers'
router.send [worker, EMPTY, job]
# sets up a timer to add random jobs
delay = parseFloat(process.env.RATE or 1) * 1000
timer = setInterval(->
queue.addJob randJob().toString()
console.log queue.jobs.length, 'jobs / ', queue.workers.length, 'workers'
, delay)
# generates random jobs.
randJob = ->
Math.round(Math.random() * 1000) / 1000
zmq = require 'zmq'
router = zmq.socket 'router'
# the router receives messages from workers:
router.on 'message', (id, _, msg) ->
msg = msg.toString()
switch msg
# the worker is available
when 'add' then queue.addWorker(id)
# just kidding, it's unavailable!
when 'remove'
queue.removeWorker(id)
router.send [id, EMPTY, 'ok']
else
console.log "wtf:"
console.log data
router.bindSync 'tcp://127.0.0.1:5555'
process.on 'SIGQUIT', ->
console.log 'closing...'
queue.on 'empty', ->
router.close()
process.exit()
clearInterval timer
queue.quit()
assert = require 'assert'
Events = require 'events'
Queue = require '..'
calls = 0
expectedWorker = null
expectedJob = null
events = new Events.EventEmitter
events.on 'ready', (worker, job) ->
calls += 1
assert.equal expectedWorker, worker
assert.equal expectedJob, job
queue = Queue.create events
assert.equal 0, queue.workers.length
assert.equal 0, queue.jobs.length
# add worker
events.emit 'message', '1'
assert.deepEqual ['1'], queue.workers
assert.equal 0, queue.jobs.length
# add worker
events.emit 'message', '2'
assert.deepEqual ['1', '2'], queue.workers
assert.equal 0, queue.jobs.length
# add job
expectedWorker = '1'
expectedJob = 'a'
queue.addJob expectedJob
assert.deepEqual ['2'], queue.workers
assert.equal 0, queue.jobs.length
# add worker
events.emit 'message', '3'
assert.deepEqual ['2', '3'], queue.workers
assert.equal 0, queue.jobs.length
# add job
expectedWorker = '2'
expectedJob = 'a'
queue.addJob expectedJob
assert.deepEqual ['3'], queue.workers
assert.equal 0, queue.jobs.length
# add job
expectedWorker = '3'
expectedJob = 'a'
queue.addJob expectedJob
assert.deepEqual [], queue.workers
assert.equal 0, queue.jobs.length
# add job
queue.addJob 'a'
assert.deepEqual [], queue.workers
assert.deepEqual ['a'], queue.jobs
# add job
queue.addJob 'b'
assert.deepEqual [], queue.workers
assert.deepEqual ['a', 'b'], queue.jobs
# add worker
expectedWorker = '1'
expectedJob = 'a'
events.emit 'message', '1'
assert.deepEqual [], queue.workers
assert.deepEqual ['b'], queue.jobs
# add worker
expectedWorker = '2'
expectedJob = 'b'
events.emit 'message', '2'
assert.deepEqual [], queue.workers
assert.deepEqual [], queue.jobs
process.on 'exit', ->
assert.equal 5, calls
class Worker
attr_reader :identity
attr_accessor :working
# Really basic ZeroMQ worker that gets jobs through a REQ socket. It first
# sends a message to signal its availability. The received message is the
# job. This is the "Simple Pirate Pattern" from the ZeroMQ Guide:
#
# http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern
#
# context - A ZeroMQ Context. There should be 1 per app.
# id - Optional socket identity. Default: Process.pid
#
def initialize(context, id = nil)
@context = context
@identity = (id || Process.pid).to_s
@working = true
@socket = nil
@poller = nil
@poller_item = nil
@on_connect = nil
@on_exit = nil
@on_job = nil
end
# Public: Sets a block to be called when creating the socket.
#
# worker.on_connect do |context|
# context.connect :REQ, 'tcp://127.0.0.1:5555'
# end
#
# Yields a ZMQ::Context.
# Returns nothing.
def on_connect(&block)
@on_connect = block
end
# Public: Sets a block to be called when the worker is shutting down. This
# gives the worker a chance to tell the server it is unavailable.
#
# worker.on_exit do |socket|
# socket.send 'bye'
# socket.recv # wait for the server to acknowledge
# end
#
# Yields a ZMQ::Socket.
# Returns nothing.
def on_exit(&block)
@on_exit = block
end
# Public: Sets a block to be called when a job has been given to the worker.
#
# worker.on_job do |job|
# do_some_work(job)
# end
#
# Yields a String job.
# Returns nothing.
def on_job(&block)
@on_job = block
end
# Public: Starts the work loop.
#
# start_msg - Optional String message to send to the socket to indicate
# availability of this worker.
#
# Returns nothing.
def perform(start_msg = nil)
reconnect start_msg
loop do
if socket = poll
res = @on_job.call(socket.recv_nonblock)
socket.send(res) if @working
end
if !@working && close_socket(@on_exit)
puts "#{@identity}> CLOSING..."
return
end
if @working && !socket
reconnect start_msg
end
end
end
# Public: Gracefully prepares to shut this worker down. This stops the
# #perform loop after the current job finishes.
#
# Returns nothing.
def quit
puts "#{@identity}> QUITTING..."
@working = false
end
# Internal: Polls the socket and returns a readable socket. If nothing comes
# back, try reconnecting.
#
# Returns a ZMQ::Socket, or nil.
def poll
@poller.poll 5000
@poller.readables.first
rescue Errno::EINTR
end
# Internal: Closes the socket if created.
#
# on_exit - Optional Block to call before shutting down. If set, but the
# is not readable, wait for the next loop in #perform before trying
# to close the socket.
#
# Returns true if the socket is closed, or false.
def close_socket(on_exit = nil)
return true if !@socket
if on_exit
if @socket.events != ZMQ::POLLOUT
return false # socket is readable, wait for the job
end
@on_exit.call @socket
end
@poller.remove(@poller_item) if @poller_item
@socket.close if @socket
true
end
# Internal: Reconnects the socket to the server. Sets up a ZMQ::Poller to
# do the work.
#
# msg - Optional String message to send upon connecting.
#
# Returns nothing.
def reconnect(msg = nil)
puts "#{@identity}> CONNECTING..."
@poller ||= ZMQ::Poller.new
close_socket
@socket = @on_connect.call(@context)
@socket.identity = @identity
@poller_item = ZMQ::Pollitem(@socket, ZMQ::POLLIN)
@poller.register @poller_item
@socket.send msg if msg
end
end
# Example usage of the worker
require 'rbczmq'
# setup a context
ctx = ZMQ::Context.new
# these are hardcoded commands for queue.coffee
add_cmd = 'add'
remove_cmd = 'remove'
# create 5 workers in separate threads
workers = []
threads = []
5.times do |i|
id = "#{Process.pid}-#{i}"
worker = Worker.new ctx, id
# connect to the server
worker.on_connect do |ctx|
ctx.connect :REQ, 'tcp://127.0.0.1:5555'
end
# tell the server the worker is unavailable
worker.on_exit do |socket|
socket.send remove_cmd
print "Removing... "
puts socket.recv
end
# each job is just a sleep
worker.on_job do |job|
print "sleeping #{job}s..."
sleep job.to_f
puts 'done'
add_cmd
end
workers << worker
threads << Thread.new { worker.perform(add_cmd) }
end
# `kill -QUIT {pid}` to gracefully shut the workers down
trap :QUIT do
workers.each &:quit
end
threads.each &:join
# if the threads return, the workers are done. destroy the context and go
# home.
ctx.destroy
@technoweenie
Copy link
Author

technoweenie commented Feb 21, 2012 via email

@jnunemaker
Copy link

Very cool. I definitely would like to use/try zeromq along side kestrel someday. The benefit is we won't need dedicated queue server boxes, just sockets and workers.

Thus far, the pusher proxy "just works" which gives me confidence in zeromq, but I need to see where if fails and how before putting tracking data there.

Totally loved reading through this. I like the idea of recreating stuff in a different technology so you can learn. You should definitely submit this to rubyconf brazil. :)

Really sweet that you can just add/remove workers to the router using messages. I've been thinking it could be cool/heretical to give all our app instances the ability to talk to each other. I don't have a great use case for it, but it just seems like all processes should be able to talk to each other really easily, be they an app instance, a worker, or whatever.

@technoweenie
Copy link
Author

technoweenie commented Feb 21, 2012 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment