Created
September 14, 2012 20:17
-
-
Save jeremy/3724459 to your computer and use it in GitHub Desktop.
queue consumer exception handler and refactoring
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit 63a12f72df1a7507f7a1923bdaa56b56b876b562 | |
Author: Jeremy Kemper <[email protected]> | |
Date: Fri Sep 14 17:00:46 2012 -0700 | |
Pass an exception_handler to queue consumers. Synchronous and test queues no longer run jobs themselves; they delegate to a consumer. | |
diff --git a/activesupport/lib/active_support/queueing.rb b/activesupport/lib/active_support/queueing.rb | |
index f397e1c..c4ff8b8 100644 | |
--- a/activesupport/lib/active_support/queueing.rb | |
+++ b/activesupport/lib/active_support/queueing.rb | |
@@ -2,17 +2,33 @@ require 'delegate' | |
require 'thread' | |
module ActiveSupport | |
- # A Queue that simply inherits from STDLIB's Queue. Everytime this | |
- # queue is used, Rails automatically sets up a ThreadedConsumer | |
- # to consume it. | |
+ # A Queue that simply inherits from STDLIB's Queue. When this | |
+ # queue is used, Rails automatically starts a job runner in a | |
+ # background thread. | |
class Queue < ::Queue | |
+ attr_writer :consumer | |
+ | |
+ def initialize(consumer_options = {}) | |
+ super() | |
+ @consumer_options = consumer_options | |
+ end | |
+ | |
+ def consumer | |
+ @consumer ||= ThreadedQueueConsumer.new(self, @consumer_options) | |
+ end | |
+ | |
+ # Drain the queue, running all jobs in a different thread. This method | |
+ # may not be available on production queues. | |
+ def drain | |
+ # Run the jobs in a separate thread so that code which wrongly assumes | |
+ # jobs run synchronously is caught in test mode. | |
+ consumer.drain | |
+ end | |
end | |
- class SynchronousQueue < ::Queue | |
+ class SynchronousQueue < Queue | |
def push(job) | |
- result = nil | |
- Thread.new { result = job.run }.join | |
- result | |
+ super.tap &:drain | |
end | |
alias << push | |
alias enq push | |
@@ -25,7 +41,7 @@ module ActiveSupport | |
# | |
# Jobs are run in a separate thread to catch mistakes where code | |
# assumes that the job is run in the same thread. | |
- class TestQueue < ::Queue | |
+ class TestQueue < Queue | |
# Get a list of the jobs off this queue. This method may not be | |
# available on production queues. | |
def jobs | |
@@ -38,14 +54,6 @@ module ActiveSupport | |
def push(job) | |
super Marshal.load(Marshal.dump(job)) | |
end | |
- | |
- # Drain the queue, running all jobs in a different thread. This method | |
- # may not be available on production queues. | |
- def drain | |
- # run the jobs in a separate thread so assumptions of synchronous | |
- # jobs are caught in test mode. | |
- Thread.new { pop.run until empty? }.join | |
- end | |
end | |
# A container for multiple queues. This class delegates to a default Queue | |
@@ -82,25 +90,21 @@ module ActiveSupport | |
# queue and joins the thread, which will ensure that all jobs | |
# are executed before the process finally dies. | |
class ThreadedQueueConsumer | |
- def self.start(queue, logger=nil) | |
- new(queue, logger).start | |
+ attr_accessor :exception_handler | |
+ | |
+ def self.start(*args) | |
+ new(*args).start | |
end | |
- def initialize(queue, logger=nil) | |
- @queue = queue | |
- @logger = logger | |
+ def initialize(queue, options = {}) | |
+ @queue = queue | |
+ @logger = options[:logger] | |
+ @exception_handler = options[:exception_handler] | |
+ @exception_handler ||= method(:log_exception) if @logger | |
end | |
def start | |
- @thread = Thread.new do | |
- while job = @queue.pop | |
- begin | |
- job.run | |
- rescue Exception => e | |
- handle_exception e | |
- end | |
- end | |
- end | |
+ @thread ||= Thread.new { consume } | |
self | |
end | |
@@ -109,8 +113,25 @@ module ActiveSupport | |
@thread.join | |
end | |
- def handle_exception(e) | |
- @logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger | |
+ def drain | |
+ start | |
+ shutdown | |
+ end | |
+ | |
+ def consume | |
+ while job = @queue.pop | |
+ run job | |
+ end | |
+ end | |
+ | |
+ def run(job) | |
+ job.run | |
+ rescue Exception => exception | |
+ @exception_handler ? @exception_handler.(job, exception) : raise | |
+ end | |
+ | |
+ def log_exception(job, exception) | |
+ @logger.error "Job Error: #{exception.message}\n#{exception.backtrace.join("\n")}" if @logger | |
end | |
end | |
end | |
diff --git a/activesupport/test/queueing/threaded_consumer_test.rb b/activesupport/test/queueing/threaded_consumer_test.rb | |
index 20a1cc4..0f2bf10 100644 | |
--- a/activesupport/test/queueing/threaded_consumer_test.rb | |
+++ b/activesupport/test/queueing/threaded_consumer_test.rb | |
@@ -5,7 +5,7 @@ require "active_support/log_subscriber/test_helper" | |
class TestThreadConsumer < ActiveSupport::TestCase | |
class Job | |
attr_reader :id | |
- def initialize(id, &block) | |
+ def initialize(id = 1, &block) | |
@id = id | |
@block = block | |
end | |
@@ -16,83 +16,71 @@ class TestThreadConsumer < ActiveSupport::TestCase | |
end | |
def setup | |
- @queue = ActiveSupport::Queue.new | |
@logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new | |
- @consumer = ActiveSupport::ThreadedQueueConsumer.start(@queue, @logger) | |
+ @queue = ActiveSupport::Queue.new(logger: @logger) | |
end | |
def teardown | |
- @queue.push nil | |
+ @queue.drain | |
end | |
test "the jobs are executed" do | |
ran = false | |
- | |
- job = Job.new(1) do | |
- ran = true | |
- end | |
+ job = Job.new { ran = true } | |
@queue.push job | |
- sleep 0.1 | |
+ @queue.drain | |
+ | |
assert_equal true, ran | |
end | |
test "the jobs are not executed synchronously" do | |
- ran = false | |
- | |
- job = Job.new(1) do | |
- sleep 0.1 | |
- ran = true | |
- end | |
+ run, ran = Queue.new, Queue.new | |
+ job = Job.new { ran.push run.pop } | |
+ @queue.consumer.start | |
@queue.push job | |
- assert_equal false, ran | |
+ assert ran.empty? | |
+ | |
+ run.push true | |
+ assert_equal true, ran.pop | |
end | |
test "shutting down the queue synchronously drains the jobs" do | |
+ runnable = ::Queue.new | |
ran = false | |
- | |
- job = Job.new(1) do | |
+ job = Job.new do | |
sleep 0.1 | |
ran = true | |
end | |
+ @queue.consumer.start | |
@queue.push job | |
assert_equal false, ran | |
- @consumer.shutdown | |
- | |
+ @queue.consumer.shutdown | |
assert_equal true, ran | |
end | |
test "log job that raises an exception" do | |
- job = Job.new(1) do | |
- raise "RuntimeError: Error!" | |
- end | |
+ job = Job.new { raise "RuntimeError: Error!" } | |
@queue.push job | |
- sleep 0.1 | |
+ @queue.drain | |
assert_equal 1, @logger.logged(:error).size | |
- assert_match(/Job Error: RuntimeError: Error!/, @logger.logged(:error).last) | |
+ assert_match 'Job Error: RuntimeError: Error!', @logger.logged(:error).last | |
end | |
test "test overriding exception handling" do | |
- @consumer.shutdown | |
- @consumer = Class.new(ActiveSupport::ThreadedQueueConsumer) do | |
- attr_reader :last_error | |
- def handle_exception(e) | |
- @last_error = e.message | |
- end | |
- end.start(@queue) | |
- | |
- job = Job.new(1) do | |
- raise "RuntimeError: Error!" | |
- end | |
+ last_error = nil | |
+ @queue.consumer.exception_handler = ->(job, exception) { last_error = exception.message } | |
+ | |
+ job = Job.new { raise "RuntimeError: Error!" } | |
@queue.push job | |
- sleep 0.1 | |
+ @queue.drain | |
- assert_equal "RuntimeError: Error!", @consumer.last_error | |
+ assert_equal "RuntimeError: Error!", last_error | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment