Created
May 17, 2013 11:43
-
-
Save betawaffle/5598562 to your computer and use it in GitHub Desktop.
Extensions to Enumerable for Threads and Fibers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'fiber' # for Fiber#alive? | |
module Enumerable | |
DEFAULT_MAX_FIBERS = 20 | |
def each_in_fibers(max_fibers = nil, &block) | |
# If the enumerable object can tell us how big it is, cool. | |
# Unlike #each_in_threads, it doesn't really matter here. | |
# BUT I'M GOING TO DO THIS ANYWAY! BWAHAHAHAA! | |
if respond_to? :size | |
total = size | |
count = total if max_fibers.nil? or max_fibers > total | |
end | |
count ||= max_fibers || DEFAULT_MAX_FIBERS | |
fibers = [] | |
# We intentionally don't catch exceptions raised from | |
# inside the fibers. If #each itself throws an exception | |
# when it finishes enumerating, too fucking bad. | |
each do |*args| | |
fiber = Fiber.new(&block) | |
fiber.resume(*args) | |
fibers << fiber if fiber.alive? | |
# Run each of the fibers until there is room for more fibers. | |
until fibers.size < count | |
fiber = fibers.shift | |
fiber.resume | |
fibers << fiber if fiber.alive? | |
end | |
end | |
# Finish any remaining work. | |
until fibers.empty? | |
fiber = fibers.shift | |
fiber.resume | |
fibers << fiber if fiber.alive? | |
end | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' # for Queue | |
module Enumerable | |
DEFAULT_MAX_THREADS = 20 | |
def each_in_threads(max_threads = nil, &block) | |
# If the enumerable object can tell us how big it is, | |
# we can save creation of some unnecessary threads. | |
if respond_to? :size | |
total = size | |
count = total if max_threads.nil? or max_threads > total | |
end | |
count ||= max_threads || DEFAULT_MAX_THREADS | |
# We use a single queue to shift the burden of picking a | |
# thread to the underlying mutex. | |
queue = Queue.new | |
threads = count.times.map do | |
Thread.new do | |
# We push nils onto the queue when there is no more work, | |
# so this will finish and the thread will stop. | |
while args = queue.pop | |
block.call(*args) | |
end | |
# Check-in or close any connections we used. | |
ActiveRecord::Base.clear_active_connections! | |
end | |
end | |
begin | |
value = each { |*args| queue << args } | |
ensure | |
# Make sure we don't leave sleeping threads | |
# if #each throws an exception. | |
count.times { queue << nil } | |
end | |
# If we didn't throw an exception, let's | |
# wait for the work to finish. | |
threads.each(&:join) | |
value | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment