-
-
Save rkh/2037092 to your computer and use it in GitHub Desktop.
require 'thread' | |
class Worker | |
def initialize(count = 1) | |
@queue, @closing, @threads, @mutex = Queue.new, false, [], Mutex.new | |
add_worker(count) | |
end | |
def add_worker(count = 1) | |
@mutex.synchronize do | |
@threads += count.times.map { Thread.new { @queue.pop.call until @closing } } | |
end | |
end | |
def run(block = Proc.new) | |
@queue << block | |
end | |
def close(&block) | |
run do | |
@closing = true | |
yield if block_given? | |
wakeup | |
end | |
end | |
def join | |
@threads.each(&:join) | |
end | |
private | |
def wakeup | |
run { wakeup if @queue.num_waiting > 0 } | |
end | |
end |
Why? I clear the queue anyways. I mean, there still is a race condition in there (i.e. someone queues after I cleared it but before I queue the close). After the first thread called the block, @closing
will be true
, no matter how often it is queued.
Right, but let's say you have 10 threads, and there is no work to be done but you want to close/exit gracefully. You'll only schedule one @closing = true, which means only one of your threads will return from the blocking @queue.pop method and realize @closing = true. Your other threads will continue blocking on that call waiting for more work. I think it only really matters if you have some cleanup (e.g., cleaning up file handles, etc) that you want each of your worker threads to do before the program exits.
right
Exactly what are you using this for?
@ryanlecompte check out the dummy addition I made.
@judofyr just nonsens scripts atm
@judofyr just nonsens scripts atm
Wouldn't it be better to do something like this to make sure the block is called?
def close
run do
@closing = true
yield if block_given?
end
end
right
If you have more than one worker, only one of them will shutdown, as the @closing
will only be set once. You probably need to remember your worker threads and schedule close
on each of them directly (or at least schedule close
worker_threads.count
times.
No, @closing
is local to the worker instance which can launch more than one thread, but it will be true
in all threads.
Meh, of course... But then I wouldn't call @queue.clear
to have something like a soft-stop and to not lose already scheduled jobs.
But that rather depends on your use case.
I have an old piece of code that looks awfully similar to this, but uses throw/catch instead of the @closing thing: http://burgestrand.se/code/ruby-thread-pool/thread-pool.rb (I partly did it as an experiment with rocco)
It does not support adding more workers, however.
I guess if you really wanted your threads to exit gracefully (honor the
until @closing
part), you'd need to send that for all of your threads, not just once:count.times { run { @closing = true } }