require 'thread'

class Worker
  def initialize(count = 1)
    @queue, @closing, @threads, @mutex = Queue.new, false, [], Mutex.new
    add_worker(count)
  end

  def add_worker(count = 1)
    @mutex.synchronize do
      @threads += count.times.map { Thread.new { @queue.pop.call until @closing } }
    end
  end

  def run(block = Proc.new)
    @queue << block
  end

  def close(&block)
    run do
      @closing = true
      yield if block_given?
      wakeup
    end
  end

  def join
    @threads.each(&:join)
  end

  private

  def wakeup
    run { wakeup if @queue.num_waiting > 0 }
  end
end
I just removed wait. Dunno what it did there.
What are you trying to accomplish with run { @closing = true }?
Couldn't you replace run { @closing = true } with @closing = true, since the running threads will see it automagically?
  def wait
    # The class is ConditionVariable (not ConditionalVariable)
    cv, mutex = ConditionVariable.new, Mutex.new
    mutex.synchronize do
      run { mutex.synchronize { cv.signal } }
      cv.wait(mutex)
    end
  end
Actually never mind, I see why it's necessary. :-)
That run { @closing = true } is to make sure that the block passed to close will run.
Note that wait really only makes sense with worker size 1.
I like to have one Worker for reading from an IO and one for writing to it, one for logging to stdout and one for doing all the work. I also like to set the thread count to 0 and increase it later, which turns it into something like a deferrable.
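To make the "thread count 0, increase it later" idea concrete, here is a rough sketch. DeferredWorker, its :stop marker and the block-only run signature are made up for this illustration and are not the gist's API; it only assumes that queued jobs sit untouched until the first thread is added.

```ruby
require 'thread'

# Hypothetical, trimmed-down stand-in for the Worker class in this gist.
class DeferredWorker
  def initialize(count = 0)
    @queue, @threads, @mutex = Queue.new, [], Mutex.new
    add_worker(count)
  end

  # Start `count` more threads; each runs jobs until it pops a :stop marker.
  def add_worker(count = 1)
    @mutex.synchronize do
      @threads += count.times.map do
        Thread.new do
          while (job = @queue.pop) != :stop
            job.call
          end
        end
      end
    end
  end

  def run(&block)
    @queue << block
    self
  end

  # Queue one :stop marker per thread, then wait for all threads to finish.
  def close
    threads = @mutex.synchronize { @threads.dup }
    threads.size.times { @queue << :stop }
    threads.each(&:join)
  end
end

results = []
worker = DeferredWorker.new(0)   # no threads yet, so jobs only pile up
worker.run { results << :first }
worker.run { results << :second }
pending = results.dup            # nothing has executed at this point

worker.add_worker                # "resolve" the deferred work
worker.close
puts pending.inspect
puts results.inspect
```

With a single thread the jobs run in the order they were queued, which is what makes this feel like a deferrable.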
I guess if you really wanted your threads to exit gracefully (honor the until @closing part), you'd need to send that for all of your threads, not just once:
  count.times { run { @closing = true } }
Why? I clear the queue anyway. I mean, there still is a race condition in there (i.e. someone queues after I cleared it but before I queue the close). After the first thread has called the block, @closing will be true, no matter how often it is queued.
Right, but let's say you have 10 threads, and there is no work to be done but you want to close/exit gracefully. You'll only schedule one @closing = true, which means only one of your threads will return from the blocking @queue.pop method and realize @closing = true. Your other threads will continue blocking on that call waiting for more work. I think it only really matters if you have some cleanup (e.g., cleaning up file handles, etc) that you want each of your worker threads to do before the program exits.
right
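A small sketch of the blocking problem described above. TinyPool and its idle helper are hypothetical and exist only for this demo. It assumes every thread is already parked in @queue.pop when close is called, which is why the example waits for that first.

```ruby
require 'thread'

# Hypothetical minimal pool, only meant to illustrate the shutdown question.
class TinyPool
  attr_reader :threads

  def initialize(count)
    @queue, @closing = Queue.new, false
    @threads = count.times.map { Thread.new { @queue.pop.call until @closing } }
  end

  def run(&block)
    @queue << block
  end

  # Number of threads currently blocked in @queue.pop.
  def idle
    @queue.num_waiting
  end

  # Queue the closing job `times` times, so up to `times` blocked threads
  # return from pop and get to re-check @closing.
  def close(times = @threads.size)
    times.times { run { @closing = true } }
  end
end

pool = TinyPool.new(3)
sleep 0.01 until pool.idle == 3     # let every thread block on pop

pool.close(1)                       # a single closing job
sleep 0.01 while pool.threads.count(&:alive?) == 3
puts pool.threads.count(&:alive?)   # the remaining threads are still blocked

pool.close(2)                       # one closing job per remaining thread
pool.threads.each(&:join)
puts pool.threads.count(&:alive?)
```

The flag is shared, so every thread would see @closing as true, but a thread stuck in pop never gets to look at it until something else is queued.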
Exactly what are you using this for?
@ryanlecompte check out the dummy addition I made.
@judofyr just nonsense scripts atm
Wouldn't it be better to do something like this to make sure the block is called?
  def close
    run do
      @closing = true
      yield if block_given?
    end
  end
right
If you have more than one worker, only one of them will shut down, as @closing will only be set once. You probably need to remember your worker threads and schedule close on each of them directly (or at least schedule close worker_threads.count times).
No, @closing is local to the worker instance, which can launch more than one thread, but it will be true in all threads.
Meh, of course... But then I wouldn't call @queue.clear, to have something like a soft-stop and to not lose already scheduled jobs. But that rather depends on your use case.
I have an old piece of code that looks awfully similar to this, but uses throw/catch instead of the @closing thing: http://burgestrand.se/code/ruby-thread-pool/thread-pool.rb (I partly did it as an experiment with rocco)
It does not support adding more workers, however.
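For anyone curious what a throw/catch shutdown could look like, here is a rough sketch. It is not the code behind the link above. CatchPool and shutdown are names invented here, and the only assumption is that each worker thread wraps its loop in catch(:exit).

```ruby
require 'thread'

# Hypothetical sketch of a throw/catch based pool; not the linked code.
class CatchPool
  def initialize(count = 1)
    @queue = Queue.new
    @threads = count.times.map do
      Thread.new do
        # A job that throws :exit unwinds to this catch and ends the thread.
        catch(:exit) { loop { @queue.pop.call } }
      end
    end
  end

  def run(&block)
    @queue << block
  end

  # Queue one throwing job per thread; jobs queued earlier still run first.
  def shutdown
    @threads.size.times { run { throw :exit } }
    @threads.each(&:join)
  end
end

pool = CatchPool.new(2)
done = Queue.new
5.times { |i| pool.run { done << i * i } }
pool.shutdown
squares = Array.new(done.size) { done.pop }.sort
puts squares.inspect
```

Because the exit jobs go through the same FIFO queue as everything else, this behaves like the soft-stop mentioned earlier: nothing already scheduled is dropped.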
You could probably avoid the busy-sleep in #wait by storing Thread.current, calling sleep, and then calling thread.wakeup when you pop.
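A sketch of that sleep/wakeup idea, under a few assumptions: SleepyWorker is a hypothetical single-thread worker (wait only makes sense with one thread anyway), wait is called from outside the worker thread, and the short sleep timeout is a safety net I added in case wakeup fires just before the caller goes to sleep.

```ruby
require 'thread'

# Hypothetical single-thread worker with a sleep/wakeup based wait.
class SleepyWorker
  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      while (job = @queue.pop) != :stop
        job.call
      end
    end
  end

  def run(&block)
    @queue << block
  end

  # Block the caller until every job queued before this call has run.
  # Must not be called from inside a job, or it would wait on itself.
  def wait
    waiter, done = Thread.current, false
    run do
      done = true
      waiter.wakeup   # cut the waiter's sleep short
    end
    # Re-check the flag after each sleep; the timeout covers a wakeup
    # that arrives before the sleep has started.
    sleep 0.1 until done
  end

  def close
    @queue << :stop
    @thread.join
  end
end

worker = SleepyWorker.new
log = []
worker.run { sleep 0.05; log << :slow_job }
worker.wait
puts log.inspect
worker.close
```

The done flag matters: sleep can return for other reasons, so the caller loops on the flag instead of trusting a single wakeup.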