Skip to content

Instantly share code, notes, and snippets.

@Burgestrand
Created May 9, 2014 06:56
Show Gist options
  • Save Burgestrand/73528cb078ca3dab1810 to your computer and use it in GitHub Desktop.
Save Burgestrand/73528cb078ca3dab1810 to your computer and use it in GitHub Desktop.
Dispatcher
require "queue"
require "timeout"
module Spotify
class Dispatcher
class Work
class DoubleResultError < StandardError; end
def initialize(callable)
@callable = callable
@value_mutex = Mutex.new
@value_cond = ConditionVariable.new
@called = false
@result = nil
@result_type = nil
end
def call(*args, **kwargs, &block)
return_immediately = false
@value_mutex.synchronize do
if @result_type
return_immediately = true
else
@result_type = :pending
end
end
if return_immediately
return value
else
self.result = @callable.call(*args, **kwargs, &block)
end
rescue Exception => ex
self.error = ex
raise ex
end
def result=(result)
@value_mutex.synchronize do
raise DoubleResultError, "a #{@result_type} value exist" if @result_type
@result = result
@result_type = :result
@value_cond.broadcast
end
end
def error=(error)
@value_mutex.synchronize do
raise DoubleResultError, "a #{@result_type} value exist" if @result_type
@result = result
@result_type = :error
@value_cond.broadcast
end
end
def value(timeout = nil)
@value_mutex.synchronize do
@value_cond.wait(@value_mutex, timeout) unless value?
end
if @result_type == :result
return @result
elsif @result_type == :error
raise @result
elsif block_given?
yield
else
raise TimeoutError, "retrieving value timed out after #{timeout}s"
end
end
def value?
result_type = @result_type
result_type == :result || result_type == :error
end
end
def initialize
@queue = Queue.new
@worker = Thread.new(@queue) do |queue|
begin
while @running
work = queue.pop
work.call
end
rescue Exception => ex
Thread.current[:crashed] = ex
raise ex
end
end
end
def enqueue(timeout = nil, &block)
enqueue_async(&block).value(timeout)
end
def enqueue_async(&block)
queue = @queue
unless queue.nil?
work = Work.new(block)
queue << work
work
end
end
def shutdown(timeout = nil)
queue = @queue
@queue = nil
work = Work.new(lambda do
yield if block_given?
end)
queue << work
work.value(timeout)
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment