Skip to content

Instantly share code, notes, and snippets.

@technoweenie
Created February 20, 2012 04:55
Show Gist options
  • Save technoweenie/1867926 to your computer and use it in GitHub Desktop.
Save technoweenie/1867926 to your computer and use it in GitHub Desktop.
quick kestrel zeromq port attempt
node_modules
Events = require 'events'
class Queue extends Events.EventEmitter
# Simple queue that stores pending jobs, or available workers. If a job is
# added and there are workers, send it to one of the workers. If a worker
# becomes available, and there are pending jobs, send it a job immediately.
constructor: ->
@workers = []
@jobs = []
@quitting = false
# Public: Sends a job to a worker.
#
# worker - The String ID for the worker.
# job - The String job contents.
#
# Emits ('ready', worker, job).
# Returns nothing.
perform: (worker, job) ->
@emit 'ready', worker, job
@quit() if @quitting
# Public: Registers a workers as unavailable.
#
# worker - The String ID for the worker.
#
# Returns nothing.
removeWorker: (worker) ->
index = @workers.indexOf worker
if index > -1
delete @workers[index]
# Public: Registers a worker as available.
#
# worker - The String ID for the worker.
#
# Returns nothing.
addWorker: (worker) ->
if job = @jobs.shift()
@perform worker, job
else if @workers.indexOf(worker) == -1
@workers.push(worker)
# Public: Prepares to process a job.
#
# job - The String job contents.
#
# Returns nothing.
addJob: (job) ->
return if @quitting
if worker = @workers.shift()
@perform worker, job
else
@jobs.push job
quit: ->
@quitting = true
if @quitting and @jobs.length == 0
@emit 'empty'
exports.create = (args...) ->
new Queue args...
EMPTY = ''
# sets up the queue
queue = require('./workers').create()
# sends the job to the worker through the zeromq router
queue.on 'ready', (worker, job) ->
console.log queue.jobs.length, 'jobs / ', queue.workers.length, 'workers'
router.send [worker, EMPTY, job]
# sets up a timer to add random jobs
delay = parseFloat(process.env.RATE or 1) * 1000
timer = setInterval(->
queue.addJob randJob().toString()
console.log queue.jobs.length, 'jobs / ', queue.workers.length, 'workers'
, delay)
# generates random jobs.
randJob = ->
Math.round(Math.random() * 1000) / 1000
zmq = require 'zmq'
router = zmq.socket 'router'
# the router receives messages from workers:
router.on 'message', (id, _, msg) ->
msg = msg.toString()
switch msg
# the worker is available
when 'add' then queue.addWorker(id)
# just kidding, it's unavailable!
when 'remove'
queue.removeWorker(id)
router.send [id, EMPTY, 'ok']
else
console.log "wtf:"
console.log data
router.bindSync 'tcp://127.0.0.1:5555'
process.on 'SIGQUIT', ->
console.log 'closing...'
queue.on 'empty', ->
router.close()
process.exit()
clearInterval timer
queue.quit()
assert = require 'assert'
Events = require 'events'
Queue = require '..'
calls = 0
expectedWorker = null
expectedJob = null
events = new Events.EventEmitter
events.on 'ready', (worker, job) ->
calls += 1
assert.equal expectedWorker, worker
assert.equal expectedJob, job
queue = Queue.create events
assert.equal 0, queue.workers.length
assert.equal 0, queue.jobs.length
# add worker
events.emit 'message', '1'
assert.deepEqual ['1'], queue.workers
assert.equal 0, queue.jobs.length
# add worker
events.emit 'message', '2'
assert.deepEqual ['1', '2'], queue.workers
assert.equal 0, queue.jobs.length
# add job
expectedWorker = '1'
expectedJob = 'a'
queue.addJob expectedJob
assert.deepEqual ['2'], queue.workers
assert.equal 0, queue.jobs.length
# add worker
events.emit 'message', '3'
assert.deepEqual ['2', '3'], queue.workers
assert.equal 0, queue.jobs.length
# add job
expectedWorker = '2'
expectedJob = 'a'
queue.addJob expectedJob
assert.deepEqual ['3'], queue.workers
assert.equal 0, queue.jobs.length
# add job
expectedWorker = '3'
expectedJob = 'a'
queue.addJob expectedJob
assert.deepEqual [], queue.workers
assert.equal 0, queue.jobs.length
# add job
queue.addJob 'a'
assert.deepEqual [], queue.workers
assert.deepEqual ['a'], queue.jobs
# add job
queue.addJob 'b'
assert.deepEqual [], queue.workers
assert.deepEqual ['a', 'b'], queue.jobs
# add worker
expectedWorker = '1'
expectedJob = 'a'
events.emit 'message', '1'
assert.deepEqual [], queue.workers
assert.deepEqual ['b'], queue.jobs
# add worker
expectedWorker = '2'
expectedJob = 'b'
events.emit 'message', '2'
assert.deepEqual [], queue.workers
assert.deepEqual [], queue.jobs
process.on 'exit', ->
assert.equal 5, calls
class Worker
attr_reader :identity
attr_accessor :working
# Really basic ZeroMQ worker that gets jobs through a REQ socket. It first
# sends a message to signal its availability. The received message is the
# job. This is the "Simple Pirate Pattern" from the ZeroMQ Guide:
#
# http://zguide.zeromq.org/page:all#Basic-Reliable-Queuing-Simple-Pirate-Pattern
#
# context - A ZeroMQ Context. There should be 1 per app.
# id - Optional socket identity. Default: Process.pid
#
def initialize(context, id = nil)
@context = context
@identity = (id || Process.pid).to_s
@working = true
@socket = nil
@poller = nil
@poller_item = nil
@on_connect = nil
@on_exit = nil
@on_job = nil
end
# Public: Sets a block to be called when creating the socket.
#
# worker.on_connect do |context|
# context.connect :REQ, 'tcp://127.0.0.1:5555'
# end
#
# Yields a ZMQ::Context.
# Returns nothing.
def on_connect(&block)
@on_connect = block
end
# Public: Sets a block to be called when the worker is shutting down. This
# gives the worker a chance to tell the server it is unavailable.
#
# worker.on_exit do |socket|
# socket.send 'bye'
# socket.recv # wait for the server to acknowledge
# end
#
# Yields a ZMQ::Socket.
# Returns nothing.
def on_exit(&block)
@on_exit = block
end
# Public: Sets a block to be called when a job has been given to the worker.
#
# worker.on_job do |job|
# do_some_work(job)
# end
#
# Yields a String job.
# Returns nothing.
def on_job(&block)
@on_job = block
end
# Public: Starts the work loop.
#
# start_msg - Optional String message to send to the socket to indicate
# availability of this worker.
#
# Returns nothing.
def perform(start_msg = nil)
reconnect start_msg
loop do
if socket = poll
res = @on_job.call(socket.recv_nonblock)
socket.send(res) if @working
end
if !@working && close_socket(@on_exit)
puts "#{@identity}> CLOSING..."
return
end
if @working && !socket
reconnect start_msg
end
end
end
# Public: Gracefully prepares to shut this worker down. This stops the
# #perform loop after the current job finishes.
#
# Returns nothing.
def quit
puts "#{@identity}> QUITTING..."
@working = false
end
# Internal: Polls the socket and returns a readable socket. If nothing comes
# back, try reconnecting.
#
# Returns a ZMQ::Socket, or nil.
def poll
@poller.poll 5000
@poller.readables.first
rescue Errno::EINTR
end
# Internal: Closes the socket if created.
#
# on_exit - Optional Block to call before shutting down. If set, but the
# is not readable, wait for the next loop in #perform before trying
# to close the socket.
#
# Returns true if the socket is closed, or false.
def close_socket(on_exit = nil)
return true if !@socket
if on_exit
if @socket.events != ZMQ::POLLOUT
return false # socket is readable, wait for the job
end
@on_exit.call @socket
end
@poller.remove(@poller_item) if @poller_item
@socket.close if @socket
true
end
# Internal: Reconnects the socket to the server. Sets up a ZMQ::Poller to
# do the work.
#
# msg - Optional String message to send upon connecting.
#
# Returns nothing.
def reconnect(msg = nil)
puts "#{@identity}> CONNECTING..."
@poller ||= ZMQ::Poller.new
close_socket
@socket = @on_connect.call(@context)
@socket.identity = @identity
@poller_item = ZMQ::Pollitem(@socket, ZMQ::POLLIN)
@poller.register @poller_item
@socket.send msg if msg
end
end
# Example usage of the worker
require 'rbczmq'
# setup a context
ctx = ZMQ::Context.new
# these are hardcoded commands for queue.coffee
add_cmd = 'add'
remove_cmd = 'remove'
# create 5 workers in separate threads
workers = []
threads = []
5.times do |i|
id = "#{Process.pid}-#{i}"
worker = Worker.new ctx, id
# connect to the server
worker.on_connect do |ctx|
ctx.connect :REQ, 'tcp://127.0.0.1:5555'
end
# tell the server the worker is unavailable
worker.on_exit do |socket|
socket.send remove_cmd
print "Removing... "
puts socket.recv
end
# each job is just a sleep
worker.on_job do |job|
print "sleeping #{job}s..."
sleep job.to_f
puts 'done'
add_cmd
end
workers << worker
threads << Thread.new { worker.perform(add_cmd) }
end
# `kill -QUIT {pid}` to gracefully shut the workers down
trap :QUIT do
workers.each &:quit
end
threads.each &:join
# if the threads return, the workers are done. destroy the context and go
# home.
ctx.destroy
@technoweenie
Copy link
Author

technoweenie commented Feb 21, 2012 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment