Skip to content

Instantly share code, notes, and snippets.

@stouset
Created August 28, 2009 20:42
Show Gist options
  • Save stouset/177223 to your computer and use it in GitHub Desktop.
Save stouset/177223 to your computer and use it in GitHub Desktop.
class ThreadPool
attr_reader :size
attr_reader :queue
Job = Struct.new(:args, :block)
#
# Creates a new ThreadPool with +size+ threads, waiting at your beck and
# call.
#
def initialize(size)
self.size = size
self.queue = Queue.new
# create a pool of threads the size requested
self.pool = Array.new(size) { thread }
end
#
# Schedules a +job+ to be run with the given +args+. Returns immediately.
#
def schedule(*args, &job)
self.queue.push Job.new(args, job)
end
#
# Schedules a +job+ to be run with the given +args+. Returns only once the
# job has begun execution on a thread in the pool.
#
def execute(*args, &job)
current = Thread.current
mutex = Mutex.new
mutex.synchronize do
schedule do
mutex.synchronize { current.run }
schedule(*args, &job)
end
Thread.stop
end
end
#
# Cleans up after the ThreadPool, quitting and joining all remaining
# threads. No more jobs can be run in the pool once this method has been
# called.
#
def shutdown
self.size.times { schedule { Thread.exit } }
self.pool.each {|thread| thread.join }
end
protected
attr_writer :size
attr_writer :queue
attr_accessor :pool
#
# Instantiates a new thread. Each thread simply waits until there's a job
# on the queue, runs it, then returns back to the queue for more work. If
# the queue is empty, the thread sleeps until there is an available job.
#
def thread
Thread.new do
loop do
job = queue.pop
job.block[*job.args]
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment