Skip to content

Instantly share code, notes, and snippets.

@nicksieger
Created July 15, 2010 00:20
Show Gist options
  • Save nicksieger/476326 to your computer and use it in GitHub Desktop.
Save nicksieger/476326 to your computer and use it in GitHub Desktop.

Synchronizing a Pool of Resources

Here are several approaches for synchronization around a pool of limited resources that are checked in and out by a gang of threads.

Pools

There are three different pools: MonitorPool which uses monitor.rb, MutexCondvarPool which uses a Mutex and a ConditionVariable, and SemaphorePool which uses a java.util.concurrent.Semaphore (only under JRuby).

Request

The Request class simply runs in a thread, tries to acquire a resource from the pool, keeps trying until it has held it at least once, sleeps for a busy-wait moment, and then checks it back into the pool.

Usage

$ ruby pool.rb [M] [N] [TIMEOUT] [BUSY]

where M is the number of resources, N is the number of threads contending for the resources, TIMEOUT is the duration (in seconds) a thread will wait for a resource before retrying, and BUSY is the duration (in seconds) a thread will spend holding onto a resource.

TIMEOUT can be "inf" meaning don't timeout. BUSY can be "rand" meaning a random busy-wait between 0 and 1 seconds.

Specify DEBUG=1 ruby pool.rb to see each request acquire and release its resource.

