module Enumerable
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end
end

puts [5, 3, 1, 2, 4].first_to_finish { |x| sleep x }
require "benchmark/ips"
require "bcrypt"

module Enumerable
  def first_to_finish
    threads = collect { |args| Thread.new { yield(args) } }
    loop until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end

  def first_to_finish_with_queue
    queue = Queue.new
    threads = collect { |args| Thread.new { queue << yield(args) } }
    result = queue.pop
    threads.each(&:kill)
    result
  end

  def get_first_result_async
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args)
          result = current_result
          (threads - [Thread.current]).each(&:kill) # kill siblings
        end
      end
    end
    threads.each(&:join)
    result
  end
end

COSTS = (10..15).to_a.reverse

def sferik
  COSTS.first_to_finish { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def choonkeat
  COSTS.first_to_finish_with_queue { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

def juanito
  COSTS.get_first_result_async { |cost| BCrypt::Password.create("secret", :cost => cost) }
end

Benchmark.ips do |x|
  x.report("@sferik") { sferik }
  x.report("@choonkeat") { choonkeat }
  x.report("@JuanitoFatas") { juanito }
  x.compare!
end
@JuanitoFatas The problem with your implementation is that it won’t work if the result is false. I’ve updated my version to send kill to each thread after the first result is found. I was relying on the garbage collector to clean up the unreferenced threads after the method returned, but that’s probably suboptimal.
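For concreteness, here is a minimal reproduction of the false problem (the method body is copied from the benchmark above; the inputs are illustrative, not from the thread):

```ruby
module Enumerable
  # The truthiness-based version: a legitimate false result never passes
  # the `if`, so it is silently discarded and the caller sees nil.
  def get_first_result_async
    result = nil
    threads = map do |args|
      Thread.new do
        if current_result = yield(args)
          result = current_result
          (threads - [Thread.current]).each(&:kill) # kill siblings
        end
      end
    end
    threads.each(&:join)
    result
  end
end

# Every block legitimately returns false, but the first result is lost.
p [1, 2, 3].get_first_result_async { |x| false } # => nil, not false
```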
thanks @sferik! sorry, i should've clarified that the lambdas in each thread should be different, e.g. each one getting something from different servers
this looks elegant, but doesn't it keep the cpu busy looping?
loop until done = threads.detect { |t| !t.alive? }
threads.each(&:kill)
whereas the "kill sibling threads inside a thread" approach simply blocks and idles, waiting for the threads to join?
threads = map do |args|
Thread.new do
# something
(threads - [Thread.current]).each(&:kill) # kill siblings
end
end
threads.each(&:join)
@choonkeat If there’s only one CPU/core, then this may not be the most efficient approach. However, in that case, there’s not much benefit in using multiple threads.
The efficiency depends a lot on the number of CPUs/cores and the threading model. This approach treats the main thread as the master, with n worker threads. With a native threading model, as in JRuby, the tight loop on the master thread should not significantly interfere with the performance of the worker threads (modulo confounding factors like dynamic frequency scaling technologies, such as Intel’s Turbo Boost or AMD’s Turbo Core). Even in CRuby, with a global VM lock, the main thread should only be scheduled on 1 / (n + 1) cycles.
There are some drawbacks to @JuanitoFatas’s approach. As I mentioned, it won’t work for blocks that return false. It also depends on threads aborting quickly, which they probably will, but if one thread gets stuck, it will block the method from returning. I’ll do some benchmarking now on JRuby and CRuby and report my findings below.
In the process of benchmarking both versions, I found a race condition in @JuanitoFatas’s implementation on this line:
(threads - [Thread.current]).each(&:kill) # kill siblings
There’s a possibility that one thread finishes before the last thread is spawned, in which case the threads variable will be nil, causing the main thread to crash.
My implementation manages to avoid this issue.
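The window is easy to see in isolation: a local captured by a thread's block stays nil until the assignment on the spawning thread completes. A contrived but deterministic sketch (the names and timings are mine, not from the Gist):

```ruby
seen = Queue.new
threads = begin
  t = Thread.new { seen << threads.nil? } # reads `threads` right away
  sleep 0.05                              # hold the assignment open
  [t]
end

saw_nil = seen.pop
p saw_nil # => true: the worker observed `threads` as nil
```

In the real code the gap is just the time it takes collect to spawn the remaining threads, but with fast-returning blocks that is enough.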
I’ve added a benchmark to the Gist, which computes hashes using bcrypt, an algorithm designed to be CPU-intensive, with variable cost. I ran it on my quad-core 2.6 GHz Intel Core i7 under both CRuby 2.3.0-preview2 and JRuby 9.0.4.0. The results of the benchmark have extremely high variance, as you might expect when working with non-deterministic code. On CRuby, the two implementations were comparable; on JRuby, my version was many times faster. On machines with more CPUs/cores, I think the benefits of my implementation would be even more pronounced. I’d encourage you to run the benchmark to see the results for yourself.
Since my implementation has comparable performance on CRuby, better performance on JRuby, doesn’t have a race condition or an issue with false returns, and the code is more readable (less block nesting), I’d say it’s better in the general case.
for purposes where the threads are doing cpu-intensive work, indeed we're probably soaked anyway, so you're right: giving up that nth bit of cpu is fine, or rather, each thread is on its own core anyway. however, for scenarios where threads are mostly waiting on io (e.g. fetching a url) and this belongs to a bigger multitasking system (e.g. a web app with several workers), leaving the cpu idle is quite important
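One middle ground for the busy loop (my own sketch, not something from the thread) is to nap between polls, so the master thread yields its scheduler slice instead of spinning, at the cost of a little latency bounded by the polling interval:

```ruby
module Enumerable
  # Variant of first_to_finish that sleeps between polls instead of
  # spinning; trades up to `interval` of extra latency for far less
  # CPU on the master thread.
  def first_to_finish_with_nap(interval = 0.001)
    threads = collect { |args| Thread.new { yield(args) } }
    sleep(interval) until done = threads.detect { |t| !t.alive? }
    threads.each(&:kill)
    done.value
  end
end

p [0.3, 0.1, 0.2].first_to_finish_with_nap { |x| sleep x; x } # => 0.1
```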
thanks for the benchmark rb, i'll try out a few things and reply!
@choonkeat That’s a fair point. In addition to being highly dependent on the number of CPUs/cores and the threading model, performance is also highly sensitive to the workload. In my benchmark, I’m assuming a CPU-intensive workload, but it’d be worth writing a separate benchmark for an IO-intensive load. I suspect my implementation would perform somewhat worse under those conditions.
I’d also be curious to see an implementation written in a functional reactive style, where the threads are observable and an observer subscribes to them to get the first result. Maybe I’ll work on that now and add it to the benchmark.
@sferik i've borrowed your loop until to address the threads race condition, limiting the cpu hog: https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L8 not as elegant, but works well even if the lambdas immediately return
def first_to_finish_with_idle
result = nil
threads = collect { |args| Thread.new {
loop until threads
result = yield(args)
(threads - [Thread.current]).each(&:kill)
} }
threads.each(&:join)
result
end
ok that was terrible. let's try again
using Queue is promising: https://gist.github.com/choonkeat/0c254b0c2efbc7237a2a#file-first_to_finish_benchmark-rb-L6-L12
def first_to_finish_with_queue
queue = Queue.new
threads = collect { |args| Thread.new { queue << yield(args) } }
result = queue.pop
threads.each(&:kill)
result
end
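part of why this works: Queue#pop parks the calling thread on a condition variable until something is pushed, so the master idles rather than spinning. a quick sanity check (the timings here are my own, not from the gist):

```ruby
q = Queue.new
started = Time.now
Thread.new { sleep 0.1; q << :done } # worker pushes after 100ms

value   = q.pop                      # parks here; no busy loop
elapsed = Time.now - started

p value # => :done
```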
@choonkeat Using Queue is lovely.
I’ve added your implementation to the benchmark. My implementation is about 50% faster than yours on JRuby but yours is about 30X faster than mine and @JuanitoFatas’s on CRuby. Again, the performance is highly workload-sensitive, so I suspect your implementation would beat mine on an IO-heavy load, even on JRuby. Very nice. 👏
In case anyone is curious, here’s the functional reactive implementation I came up with:
require "rx_ruby"
times = [5, 3, 1, 2, 4]
source = RxRuby::Observable.from(times).flat_map do |n|
RxRuby::Observable.of(n).delay(n)
end
subscription = source.subscribe { |s| puts s }
loop until Thread.list.size < times.size + 1
subscription.unsubscribe
I’m not an experienced Rx programmer, so there’s a good chance I’m doing it completely wrong, but it was a fun exercise to try. Ultimately, I had to use the same loop until hack that I used in the Enumerable example, which was disappointing, but maybe someone can suggest a more elegant solution.
awesome that was fun @sferik 👏 👏 👏
btw since we're killing threads, I suspect those lambda arguments need to be wrapped with Thread.handle_interrupt (or are they unrelated?)
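a sketch of what that wrapping might look like (my guess at the intent, not a settled answer: Thread.handle_interrupt masks asynchronous events such as Thread#kill while :never is in effect):

```ruby
module Enumerable
  # queue-based variant where each worker defers a kill while it is
  # publishing its result, so a half-delivered value can't be lost
  def first_to_finish_with_queue_protected
    queue = Queue.new
    threads = collect do |args|
      Thread.new do
        Thread.handle_interrupt(Object => :never) do
          # kill is deferred in here; re-enable it only around the work itself
          value = Thread.handle_interrupt(Object => :immediate) { yield(args) }
          queue << value # cannot be interrupted mid-push
        end
      end
    end
    result = queue.pop
    threads.each(&:kill)
    result
  end
end

p [0.3, 0.1, 0.2].first_to_finish_with_queue_protected { |x| sleep x; x } # => 0.1
```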
folding the lessons here into choonkeat/attache#27 thanks guys
My poor version: