Skip to content

Instantly share code, notes, and snippets.

@betawaffle
Created May 17, 2013 11:43
Show Gist options
  • Save betawaffle/5598562 to your computer and use it in GitHub Desktop.
Save betawaffle/5598562 to your computer and use it in GitHub Desktop.
Extensions to Enumerable for Threads and Fibers
require 'fiber' # for Fiber#alive?
module Enumerable
DEFAULT_MAX_FIBERS = 20
def each_in_fibers(max_fibers = nil, &block)
# If the enumerable object can tell us how big it is, cool.
# Unlike #each_in_threads, it doesn't really matter here.
# BUT I'M GOING TO DO THIS ANYWAY! BWAHAHAHAA!
if respond_to? :size
total = size
count = total if max_fibers.nil? or max_fibers > total
end
count ||= max_fibers || DEFAULT_MAX_FIBERS
fibers = []
# We intentionally don't catch exceptions raised from
# inside the fibers. If #each itself throws an exception
# when it finishes enumerating, too fucking bad.
each do |*args|
fiber = Fiber.new(&block)
fiber.resume(*args)
fibers << fiber if fiber.alive?
# Run each of the fibers until there is room for more fibers.
until fibers.size < count
fiber = fibers.shift
fiber.resume
fibers << fiber if fiber.alive?
end
end
# Finish any remaining work.
until fibers.empty?
fiber = fibers.shift
fiber.resume
fibers << fiber if fiber.alive?
end
end
end
require 'thread' # for Queue
module Enumerable
DEFAULT_MAX_THREADS = 20
def each_in_threads(max_threads = nil, &block)
# If the enumerable object can tell us how big it is,
# we can save creation of some unnecessary threads.
if respond_to? :size
total = size
count = total if max_threads.nil? or max_threads > total
end
count ||= max_threads || DEFAULT_MAX_THREADS
# We use a single queue to shift the burden of picking a
# thread to the underlying mutex.
queue = Queue.new
threads = count.times.map do
Thread.new do
# We push nils onto the queue when there is no more work,
# so this will finish and the thread will stop.
while args = queue.pop
block.call(*args)
end
# Check-in or close any connections we used.
ActiveRecord::Base.clear_active_connections!
end
end
begin
value = each { |*args| queue << args }
ensure
# Make sure we don't leave sleeping threads
# if #each throws an exception.
count.times { queue << nil }
end
# If we didn't throw an exception, let's
# wait for the work to finish.
threads.each(&:join)
value
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment