Created
January 4, 2016 12:49
-
-
Save kaichen/16e4db8115455b609231 to your computer and use it in GitHub Desktop.
简单多线程调度
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "thread" | |
require "set" | |
Thread.abort_on_exception = true | |
# 调度器 | |
class Manager | |
def initialize | |
@job_queue = JobQueue.new | |
@workers = Set.new | |
10.times { @workers.add Worker.new(self) } | |
end | |
def enqueue_job(str) | |
@job_queue.enqueue(str) | |
end | |
def requeue_job(str) | |
@job_queue.unshift(str) | |
end | |
def dequeue_job | |
@job_queue.dequeue | |
end | |
def start | |
@workers.each{|w| w.start } | |
puts "All started!" | |
end | |
def stop | |
@workers.each{|w| w.stop} | |
puts "All stopped!" | |
end | |
end | |
# 工作线程 | |
class Worker | |
def initialize(mgr) | |
@mgr = mgr | |
@done = false | |
end | |
def start | |
Thread.new do | |
while !@done && job = @mgr.dequeue_job | |
unless @done | |
@mgr.requeue_job job | |
end | |
puts "Processing #{job}" | |
heavy_work | |
puts "Processed #{job}" | |
end | |
end | |
end | |
def stop | |
@done = true | |
end | |
def heavy_work | |
sleep 5 | |
end | |
end | |
# 任务队列(阻塞) | |
class JobQueue | |
def initialize | |
@jobs = [] | |
@mutex = Mutex.new | |
@condvar = ConditionVariable.new | |
end | |
def dequeue | |
@mutex.synchronize { | |
while @jobs.empty? | |
@condvar.wait(@mutex) | |
end | |
@jobs.shift | |
} | |
end | |
def enqueue(job) | |
@mutex.synchronize do | |
@jobs.push job | |
@condvar.signal | |
end | |
end | |
def requeue(job) | |
@mutex.synchronize do | |
@jobs.unshift job | |
@condvar.signal | |
end | |
end | |
end | |
if __FILE__==$0 | |
mgr = Manager.new | |
mgr.start | |
puts "Now enqueue 10 jobs" | |
10.times{|i| mgr.enqueue_job("job##{i}") } | |
puts "Now enqueue 25 jobs" | |
25.times{|i| mgr.enqueue_job("job##{i+10}") } | |
puts "Now we stop all workers then enqueue new job" | |
mgr.stop | |
mgr.enqueue_job("job#x") | |
end |
zhenwusw
commented
Jan 4, 2016
给主线程加个足够的 sleep, 才能看到 worker 工作的输出,否则主线程退出,其他的也退出了
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment