Created
April 2, 2009 10:30
-
-
Save maccman/89126 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'fileutils'
require 'logger'
# Base class for background job processors fed by a Beanstalk queue.
#
# Subclasses implement #on_message. Outside of the production environment,
# Processor.append runs the job inline; in production the arguments are pushed
# onto the queue named after the subclass and consumed by Processor.start!.
class Processor
  # Directory that receives one log file per queue.
  LOG_DIR = File.join(Rails.root, 'log')

  # Runs a block in a detached background process and stops such processes
  # again by means of PID files.
  class Daemon
    # Directory holding one PID file per daemonized processor.
    PID_DIR = File.join(Rails.root, 'tmp', 'pids')

    class << self
      # Double-forks to detach from the caller, records the daemon's PID
      # under +name+, redirects the standard streams to /dev/null and then
      # yields to the given block inside the daemon process.
      #
      # Returns the PID of the short-lived intermediate child (unchanged
      # from the previous behaviour of returning +fork+'s result).
      def daemonize!(name)
        child = fork do
          Process.setsid
          # The intermediate child exits here; only the grandchild carries on.
          exit if fork
          store_pid(Process.pid, name)
          Dir.chdir File.dirname(__FILE__)
          File.umask 0000
          STDIN.reopen "/dev/null"
          STDOUT.reopen "/dev/null", "a"
          STDERR.reopen STDOUT
          trap("TERM") { exit }
          yield if block_given?
        end
        # Reap the intermediate child so it does not linger as a zombie for
        # as long as the calling process lives.
        Process.detach(child)
        child
      end

      # Sends TERM to every process whose PID file matches +name+ and prints
      # the outcome for each file. +name+ is interpolated into a glob, so a
      # pattern such as '*' matches several PID files.
      def kill!(name)
        Dir[pid_path(name)].each do |file|
          begin
            pid = read_pid(file)
            Process.kill('TERM', pid)
            # Only forget the process once the signal has actually been sent.
            FileUtils.rm_f(file)
            puts "killed PID: #{pid}"
          rescue Errno::ESRCH, ArgumentError => e
            # No such process, or the file does not hold a usable PID: the
            # file is stale, so drop it.
            FileUtils.rm_f(file)
            puts "Failed to kill! #{file}: #{e}"
          rescue => e
            # E.g. Errno::EPERM: the process may still be running, so its
            # PID file is kept for a later attempt.
            puts "Failed to kill! #{file}: #{e}"
          end
        end
      end

      # Sends TERM to every processor that has a PID file.
      def kill_all!
        kill!('*')
      end

      private

      # Writes +pid+ to the PID file for +name+, creating PID_DIR if needed.
      def store_pid(pid, name)
        path = pid_path(name)
        FileUtils.mkdir_p(File.dirname(path))
        File.open(path, 'w') { |f| f.write("#{pid}\n") }
      end

      # Reads a PID file strictly. Raises ArgumentError when the content is
      # not a positive integer: passing 0 to Process.kill would signal the
      # caller's own process group instead of the daemon.
      def read_pid(file)
        pid = Integer(IO.read(file).strip, 10)
        raise ArgumentError, "invalid PID #{pid}" unless pid > 0
        pid
      end

      # Path of the PID file for +queue_name+. Slashes (produced by
      # +underscore+ for namespaced classes) are flattened to dots so the
      # file stays directly inside PID_DIR and is matched by kill_all!.
      def pid_path(queue_name)
        File.join(PID_DIR, "processor.#{queue_name.to_s.tr('/', '.')}.pid")
      end
    end
  end

  class << self
    # Returns whatever the shared queue manager reports as available queues.
    def all
      Processor.queue_manager.available_queues
    end

    # Queue name derived from the class name via String#underscore.
    def queue_name
      name.underscore
    end

    # Hands +args+ to the processor: pushed onto the queue in production,
    # processed inline by a fresh instance in every other environment.
    # Also available as <<.
    def append(*args)
      if Rails.env.production?
        logger.info "Adding to #{queue_name}: #{args.inspect}"
        queue << args
      else
        logger.info "Sending to #{queue_name}: #{args.inspect}"
        new.process(args)
      end
    end
    alias :<< :append

    # Polls this processor's queue and feeds every message to a new
    # instance. With +daemon+ true (the default) polling happens in a
    # detached background process, otherwise in the calling process.
    # Errors raised while handling a message are logged and swallowed.
    def start!(daemon = true)
      poller = Beanstalk::QueuePoller.new(Processor.queue_manager)
      worker = lambda do
        poller.poll(queue_name.to_sym) do |msg|
          begin
            # NOTE(review): ybody is presumably the decoded message body -
            # confirm against the Beanstalk client in use.
            new.process(msg.ybody)
          rescue => e
            logger.error "\n#{e.message} - (#{e.class})\n#{Array(e.backtrace).join("\n")}"
            # Hacky way to get message released again
            # raise Beanstalk::UnexpectedResponse
          end
        end
      end

      if daemon
        Daemon.daemonize!(queue_name, &worker)
      else
        worker.call
      end
    end

    # Stops the daemon started for this processor's queue.
    def stop!
      Daemon.kill!(queue_name)
    end

    # Per-class logger writing to LOG_DIR/<queue_name>.log, created on first
    # use. The log file's parent directory is created as well, because the
    # queue name of a namespaced class contains a '/'.
    def logger
      @logger ||= begin
        path = File.join(LOG_DIR, "#{queue_name}.log")
        FileUtils.mkdir_p(File.dirname(path))
        Logger.new(path)
      end
    end

    protected

    # Queue manager loaded once from config/beanstalk.yml. Kept in a class
    # variable so that Processor and all of its subclasses share one manager.
    def queue_manager
      @@queue_manager ||= Beanstalk::QueueManager.load(
        File.join(Rails.root, 'config', 'beanstalk.yml')
      )
    end

    private

    # The queue object for this processor's queue name.
    def queue
      Processor.queue_manager.queue(queue_name.to_sym)
    end
  end

  # Asks ActiveRecord (when it is loaded) to verify its active connections
  # before a message is handled.
  # NOTE(review): verify_active_connections! appears to be absent from newer
  # ActiveRecord releases - confirm against the Rails version in use.
  def reload!
    ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
  end

  # Entry point for a single message: refreshes connections, logs the
  # message and splats it into #on_message.
  def process(msg)
    reload!
    logger.info "Received on #{queue_name}: #{msg.inspect}"
    on_message(*msg)
  end

  # Hook for subclasses; the base implementation always raises.
  def on_message(*args)
    raise 'You must implement on_message.'
  end

  # The logger of the processor's class.
  def logger
    self.class.logger
  end

  # The queue name of the processor's class.
  def queue_name
    self.class.queue_name
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment