Created
June 22, 2009 16:31
#
# The code below is lifted from a daemon generated with daemon-kit.
#
# It is responsible for executing various commands on Linux servers to provision
# and manage hosting services for several hundred clients on each server. Messages
# are delivered to 'identity'-based queues, and each agent can subscribe to more
# than one queue depending on the services it offers.
#
# The original messages are JSON objects representing workitems for the
# ruote workflow engine. Some intermediary code slices up the workitems
# to decide what actions to take. This example assumes we're just updating the
# disk usage cache for a directory.
#

# Generated amqp daemon

# Do your post daemonization configuration here
# At minimum you need just the first line (without the block), or a lot
# of strange things might start happening...
DaemonKit::Application.running! do |config|
  # Trap signals with blocks or procs
  # config.trap( 'INT' ) do
  #   # do something clever
  # end
  # config.trap( 'TERM', Proc.new { puts 'Going down' } )
end

# IMPORTANT CONFIGURATION NOTE
#
# Please review and update 'config/amqp.yml' accordingly or this
# daemon won't work as advertised.

DaemonKit::Cron.scheduler.every("1m") do
  DaemonKit.logger.debug "Timer loop"
end

# Run an event-loop for processing
DaemonKit::AMQP.run do
  # Inside this block we're running inside the reactor set up by the
  # amqp gem. Any code in the examples (from the gem) would work just
  # fine here.

  # Connection keep-alive: reconnect whenever the connection drops
  AMQP.conn.connection_status do |status|
    DaemonKit.logger.debug("AMQP connection status changed: #{status}")
    if status == :disconnected
      AMQP.conn.reconnect(true)
    end
  end

  config = DaemonKit::Config.load('agent')
  amq = ::MQ.new

  # Support for split personalities: a single identity or a list of them
  identities = if config.identity.is_a?( Array )
    config.identity
  else
    [ config.identity ]
  end

  identities.each do |q|
    DaemonKit.logger.debug("Connecting to queue #{q}")

    # Durable queue name, with our own identity
    cmdq = amq.queue( q, :durable => true )

    # Subscribe to the queue, with ack enabled. So if the daemon dies we
    # can just get the message next time
    cmdq.subscribe( :ack => true ) do |header,msg|
      safely do
        DaemonKit.logger.debug "Received message: #{msg.inspect}"

        # For all practical purposes, this is what happens:
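        # Pass the path as an argument array so it never goes through a shell,
        # and use -k so the leading number is the full size in kilobytes
        # (with -h, a size like "1.5G" would be captured as just "1").
        IO.popen( [ 'du', '-sk', '--', msg ], &:read ) =~ /\A(\d+)/
        usage = $1 || 0
        MQ.queue( 'boss' ).publish( "#{msg} #{usage}" )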

        # The actual code is:
        # Instruction.process( msg )

        DaemonKit.logger.debug "Done processing message..."

        # Acknowledge only after the work is done
        header.ack
      end
    end
  end
end
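
The two pieces of plain Ruby in the daemon above (normalising the configured identity into a list, and turning `du` output into a number) can be tried outside the AMQP reactor. What follows is a minimal sketch, not the gist author's code: the helper names `normalize_identities`, `parse_du_kilobytes` and `disk_usage_kb` are hypothetical, and it assumes a `du` that accepts `-s` and `-k`.

```ruby
require 'open3'

# Hypothetical helper: accept a single identity or a list of identities
# and always hand back an array, mirroring the "split personalities" check.
def normalize_identities(identity)
  Array(identity)
end

# Hypothetical helper: extract the leading kilobyte count from `du -sk`
# output such as "2048\t/var/www/example\n". Returns 0 when no number is found.
def parse_du_kilobytes(output)
  match = output.to_s.match(/\A(\d+)\s/)
  match ? match[1].to_i : 0
end

# Hypothetical helper: run du without a shell, so a path containing spaces
# or shell metacharacters is passed through as a single literal argument.
def disk_usage_kb(path)
  stdout, _stderr, status = Open3.capture3('du', '-sk', '--', path.to_s)
  status.success? ? parse_du_kilobytes(stdout) : 0
end
```

Inside the subscribe block, `disk_usage_kb(msg)` could stand in for the backtick call. It still blocks the reactor while `du` runs, just as the original does.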