require 'monitor'
require 'thread'
M = (ARGV[0] || ENV['M'] || "5").to_i
N = (ARGV[1] || ENV['N'] || "10").to_i
TIMEOUT = begin
t = (ARGV[2] || ENV['TIMEOUT'] || '0.5').to_f
t = nil if t == 0.0
t
end
BUSY = begin
b = (ARGV[3] || ENV['BUSY'] || '0.6').to_f
b = nil if b == 0.0
b
end
module Pools
Resource = Struct.new(:id)
class BasePool
def initialize
@resources = (1..M).map{|n| Resource.new(n)}
@checked_out = []
@timeout = TIMEOUT
end
end
class MonitorPool < BasePool
def initialize
super
@mutex = Monitor.new
@queue = @mutex.new_cond
end
def checkout
@mutex.synchronize do
loop do
resource = (@resources - @checked_out).first
if resource
@checked_out << resource
return resource
end
if @queue.wait(@timeout)
next
else
raise "could not obtain a resource#{" within #{@timeout} seconds" if @timeout}"
end
end
end
end
def checkin(resource)
@mutex.synchronize do
@checked_out.delete resource
@queue.signal
end
end
end
class MutexCondvarPool < MonitorPool
class ConditionVariableWithTimeout
class Timeout < Exception; end
def initialize(mutex)
@mutex = mutex
@cond = ConditionVariable.new
end
def signal
@cond.signal
end
if ConditionVariable.instance_method("wait").arity == 1
# MRI 1.8.x
def wait(timeout)
begin
timer = create_timer(timeout)
@cond.wait(@mutex)
Thread.kill(timer) if timer && timer.alive?
true
rescue Timeout
nil
end
end
private
def create_timer(timeout)
if timeout
waiter = Thread.current
return Thread.new {
Thread.pass
sleep(timeout)
waiter.raise(Timeout.new)
}
else
return nil
end
end
else
# JRuby, 1.9.2
def wait(timeout)
@cond.wait(@mutex, timeout)
end
end
end
def initialize
super
@mutex = Mutex.new
@queue = ConditionVariableWithTimeout.new(@mutex)
end
end
class SemaphorePool < BasePool
require 'java'
def initialize
super
@timeout = (@timeout * 1000).to_i if @timeout
@time_unit = java.util.concurrent.TimeUnit::MILLISECONDS
@mutex = java.lang.Object.new
@queue = java.util.concurrent.Semaphore.new(@resources.length)
end
def checkout
if @timeout
unless @queue.try_acquire(@timeout, @time_unit)
raise "could not obtain a resource#{" within #{@timeout} seconds" if @timeout}"
end
else
@queue.acquire
end
resource = (@resources - @checked_out).first
@mutex.synchronized do
@checked_out << resource
end
resource
end
def checkin(resource)
@mutex.synchronized do
@checked_out.delete resource
end
@queue.release
end
end if defined?(JRUBY_VERSION)
end
class Request < Pools::Resource
attr_reader :failures
def initialize(id, pool)
super(id)
@pool = pool
@failures = 0
end
def acquire
report "entering"
begin
res = @pool.checkout
report "acquired #{res.inspect}"
busy_wait
@pool.checkin res
rescue
report "failed to acquire, retrying"
@failures += 1
Thread.pass
retry
end
report "exiting"
end
def busy_wait
if BUSY
sleep BUSY
else
sleep rand
end
end
def report(msg)
puts "#{self.inspect}: #{msg}" if ENV['DEBUG']
end
end
require 'benchmark'
def run_checkouts(pool)
threads = []
timeouts = 0
N.times do |i|
t = Thread.new do
r = Request.new(i, pool)
r.acquire
timeouts += r.failures
end
threads << t
end
threads.each {|t| t.join}
puts "#{timeouts} timeout#{timeouts != 1 ? 's' : ''} occurred"
end
puts "Running #{N} threads against #{M} resources with #{TIMEOUT ? TIMEOUT : 'infinite'} timeout and #{BUSY ? BUSY : 'random'} busy-wait"
puts "Monitor Pool"
puts Benchmark.measure {
run_checkouts Pools::MonitorPool.new
}
puts "Mutex+ConditionVariable Pool"
puts Benchmark.measure {
run_checkouts Pools::MutexCondvarPool.new
}
if defined?(JRUBY_VERSION)
puts "Semaphore Pool"
puts Benchmark.measure {
run_checkouts Pools::SemaphorePool.new
}
end
$ ruby -v pool.rb 5 30
ruby 1.8.7 (2009-06-12 patchlevel 174) [universal-darwin10.0]
Running 30 threads against 5 resources with 0.5 timeout and 0.6 busy-wait
Monitor Pool
75 timeouts occurred
0.030000 0.010000 0.040000 ( 3.604197)
Mutex+ConditionVariable Pool
75 timeouts occurred
0.030000 0.010000 0.040000 ( 3.605272)
$ ruby-head -v pool.rb 5 30
ruby 1.9.3dev (2010-07-12 trunk 28620) [x86_64-darwin10.4.0]
Running 30 threads against 5 resources with 0.5 timeout and 0.6 busy-wait
Monitor Pool
0 timeouts occurred
0.010000 0.010000 0.020000 ( 3.610159)
Mutex+ConditionVariable Pool
0 timeouts occurred
0.010000 0.010000 0.020000 ( 3.604341)
$ rbx-1.0.0-20100514 -v pool.rb 5 30
rubinius 1.0.0 (1.8.7 release 2010-05-14 JI) [x86_64-apple-darwin10.3.0]
Running 30 threads against 5 resources with 0.5 timeout and 0.6 busy-wait
Monitor Pool
75 timeouts occurred
3.610961 0.000000 3.610961 ( 3.610910)
Mutex+ConditionVariable Pool
75 timeouts occurred
3.610875 0.000000 3.610875 ( 3.610873)
$ jruby -v pool.rb 5 30
jruby 1.6.0.dev (ruby 1.8.7 patchlevel 249) (2010-07-14 5e3ddc7) (Java HotSpot(TM) Client VM 1.6.0_20) [i386-java]
Running 30 threads against 5 resources with 0.5 timeout and 0.6 busy-wait
Monitor Pool
53 timeouts occurred
3.688000 0.000000 3.688000 ( 3.646000)
Mutex+ConditionVariable Pool
44 timeouts occurred
3.619000 0.000000 3.619000 ( 3.620000)
Semaphore Pool
72 timeouts occurred
3.638000 0.000000 3.638000 ( 3.639000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment