Skip to content

Instantly share code, notes, and snippets.

@maccman
Created April 2, 2009 10:30
Show Gist options
  • Select an option

  • Save maccman/89126 to your computer and use it in GitHub Desktop.

Select an option

Save maccman/89126 to your computer and use it in GitHub Desktop.
require 'fileutils'
# Base class for queue-backed background workers.
#
# Subclasses implement #on_message. The class-level API either hands work to
# the subclass's queue or runs it inline (see .append), and starts/stops a
# poller for that queue (see .start! / .stop!). The queue name is derived from
# the subclass name (see .queue_name).
class Processor
  # Directory that receives one "<queue_name>.log" file per processor class.
  LOG_DIR = File.join(Rails.root, 'log')

  # Runs blocks in detached background processes and tracks them through
  # PID files named "processor.<name>.pid" inside PID_DIR.
  class Daemon
    # Directory holding the PID files written by store_pid.
    PID_DIR = File.join(Rails.root, 'tmp', 'pids')

    class << self
      # Runs the given block in a detached background process.
      #
      # Forks twice: the first child calls setsid and exits as soon as it has
      # forked again; the grandchild records its own PID under +name+, changes
      # into this file's directory, clears the umask, points the standard
      # streams at /dev/null, exits on TERM and finally yields to the block.
      #
      # name - identifier used to build the PID file name.
      #
      # Returns the PID of the intermediate (first) child. The PID of the
      # long-running process is the one written to the PID file.
      def daemonize!(name)
        intermediate_pid = fork do
          Process.setsid
          exit if fork
          store_pid(Process.pid, name)
          Dir.chdir File.dirname(__FILE__)
          File.umask 0000
          STDIN.reopen "/dev/null"
          STDOUT.reopen "/dev/null", "a"
          STDERR.reopen STDOUT
          trap("TERM") { exit }
          yield if block_given?
        end
        # The intermediate child exits right after the second fork; reap it in
        # the background so it does not linger as a zombie while the caller
        # keeps running.
        Process.detach(intermediate_pid)
        intermediate_pid
      end

      # Sends TERM to every process whose PID file matches +name+.
      #
      # name - PID file identifier; it is interpolated into a glob pattern, so
      #        '*' matches every processor PID file (see kill_all!).
      #
      # Each PID file is deleted before the signal is sent. Failures (missing
      # process, unusable PID file, ...) are reported on stdout and do not stop
      # the remaining files from being handled.
      def kill!(name)
        Dir[pid_path(name)].each do |f|
          begin
            pid = IO.read(f).chomp.to_i
            FileUtils.rm f
            # An empty or corrupt file parses to 0 (and garbage may parse to a
            # negative number); Process.kill would then signal a whole process
            # group -- for 0, the caller's own -- instead of one daemon.
            raise ArgumentError, "invalid PID #{pid}" unless pid > 0
            Process.kill('TERM', pid)
            puts "killed PID: #{pid}"
          rescue => e
            puts "Failed to kill! #{f}: #{e}"
          end
        end
      end

      # Sends TERM to every daemon that has a PID file in PID_DIR.
      def kill_all!
        kill!('*')
      end

      private

      # Writes +pid+ (followed by a newline) to the PID file for +name+,
      # creating PID_DIR first if needed.
      def store_pid(pid, name)
        FileUtils.mkdir_p(PID_DIR)
        File.open(pid_path(name), 'w') { |f|
          f.write("#{pid}\n")
        }
      end

      # Returns the PID file path (or glob pattern) for +queue_name+.
      def pid_path(queue_name)
        File.join(PID_DIR, "processor.#{queue_name}.pid")
      end
    end
  end

  class << self
    # Returns the queue manager's available queues.
    def all
      Processor.queue_manager.available_queues
    end

    # Queue name for this class: the class name passed through
    # String#underscore (presumably ActiveSupport's inflection -- it is not
    # defined in this file).
    def queue_name
      self.name.underscore
    end

    # Hands +args+ to this processor.
    #
    # In the production environment the arguments are pushed onto the queue;
    # in every other environment they are processed immediately, in-process,
    # by a fresh instance.
    #
    # Returns whatever the queue push or #process returns.
    def append(*args)
      if Rails.env.production?
        logger.info "Adding to #{queue_name}: #{args.inspect}"
        queue << args
      else
        logger.info "Sending to #{queue_name}: #{args.inspect}"
        self.new.process(args)
      end
    end
    alias :<< :append

    # Polls this processor's queue and feeds every message body to a fresh
    # instance via #process.
    #
    # daemon - when true (the default) the poll loop runs in a background
    #          process created by Daemon.daemonize!; otherwise it runs in the
    #          calling process.
    #
    # Exceptions (StandardError) raised while handling a message are logged
    # with their backtrace and otherwise ignored so polling continues.
    def start!(daemon = true)
      poller = Beanstalk::QueuePoller.new(Processor.queue_manager)
      block = lambda {
        poller.poll(queue_name.to_sym) { |msg|
          begin
            self.new.process(msg.ybody)
          rescue => e
            backtrace = (e.backtrace || []).join("\n")
            logger.error "\n#{e.message} - (#{e.class})\n#{backtrace}"
            # Hacky way to get message released again
            # raise Beanstalk::UnexpectedResponse
          end
        }
      }
      if daemon
        Daemon.daemonize!(queue_name, &block)
      else
        block.call
      end
    end

    # Sends TERM to the daemon(s) recorded for this processor's queue name.
    def stop!
      Daemon.kill!(queue_name)
    end

    # Memoized Logger writing to "<LOG_DIR>/<queue_name>.log"; LOG_DIR is
    # created on first use.
    def logger
      return @logger if @logger
      FileUtils.mkdir_p(LOG_DIR)
      @logger = Logger.new(File.join(LOG_DIR, queue_name + '.log'))
      @logger
    end

    protected

    # Queue manager loaded once from config/beanstalk.yml. It is kept in a
    # class variable and always reached through Processor, so every subclass
    # uses the same instance.
    def queue_manager
      @@queue_manager ||= Beanstalk::QueueManager.load(
        File.join(Rails.root, 'config', 'beanstalk.yml')
      )
    end

    private

    # Queue object for this processor, looked up by its name as a symbol.
    def queue
      Processor.queue_manager.queue(queue_name.to_sym)
    end
  end

  # Verifies ActiveRecord's active connections when ActiveRecord is loaded;
  # called before every message.
  def reload!
    ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
  end

  # Handles one message: runs reload!, logs the message, then splats it into
  # #on_message.
  #
  # msg - the message body (the argument array given to .append, or the
  #       polled message's ybody).
  def process(msg)
    reload!
    logger.info "Received on #{queue_name}: #{msg.inspect}"
    on_message(*msg)
  end

  # Hook for subclasses; receives the message contents as arguments.
  #
  # Raises RuntimeError unless overridden.
  def on_message(*args)
    raise 'You must implement on_message.'
  end

  # Per-class logger (see Processor.logger).
  def logger
    self.class.logger
  end

  # Queue name of this instance's class (see Processor.queue_name).
  def queue_name
    self.class.queue_name
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment