Created
May 9, 2014 06:56
-
-
Save Burgestrand/73528cb078ca3dab1810 to your computer and use it in GitHub Desktop.
Dispatcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "thread"
require "timeout"
module Spotify
  # Runs callables one at a time on a single background worker thread.
  #
  # Work is handed over through a Queue; callers either block for the outcome
  # ({#enqueue}) or receive a {Work} handle they can query later
  # ({#enqueue_async}).
  class Dispatcher
    # Raised by {Dispatcher#enqueue} once the dispatcher has been shut down.
    class ShutdownError < StandardError; end

    # A single unit of work together with its eventual outcome (a return
    # value or a raised exception). Any number of threads may wait on
    # {#value}; all of them are woken when the outcome is stored.
    class Work
      # Raised when an outcome is assigned to a work that already has one.
      class DoubleResultError < StandardError; end

      # @param callable [#call] the object invoked by {#call}
      def initialize(callable)
        @callable = callable
        @value_mutex = Mutex.new
        @value_cond = ConditionVariable.new
        @result = nil
        # nil -> not started, :pending -> running, :result / :error -> finished
        @result_type = nil
      end

      # Invokes the wrapped callable (at most once) and stores its outcome.
      #
      # If the work has already been started or finished, the callable is not
      # invoked again; the call waits for, and returns or raises, the stored
      # outcome instead.
      #
      # @return [Object] whatever the callable returned
      # @raise [Exception] whatever the callable raised (also stored so that
      #   threads blocked in {#value} see the same error)
      def call(*args, **kwargs, &block)
        return value unless claim_execution

        begin
          outcome = @callable.call(*args, **kwargs, &block)
        rescue Exception => ex # rubocop:disable Lint/RescueException
          # Store every failure so waiters are released, then re-raise untouched.
          self.error = ex
          raise
        end

        self.result = outcome
      end

      # Stores a successful outcome and wakes every waiting thread.
      #
      # @raise [DoubleResultError] if an outcome has already been stored
      def result=(result)
        resolve(:result, result)
      end

      # Stores a failure outcome and wakes every waiting thread.
      #
      # @param error [Exception] raised later by {#value}
      # @raise [DoubleResultError] if an outcome has already been stored
      def error=(error)
        resolve(:error, error)
      end

      # Waits for the outcome of this work.
      #
      # @param timeout [Numeric, nil] seconds to wait; nil waits indefinitely
      # @yield called (and its value returned) if no outcome arrived in time
      # @return [Object] the stored result, or the block's value on timeout
      # @raise [Exception] the stored error, if the work failed
      # @raise [Timeout::Error] on timeout when no block was given
      def value(timeout = nil)
        deadline = timeout && (monotonic_now + timeout)

        type, payload = @value_mutex.synchronize do
          # Loop: a condition variable may wake up without a broadcast.
          until value?
            remaining = deadline && (deadline - monotonic_now)
            break if remaining && remaining <= 0
            @value_cond.wait(@value_mutex, remaining)
          end
          [@result_type, @result]
        end

        case type
        when :result then payload
        when :error then raise payload
        else
          return yield if block_given?
          raise Timeout::Error, "retrieving value timed out after #{timeout}s"
        end
      end

      # @return [Boolean] true once a result or an error has been stored
      def value?
        result_type = @result_type
        result_type == :result || result_type == :error
      end

      private

      # Atomically marks the work as running.
      #
      # @return [Boolean] true if the caller is the one that should execute it
      def claim_execution
        @value_mutex.synchronize do
          if @result_type
            false
          else
            @result_type = :pending
            true
          end
        end
      end

      # Records the outcome exactly once and notifies all waiters.
      def resolve(type, payload)
        @value_mutex.synchronize do
          # :pending is not an outcome, so only a finished work is rejected.
          raise DoubleResultError, "a #{@result_type} value exist" if value?
          @result = payload
          @result_type = type
          @value_cond.broadcast
        end
      end

      # Monotonic clock reading in seconds, unaffected by wall-clock changes.
      def monotonic_now
        Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end
    end

    # Creates the queue and starts the worker thread that drains it.
    def initialize
      @queue = Queue.new
      @queue_lock = Mutex.new # orders enqueue_async against shutdown
      @running = true
      @worker = Thread.new(@queue) do |queue|
        begin
          while @running
            work = queue.pop
            begin
              work.call
            rescue StandardError
              # Work#call has already stored the error for whoever waits on
              # the work; the worker must survive to run the next job.
            end
          end
        rescue Exception => ex # rubocop:disable Lint/RescueException
          # Non-StandardError failures end the worker; keep the cause around.
          Thread.current[:crashed] = ex
          raise
        end
      end
    end

    # Runs the block on the worker thread and waits for its outcome.
    #
    # @param timeout [Numeric, nil] seconds to wait; nil waits indefinitely
    # @return [Object] the block's return value
    # @raise [ShutdownError] if the dispatcher has been shut down
    # @raise [Timeout::Error] if no outcome arrived within timeout
    # @raise [Exception] whatever the block raised
    def enqueue(timeout = nil, &block)
      work = enqueue_async(&block)
      raise ShutdownError, "dispatcher has been shut down" unless work
      work.value(timeout)
    end

    # Schedules the block on the worker thread without waiting for it.
    #
    # @return [Work, nil] handle for the scheduled work, or nil if the
    #   dispatcher has been shut down
    def enqueue_async(&block)
      @queue_lock.synchronize do
        queue = @queue
        if queue
          work = Work.new(block)
          queue << work
          work
        end
      end
    end

    # Stops accepting work and stops the worker once everything already
    # queued has run. The optional block runs on the worker thread as the
    # very last job.
    #
    # @param timeout [Numeric, nil] seconds to wait for the final job
    # @return [Object, nil] the block's return value; nil if no block was
    #   given or the dispatcher was already shut down
    # @raise [Timeout::Error] if the final job did not finish within timeout
    def shutdown(timeout = nil, &block)
      final_job = proc do
        begin
          block.call if block
        ensure
          # Runs on the worker thread, so its loop ends after this job.
          @running = false
        end
      end
      work = Work.new(final_job)

      accepted = @queue_lock.synchronize do
        queue = @queue
        @queue = nil
        queue << work if queue
      end
      return nil unless accepted

      work.value(timeout)
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment