Last active
February 2, 2021 16:55
Run a block on a list of things in a limited number of concurrent threads. Mostly for the fun of it – there are more featureful libs like https://github.com/grosser/parallel.
# Lets you call a block for each item in a list, just like `each`.
# But instead of running serially, it runs in a limited number of parallel threads.
# This is useful when you don't just want one thread per item, e.g. to avoid rate limiting or network saturation.

class EachInThreadPool
  def self.call(inputs, pool_size:, &block)
    queue = Queue.new
    inputs.each { queue << _1 }

    pool_size.times.map {
      Thread.new do
        Thread.current.abort_on_exception = true

        loop do
          item = queue.pop(_raise_if_empty = true)
          block.call(item)
        rescue ThreadError => e
          e.message == "queue empty" ? break : raise
        end
      end
    }.each(&:join)
  end
end
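The `rescue` in the loop above depends on how a non-blocking `Queue#pop` behaves: given a truthy argument, it raises `ThreadError` with the message "queue empty" instead of waiting for an item. Here is a minimal sketch of that behavior on its own, separate from the class. The variable names are only for illustration.

```ruby
# Illustration only: non-blocking pop on a Queue that runs empty.
queue = Queue.new
queue << :only_item

first = queue.pop(true) # An item is available, so it is returned immediately.

error =
  begin
    queue.pop(true) # The queue is now empty, so this raises instead of blocking.
    nil
  rescue ThreadError => e
    e
  end

puts first.inspect
puts error.class
puts error.message
```

Because the block passed to `EachInThreadPool.call` could itself raise a `ThreadError`, the class checks the message before treating the error as "no more work".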
# This implementation is nicer in some ways, but I really dislike the sleep.

class EachInThreadPool
  def self.call(inputs, pool_size:, &block)
    queue = Queue.new
    inputs.each { queue << _1 }

    threads = pool_size.times.map {
      Thread.new do
        Thread.current.abort_on_exception = true

        loop do
          block.call(queue.pop)
        end
      end
    }

    sleep 0.01 while queue.num_waiting < pool_size
    threads.each(&:exit)
  end
end
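One possible way to avoid both the `sleep` and the check on the exception message is `Queue#close` (Ruby 2.3+). Once a queue is closed and empty, a blocking `pop` returns `nil` instead of waiting. The sketch below is a hypothetical variant, not part of the original gist. The class name `EachInThreadPoolWithClose` is an assumption made for illustration, as is the decision to wrap each item in an array so that `nil` items are not confused with the end-of-queue `nil`.

```ruby
# Hypothetical variant (not from the original gist): closes the queue up front
# so that idle workers stop on their own, without sleeping or rescuing.
class EachInThreadPoolWithClose
  def self.call(inputs, pool_size:, &block)
    queue = Queue.new

    # Wrap each item in an array so a nil or false item is not mistaken
    # for the nil that a closed, empty queue returns from pop.
    inputs.each { |item| queue << [item] }
    queue.close

    pool_size.times.map {
      Thread.new do
        Thread.current.abort_on_exception = true

        # pop returns nil once the closed queue has been drained.
        while (wrapped = queue.pop)
          block.call(wrapped.first)
        end
      end
    }.each(&:join)
  end
end

# Usage: collect results in a thread-safe Queue, including a nil item.
results = Queue.new
EachInThreadPoolWithClose.call([1, nil, 3], pool_size: 2) { |item| results << item.inspect }
collected = Array.new(results.size) { results.pop }.sort
```

The trade-off is that all inputs must be enqueued before the queue is closed, which matches how both implementations above already fill the queue before starting the threads.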
require "set" # Set is not loaded automatically before Ruby 3.2.
require "spec_helper"
require "each_in_thread_pool"

RSpec.describe EachInThreadPool, ".call" do
  it "performs the given work" do
    list = (1..100).to_a
    result = []

    EachInThreadPool.call(list, pool_size: 10) do |i|
      result << i
    end

    expect(result.sort).to eq(list)
  end

  it "uses no more than the given number of threads" do
    list = (1..100).to_a
    thread_ids = Set.new

    EachInThreadPool.call(list, pool_size: 15) do
      thread_ids << Thread.current.object_id

      # Slow down execution to avoid a single thread finishing all the work before the other threads get a chance.
      sleep 0.01
    end

    expect(thread_ids.length).to eq(15)
  end

  it "aborts on exception" do
    list = [ -> {
      Thread.current.report_on_exception = false  # Don't print error output.
      raise "boom"
    } ]

    expect {
      EachInThreadPool.call(list, pool_size: 10, &:call)
    }.to raise_error("boom")
  end

  it "aborts on ThreadErrors that don't represent an empty queue" do
    list = [ -> {
      Thread.current.report_on_exception = false  # Don't print error output.
      raise ThreadError, "fraying badly"
    } ]

    expect {
      EachInThreadPool.call(list, pool_size: 10, &:call)
    }.to raise_error(ThreadError)
  end
end