Created
July 10, 2009 12:13
-
-
Save kennethkalmer/144471 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# AMQP generated daemon (this would be the file in libexec/)
#
# NOTE 1: This config would be adapted to be more ruote-friendly
#
# NOTE 2: These would be the names of the queues used in the process
#         definition, or the JIDs to use
#
# NOTE 3: This would be responsible for handling the processing
#

# Run an event-loop for processing
DaemonKit::AMQP.run do
  # This block executes inside the reactor set up by the amqp gem, so any
  # code taken from the gem's examples works here unchanged.

  # Connection keep-alive: log every status change and ask the connection
  # to reconnect as soon as it reports :disconnected.
  AMQP.conn.connection_status do |status|
    DaemonKit.logger.debug( "AMQP connection status changed: #{status}" )
    AMQP.conn.reconnect( true ) if status == :disconnected
  end

  # NOTE 1
  config = DaemonKit::Config.load( 'agent' )
  amq = ::MQ.new

  # Support for split personalities: a single configured identity is
  # wrapped in an array so the code below always iterates over a list.
  # NOTE 2
  identities = config.identity.is_a?( Array ) ? config.identity : [ config.identity ]

  identities.each do |queue_name|
    DaemonKit.logger.debug( "Connecting to queue #{queue_name}" )

    # Durable queue named after our own identity, subscribed with ack
    # enabled. The ack is only sent once processing has finished, so if
    # the daemon dies we can just get the message next time.
    amq.queue( queue_name, :durable => true ).subscribe( :ack => true ) do |header, msg|
      safely do
        DaemonKit.logger.debug( "Received workitem: #{msg.inspect}" )

        # NOTE 3
        # NOTE(review): the transport symbol is passed ahead of the raw
        # payload -- confirm Workitem.process accepts two arguments.
        Workitem.process( :amqp, msg )

        DaemonKit.logger.debug( "Done processing workitem..." )
        header.ack
      end
    end
  end

  # Simple keep-alive ping: every five minutes publish "<identity> OK" for
  # each identity to the 'remote-participant-status' queue.
  DaemonKit::Cron.scheduler.every( "5m" ) do
    identities.each do |identity|
      MQ.queue( 'remote-participant-status' ).publish( "#{identity} OK" )
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Configuration
#
amqp = OpenWFE::Extras::AMQPParticipant.new

# Map participant names to queues (kept as ordered pairs so the mapping
# calls happen in a fixed order)
[ [ 'jacob', 'foo' ], [ 'solomon', 'bar' ] ].each do |participant, queue|
  amqp.map( participant, queue )
end

# Both AMQP-backed participant names are registered against the same
# AMQPParticipant instance.
%w( jacob solomon ).each do |participant|
  engine.register_participant( participant, amqp )
end

# 'nelson' is registered with the XMPP participant class itself (not an
# instance), exactly as written in the original configuration.
engine.register_participant( 'nelson', OpenWFE::Extras::XMPPParticipant )
#
# Sample process definition
#
# Two steps run in sequence: 'jacob' fetches a random quote, then 'nelson'
# sends it on. Attribute meanings used below:
#
#   :activity  a nice to have
#   :command   processed by the remote daemon-kit, just like nanite:
#              /class/method
#   :queue     where to dispense the message to (AMQP)
#   :jid       where to dispense the message to (XMPP)
#
class DaemonKitProcess < OpenWFE::ProcessDefinition
  sequence do
    # No :queue is given for this step.
    # NOTE(review): presumably the queue comes from the participant-to-queue
    # mapping set up when the participant is registered -- confirm.
    jacob(
      :activity => 'Get random quote',
      :command  => '/quote/random'
    )

    # NOTE(review): the :jid value looks like an address placeholder left
    # by e-mail obfuscation; replace it with the real JID before use.
    nelson(
      :activity => 'Send random quote',
      :command  => '/quote/send',
      :jid      => '[email protected]'
    )
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Workitem class to delegate instructions, this is very AMQP specific and
# would have to be adapted into something more XMPP/beanstalk friendly
#
# Responsible for processing the commands coming in over AMQP and
# delegating them to the correct class.
class Workitem

  class << self

    # = Process incoming commands via an AMQP queue
    #
    # Expects a JSON workitem from ruote that has these fields set in
    # attributes key:
    #
    #   {
    #     'reply_queue' => 'queue to send replies to',
    #     'params' => {
    #       'command' => '/actor/method'
    #     }
    #   }
    #
    # == Calling conventions
    #
    # Both of these are accepted:
    #
    #   Workitem.process( json )
    #   Workitem.process( :amqp, json )
    #
    # In the two-argument form the first argument names the transport the
    # message arrived on. It is accepted so transport-aware callers work,
    # but it is not used yet: replies always go out via +MQ+.
    #
    # == Notes on the command key:
    #
    # It looks like a resource, and will be treated as such. It should
    # be in the format of +/class/method+, and it will be passed the
    # complete workitem as a hash.
    #
    # Only methods the handler publicly responds to can be invoked. The
    # command comes from the message, so private methods (e.g. Kernel's
    # +system+) must not be reachable through it.
    #
    # == Notes on replies
    #
    # Replies are sent back to the queue specified in the +reply_queue+ key
    # of whatever the handler method returns.
    #
    # == Statuses
    #
    # NOTE(review): status messages via topic exchanges (using the command
    # as the routing key) were described in the original notes but nothing
    # in this class publishes them.
    #
    # Returns the handler's response. Raises ArgumentError for a malformed
    # command and NoMethodError when the handler does not publicly respond
    # to the requested method.
    def process( transport_or_work, work = nil )
      # One-argument form: the sole argument is the JSON payload itself.
      work ||= transport_or_work

      work = JSON.parse( work )

      target, method = parse_command( work )

      # respond_to? only considers public methods, which keeps private
      # and Kernel-level methods out of reach of the incoming command.
      unless target.respond_to?( method )
        raise NoMethodError, "#{target.class} does not respond to '#{method}'"
      end

      response = target.send( method, work )

      send_reply( response )
    end

    # Split the workitem's +/class/method+ command into a handler instance
    # and a method name.
    #
    # Handler instances are created on first use (via +classify+ and
    # +constantize+ on the class segment) and cached per class segment, so
    # each handler class is instantiated once.
    #
    # Returns a two-element array: [ handler, method_name ].
    # Raises ArgumentError when either segment is missing.
    def parse_command( work )
      command = work['attributes']['params']['command']

      # The leading "/" yields an empty first element, which is discarded.
      _, klass, method = command.to_s.split('/')

      if klass.nil? || klass.empty? || method.nil? || method.empty?
        raise ArgumentError,
          "Malformed command #{command.inspect}, expected '/class/method'"
      end

      @handlers ||= {}
      @handlers[ klass ] ||= klass.classify.constantize.new

      [ @handlers[ klass ], method ]
    end

    # Publish the response, serialized as JSON, to the queue named in its
    # 'reply_queue' attribute. Returns the response unchanged.
    def send_reply( response )
      MQ.queue( response['attributes']['reply_queue'] ).publish( response.to_json )

      response
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment