Last active
May 25, 2022 19:34
-
-
Save knugie/69f8bc0fb6fc805d77d1b1ea489e7a97 to your computer and use it in GitHub Desktop.
Worker pool in Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# from https://hspazio.github.io/2017/worker-pool/

# Capacity of each worker's job queue. The queues are SizedQueue instances,
# so a producer pushing to a worker whose queue already holds this many jobs
# blocks until that worker takes one off.
SIZED_QUEUE_SIZE = 10
############################################################################ | |
# Naive doubly-recursive Fibonacci.
#
# The exponential running time is intentional: the script uses this as a
# CPU-bound payload for the worker pool, so it must not be optimised away.
#
# @param n [Integer] index into the Fibonacci sequence
# @return [Integer] the n-th Fibonacci number; any n below 2 (including
#   negative values) is returned unchanged
def fib(n)
  n < 2 ? n : fib(n - 1) + fib(n - 2)
end
############################################################################ | |
# A single background thread that runs jobs taken from its own bounded queue.
#
# A job is any object responding to +call+. Pushing the symbol +:done+ tells
# the worker to stop once every job queued before it has been executed.
class Worker
  attr_reader :name

  # Creates the queue and immediately starts the worker thread.
  #
  # @param name [String] label used in the debug output
  # @param queue_size [Integer] maximum number of pending jobs; pushing to a
  #   full queue blocks the caller until the worker takes a job off
  def initialize(name, queue_size = SIZED_QUEUE_SIZE)
    @name = name
    @queue = SizedQueue.new(queue_size)
    @thread = Thread.new { perform }
  end

  # Enqueues a job (or the +:done+ stop marker).
  #
  # @param job [#call, Symbol] callable to execute, or +:done+
  # @return [Worker] self, so pushes can be chained without exposing the queue
  def <<(job)
    @queue << job
    self
  end

  # @return [Integer] number of jobs still waiting in the queue; a job that
  #   is currently executing has already been popped and is not counted
  def jobs_count
    @queue.size
  end

  # Blocks until the worker thread has finished.
  #
  # @return [Thread] the finished worker thread
  def join
    @thread.join
  end

  private

  # Thread body: pops and runs jobs until +:done+ (or a nil/false entry)
  # comes off the queue.
  def perform
    while (job = @queue.pop)
      break if job == :done

      job.call
      puts "#{name} got #{job}" # only for debugging
    end
  end
end
############################################################################ | |
# Scheduling strategy that hands each job to the worker with the fewest
# pending jobs.
class LeastBusyFirstScheduler
  # @param workers [Array<#jobs_count, #<<>] workers to choose from
  def initialize(workers)
    @workers = workers
  end

  # Pushes the job onto the worker reporting the smallest +jobs_count+.
  #
  # @param job [Object] job to hand over, passed to the worker unchanged
  # @return [Object] whatever the chosen worker's +<<+ returns
  def schedule(job)
    # min_by finds the minimum in one pass; no need to sort every worker.
    @workers.min_by(&:jobs_count) << job
  end
end
############################################################################ | |
# A fixed-size set of workers plus a scheduler that decides which worker
# receives each job.
class WorkerPool
  # @param num_workers [Integer] number of workers to start; must be >= 1
  # @param scheduler_factory [#new] class (or factory) whose +new+ takes the
  #   array of workers and returns an object responding to +schedule(job)+
  # @raise [ArgumentError] if +num_workers+ is less than 1
  def initialize(num_workers, scheduler_factory)
    # Fail fast: with no workers the scheduler has nothing to dispatch to.
    raise ArgumentError, "num_workers must be at least 1" if num_workers < 1

    @workers = Array.new(num_workers) { |n| Worker.new("worker_#{n}") }
    @scheduler = scheduler_factory.new(@workers)
  end

  # Submits a job through the scheduler, or, for +:done+, forwards the stop
  # marker to every worker.
  #
  # @param job [#call, Symbol] callable job, or +:done+
  # @return [WorkerPool] self, so chained pushes keep going through the pool
  def <<(job)
    if job == :done
      @workers.each { |worker| worker << :done }
    else
      @scheduler.schedule(job)
    end
    self
  end

  # Blocks until every worker has been joined.
  #
  # @return [WorkerPool] self
  def wait
    @workers.each(&:join)
    self
  end
end
############################################################################ | |
# Demo: compute fib(20)..fib(49) on a pool of five workers and print the
# results, keyed by job index.
pool = WorkerPool.new(5, LeastBusyFirstScheduler)
results = {}
# Several worker threads write into `results`. Hash is not safe for
# concurrent mutation on runtimes with truly parallel threads (e.g. JRuby),
# so every write goes through this lock.
results_lock = Mutex.new

# Jobs are fed from a separate thread because pushing blocks whenever the
# chosen worker's queue is full.
producer = Thread.new do
  30.times do |n|
    job = lambda do
      value = fib(n + 20) # heavy work stays outside the lock
      results_lock.synchronize { results[n] = value }
    end
    pool << job
  end
  pool << :done
end

# Join the producer first so an exception raised while enqueuing is re-raised
# here instead of leaving pool.wait blocked on workers that never get :done.
producer.join
pool.wait
puts results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
CRuby (GIL):
$ rvm 2.5.1
$ ruby worker_pool.rb
JRuby (real threads):
$ rvm jruby-9.1.7.0
$ ruby worker_pool.rb