Skip to content

Instantly share code, notes, and snippets.

@kennethkalmer
Created July 10, 2009 12:13
Show Gist options
  • Save kennethkalmer/144471 to your computer and use it in GitHub Desktop.
Save kennethkalmer/144471 to your computer and use it in GitHub Desktop.
#
# AMQP generated daemon (this would be the file in libexec/)
#
# NOTE 1: This config would be adapted to be more ruote-friendly
#
# NOTE 2: This would be the names of queues used in the process definition, or JID's to use
#
# NOTE 3: This would be responsible for handling the processesing
#
# Run an event-loop for processing
DaemonKit::AMQP.run do
# Inside this block we're running inside the reactor setup by the
# amqp gem. Any code in the examples (from the gem) would work just
# fine here.
# Uncomment this for connection keep-alive
AMQP.conn.connection_status do |status|
DaemonKit.logger.debug("AMQP connection status changed: #{status}")
if status == :disconnected
AMQP.conn.reconnect(true)
end
end
# NOTE 1
config = DaemonKit::Config.load('agent')
amq = ::MQ.new
# Support for split personalities
# NOTE 2
identities = if config.identity.is_a?( Array )
config.identity
else
[ config.identity ]
end
identities.each do |q|
DaemonKit.logger.debug("Connecting to queue #{q}")
# Durable queue name, with our own identity
cmdq = amq.queue( q, :durable => true )
# Subscribe to the queue, with ack enabled. So if the daemon dies we
# can just get the message next time
cmdq.subscribe( :ack => true ) do |header,msg|
safely do
DaemonKit.logger.debug "Received workitem: #{msg.inspect}"
# NOTE 3
Workitem.process( :amqp, msg )
DaemonKit.logger.debug "Done processing workitem..."
header.ack
end
end
end
# Simple keep-alive ping
DaemonKit::Cron.scheduler.every("5m") do
identities.each do |i|
MQ.queue( 'remote-participant-status' ).publish( "#{i} OK" )
end
end
end
#
# Configuration
#
amqp = OpenWFE::Extras::AMQPParticipant.new
# Map participant names to queues
amqp.map('jacob', 'foo')
amqp.map('solomon', 'bar')
engine.register_participant( 'jacob', amqp )
engine.register_participant( 'solomon', amqp )
engine.register_participant( 'nelson', OpenWFE::Extras::XMPPParticipant )
#
# Sample process definition
#
class DaemonKitProcess < OpenWFE::ProcessDefinition
sequence do
# :activity is a nice to have
# :command is processed by remote daemon-kit, just like nanite: /class/method
# :queue is where to dispense the message to
jacob :activity => "Get random quote", :command => "/quote/random"
# :activity is a nice to have
# :command is processed by remote daemon-kit, just like nanite: /class/method
# :jid is where to dispense message to
nelson :activity => "Send random quote", :command => "/quote/send", :jid => '[email protected]'
end
end
#
# Workitem class to delegate instructions, this is very AMQP specific and would have to be adapted
# into something more XMPP/beanstalk friendly
#
# Responsible for processing the commands coming in over AMQP and
# delegating it to the correct class.
class Workitem
class << self
# = Process incoming commands via an AMQP queue
#
# Expects a JSON workitem from ruote that has these fields set in
# attributes key:
#
# {
# 'reply_queue' => 'queue to send replies to',
# 'params' => {
# 'command' => '/actor/method'
# }
# }
#
# == Notes on the command key:
#
# It looks like a resource, and will be treated as such. Is should
# be in the format of +/class/method+, and it will be passed the
# complete workitem as a hash.
#
# == Notes on replies
#
# Replies are sent back to the queue specified in the +reply_queue+ key.
#
# == Statusses
#
# Status messages are sent via topic exchanges, using the command
# as they routing key.
def process( work )
# keep it singleton
@instance ||= new
work = JSON.parse( work )
target, method = parse_command( work )
response = target.send( method, work )
# send fanout
send_reply( response )
end
def parse_command( work )
_, klass, method = work['attributes']['params']['command'].split('/')
@handlers ||= {}
unless @handlers.has_key?( klass )
@handlers[ klass ] = klass.classify.constantize.new
end
return @handlers[ klass ], method
end
def send_reply( response )
MQ.queue( response['attributes']['reply_queue'] ).publish( response.to_json )
response
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment