Created
November 15, 2010 08:36
-
-
Save huacnlee/700167 to your computer and use it in GitHub Desktop.
Ruby 线程池类,用于控制同时最大允许多少个线程执行,并加入列队
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
class ThreadPool | |
class Worker | |
def initialize | |
@mutex = Mutex.new | |
@thread = Thread.new do | |
while true | |
sleep 0.001 | |
block = get_block | |
if block | |
block.call | |
reset_block | |
end | |
end | |
end | |
end | |
def get_block | |
@mutex.synchronize {@block} | |
end | |
def set_block(block) | |
@mutex.synchronize do | |
raise RuntimeError, "Thread already busy." if @block | |
@block = block | |
end | |
end | |
def reset_block | |
@mutex.synchronize {@block = nil} | |
end | |
def busy? | |
@mutex.synchronize {[email protected]?} | |
end | |
end | |
attr_accessor :max_size | |
attr_reader :workers | |
def initialize(max_size = 10) | |
@max_size = max_size | |
@workers = [] | |
@mutex = Mutex.new | |
end | |
def size | |
@mutex.synchronize {@workers.size} | |
end | |
def busy? | |
@mutex.synchronize {@workers.any? {|w| w.busy?}} | |
end | |
def join | |
sleep 0.01 while busy? | |
end | |
def process(&block) | |
while true | |
@mutex.synchronize do | |
worker = find_available_worker | |
if worker | |
return worker.set_block(block) | |
end | |
end | |
sleep 0.01 | |
end | |
end | |
def wait_for_worker | |
while true | |
worker = find_available_worker | |
return worker if worker | |
sleep 0.01 | |
end | |
end | |
def find_available_worker | |
free_worker || create_worker | |
end | |
def free_worker | |
@workers.each {|w| return w unless w.busy?}; nil | |
end | |
def create_worker | |
return nil if @workers.size >= @max_size | |
worker = Worker.new | |
@workers << worker | |
worker | |
end | |
end |
这个有 Bug, 线程会堵塞
...有比较好的gems 吗?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
例如:
pool = ThreadPool.new(10) # 最大10个线程
30.times do |i|
pool.process {
puts "#{i} running..."
sleep(rand(10))
puts "#{i} done."
}
end