Created
September 15, 2011 23:34
-
-
Save raggi/1220800 to your computer and use it in GitHub Desktop.
An example race condition using fibers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## SCROLL DOWN FOR THE RACE, THE FOLLOWING IS JUST UTILITY FROM COMMONLY USED CODE. | |
# Author:: Mohammad A. Ali (mailto:[email protected]) | |
# Copyright:: Copyright (c) 2008 eSpace, Inc. | |
# License:: Distributes under the same terms as Ruby | |
require 'fiber' | |
class Fiber | |
#Attribute Reference--Returns the value of a fiber-local variable, using | |
#either a symbol or a string name. If the specified variable does not exist, | |
#returns nil. | |
def [](key) | |
local_fiber_variables[key] | |
end | |
#Attribute Assignment--Sets or creates the value of a fiber-local variable, | |
#using either a symbol or a string. See also Fiber#[]. | |
def []=(key,value) | |
local_fiber_variables[key] = value | |
end | |
private | |
def local_fiber_variables | |
@local_fiber_variables ||= {} | |
end | |
end | |
class FiberPool | |
# gives access to the currently free fibers | |
attr_reader :fibers | |
attr_reader :busy_fibers | |
# Code can register a proc with this FiberPool to be called | |
# every time a Fiber is finished. Good for releasing resources | |
# like ActiveRecord database connections. | |
attr_accessor :generic_callbacks | |
# Prepare a list of fibers that are able to run different blocks of code | |
# every time. Once a fiber is done with its block, it attempts to fetch | |
# another one from the queue | |
def initialize(count = 100) | |
@fibers,@busy_fibers,@queue,@generic_callbacks = [],{},[],[] | |
count.times do |i| | |
fiber = Fiber.new do |block| | |
loop do | |
block.call | |
# callbacks are called in a reverse order, much like c++ destructor | |
Fiber.current[:callbacks].pop.call while Fiber.current[:callbacks].length > 0 | |
generic_callbacks.each do |cb| | |
cb.call | |
end | |
unless @queue.empty? | |
block = @queue.shift | |
else | |
@busy_fibers.delete(Fiber.current.object_id) | |
@fibers.unshift Fiber.current | |
block = Fiber.yield | |
end | |
end | |
end | |
fiber[:callbacks] = [] | |
fiber[:em_keys] = [] | |
@fibers << fiber | |
end | |
end | |
# If there is an available fiber use it, otherwise, leave it to linger | |
# in a queue | |
def spawn(&block) | |
if fiber = @fibers.shift | |
fiber[:callbacks] = [] | |
@busy_fibers[fiber.object_id] = fiber | |
fiber.resume(block) | |
else | |
@queue << block | |
end | |
self # we are keen on hiding our queue | |
end | |
end | |
class Job | |
attr_reader :id | |
def initialize id | |
@id = id | |
@steps = [:start, :load_value, :business_logic, :store_value, :finish] | |
POOL.spawn do | |
run | |
end | |
end | |
def run | |
until done? | |
step = @steps.shift | |
p [@id, step] | |
send step | |
end | |
end | |
def start | |
do_io | |
end | |
def load_value | |
@value = DB.value | |
do_io | |
end | |
def business_logic | |
@value = @value + 1 | |
do_io | |
end | |
def store_value | |
DB.value = @value | |
end | |
def finish | |
end | |
def done? | |
@steps.empty? | |
end | |
private | |
def do_io | |
IOLOOP.wait(Fiber.current) # would normally pass some resource here to wait against | |
Fiber.yield | |
end | |
end | |
class Io | |
def initialize(avg_sleep_time) | |
@avg_sleep_time = avg_sleep_time | |
@waiters = [] | |
end | |
def wait(fiber) | |
@waiters << fiber | |
end | |
def run | |
sleep rand * @avg_sleep_time | |
# Lets say anywhere between 0 and 30% of the "IO" completes in this "tick" | |
dones = @waiters.sort_by{rand}[0..rand(@waiters.size * 0.3)] | |
@waiters -= dones | |
dones.each { |fiber| fiber.resume } | |
end | |
end | |
POOL = FiberPool.new | |
DB = Struct.new(:value).new | |
DB.value = 10 | |
IOLOOP = Io.new(0.01) | |
jobs = Array.new(100) do |id| | |
Job.new(id) | |
end | |
IOLOOP.run until jobs.all? { |job| job.done? } | |
puts "The following value should be 110: #{DB.value}" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment