Skip to content

Instantly share code, notes, and snippets.

@raggi
Created September 15, 2011 23:34
Show Gist options
  • Save raggi/1220800 to your computer and use it in GitHub Desktop.
Save raggi/1220800 to your computer and use it in GitHub Desktop.
An example race condition using fibers
## SCROLL DOWN FOR THE RACE, THE FOLLOWING IS JUST UTILITY FROM COMMONLY USED CODE.
# Author:: Mohammad A. Ali (mailto:[email protected])
# Copyright:: Copyright (c) 2008 eSpace, Inc.
# License:: Distributes under the same terms as Ruby
require 'fiber'
class Fiber
#Attribute Reference--Returns the value of a fiber-local variable, using
#either a symbol or a string name. If the specified variable does not exist,
#returns nil.
def [](key)
local_fiber_variables[key]
end
#Attribute Assignment--Sets or creates the value of a fiber-local variable,
#using either a symbol or a string. See also Fiber#[].
def []=(key,value)
local_fiber_variables[key] = value
end
private
def local_fiber_variables
@local_fiber_variables ||= {}
end
end
class FiberPool
# gives access to the currently free fibers
attr_reader :fibers
attr_reader :busy_fibers
# Code can register a proc with this FiberPool to be called
# every time a Fiber is finished. Good for releasing resources
# like ActiveRecord database connections.
attr_accessor :generic_callbacks
# Prepare a list of fibers that are able to run different blocks of code
# every time. Once a fiber is done with its block, it attempts to fetch
# another one from the queue
def initialize(count = 100)
@fibers,@busy_fibers,@queue,@generic_callbacks = [],{},[],[]
count.times do |i|
fiber = Fiber.new do |block|
loop do
block.call
# callbacks are called in a reverse order, much like c++ destructor
Fiber.current[:callbacks].pop.call while Fiber.current[:callbacks].length > 0
generic_callbacks.each do |cb|
cb.call
end
unless @queue.empty?
block = @queue.shift
else
@busy_fibers.delete(Fiber.current.object_id)
@fibers.unshift Fiber.current
block = Fiber.yield
end
end
end
fiber[:callbacks] = []
fiber[:em_keys] = []
@fibers << fiber
end
end
# If there is an available fiber use it, otherwise, leave it to linger
# in a queue
def spawn(&block)
if fiber = @fibers.shift
fiber[:callbacks] = []
@busy_fibers[fiber.object_id] = fiber
fiber.resume(block)
else
@queue << block
end
self # we are keen on hiding our queue
end
end
class Job
attr_reader :id
def initialize id
@id = id
@steps = [:start, :load_value, :business_logic, :store_value, :finish]
POOL.spawn do
run
end
end
def run
until done?
step = @steps.shift
p [@id, step]
send step
end
end
def start
do_io
end
def load_value
@value = DB.value
do_io
end
def business_logic
@value = @value + 1
do_io
end
def store_value
DB.value = @value
end
def finish
end
def done?
@steps.empty?
end
private
def do_io
IOLOOP.wait(Fiber.current) # would normally pass some resource here to wait against
Fiber.yield
end
end
class Io
def initialize(avg_sleep_time)
@avg_sleep_time = avg_sleep_time
@waiters = []
end
def wait(fiber)
@waiters << fiber
end
def run
sleep rand * @avg_sleep_time
# Lets say anywhere between 0 and 30% of the "IO" completes in this "tick"
dones = @waiters.sort_by{rand}[0..rand(@waiters.size * 0.3)]
@waiters -= dones
dones.each { |fiber| fiber.resume }
end
end
POOL = FiberPool.new
DB = Struct.new(:value).new
DB.value = 10
IOLOOP = Io.new(0.01)
jobs = Array.new(100) do |id|
Job.new(id)
end
IOLOOP.run until jobs.all? { |job| job.done? }
puts "The following value should be 110: #{DB.value}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment