@kaichen
Created January 4, 2016 12:49
Simple multithreaded scheduling
require "thread"
require "set"
Thread.abort_on_exception = true
# Scheduler
class Manager
  def initialize
    @job_queue = JobQueue.new
    @workers = Set.new
    10.times { @workers.add Worker.new(self) }
  end

  def enqueue_job(str)
    @job_queue.enqueue(str)
  end

  # Put a job back at the front of the queue.
  def requeue_job(str)
    @job_queue.requeue(str)
  end

  def dequeue_job
    @job_queue.dequeue
  end

  def start
    @workers.each { |w| w.start }
    puts "All started!"
  end

  def stop
    @workers.each { |w| w.stop }
    puts "All stopped!"
  end
end
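# Worker thread
class Worker
  def initialize(mgr)
    @mgr = mgr
    @done = false
  end

  def start
    Thread.new do
      while !@done && (job = @mgr.dequeue_job)
        # If stop was requested while this thread was blocked in dequeue_job,
        # put the job back and exit without processing it.
        if @done
          @mgr.requeue_job job
          break
        end
        puts "Processing #{job}"
        heavy_work
        puts "Processed #{job}"
      end
    end
  end

  def stop
    @done = true
  end

  # Simulates a slow job.
  def heavy_work
    sleep 5
  end
end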
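# Job queue (blocking)
class JobQueue
  def initialize
    @jobs = []
    @mutex = Mutex.new
    @condvar = ConditionVariable.new
  end

  # Blocks the calling thread until a job is available.
  def dequeue
    @mutex.synchronize do
      while @jobs.empty?
        @condvar.wait(@mutex)
      end
      @jobs.shift
    end
  end

  # Appends a job to the back of the queue and wakes one waiting thread.
  def enqueue(job)
    @mutex.synchronize do
      @jobs.push job
      @condvar.signal
    end
  end

  # Puts a job back at the front of the queue and wakes one waiting thread.
  def requeue(job)
    @mutex.synchronize do
      @jobs.unshift job
      @condvar.signal
    end
  end
end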
if __FILE__ == $0
  mgr = Manager.new
  mgr.start
  puts "Now enqueue 10 jobs"
  10.times { |i| mgr.enqueue_job("job##{i}") }
  puts "Now enqueue 25 jobs"
  25.times { |i| mgr.enqueue_job("job##{i + 10}") }
  puts "Now we stop all workers then enqueue new job"
  mgr.stop
  mgr.enqueue_job("job#x")
end

zhenwusw commented Jan 4, 2016

  def requeue_job(str)
    @job_queue.unshift(str) # unshift is undefined on JobQueue :)
  end


zhenwusw commented Jan 4, 2016

Give the main thread a long enough sleep so that you can see the workers' output. Otherwise the main thread exits and the other threads exit with it.
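
As an alternative to a fixed sleep, the main thread can wait on the worker threads with Thread#join. The sketch below is a minimal, self-contained illustration rather than the gist's code: the `workers` array, the job labels, and the 0.1-second stand-in for heavy_work are assumptions made for the example. It also assumes each worker's thread is kept so it can be joined (in the gist, Worker#start returns the thread from Thread.new, but Manager#start discards it).

```ruby
# Hypothetical stand-ins for the gist's workers: each thread does a short
# piece of "work" and returns a string describing what it processed.
workers = 3.times.map do |i|
  Thread.new do
    sleep 0.1                 # stand-in for heavy_work
    "job##{i} processed"      # the block's value becomes Thread#value
  end
end

# Block the main thread until every worker has finished,
# instead of guessing how long to sleep.
workers.each(&:join)

# Thread#value returns the value of each thread's block.
outputs = workers.map(&:value)
puts outputs
```

In the gist as written, idle workers stay blocked in JobQueue#dequeue once the queue is empty, so a plain join would never return there; stopping cleanly would also need some signal that wakes the waiting threads.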
