Last active
May 14, 2021 09:01
-
-
Save ismasan/c1c0f928bfc2a1e05099a9de659fcecd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true
# Adapter that lets a plain callable be driven through the worker protocol
# used by the runners in this file (+setup+, then +run+, then +shutdown+).
class Worker
  # Coerces +callable+ into an object the runners can drive.
  #
  # @param callable [#setup, #call] an object that already responds to
  #   +setup+ (returned untouched), or one responding to +call+ (wrapped)
  # @return [Object] +callable+ itself, or a new Worker wrapping it
  # @raise [ArgumentError] if +callable+ responds to neither +setup+ nor +call+
  def self.wrap(callable)
    return callable if callable.respond_to?(:setup)
    return new(callable) if callable.respond_to?(:call)

    raise ArgumentError, 'worker must implement #setup, #run, #shutdown, or respond to #call'
  end

  # @param callable [#call] the work to invoke from #run
  def initialize(callable)
    @callable = callable
  end

  # Accepts and ignores any arguments (runners pass a worker index).
  #
  # @return [Worker] self
  def setup(*_)
    self
  end

  # Invokes the wrapped callable.
  #
  # @return [Object] whatever the callable returns
  def run
    @callable.call
  end

  # Prints a shutdown notice that includes the current process id.
  def shutdown
    puts "Shutting down #{Process.pid}"
  end
end
# Runs several copies of a worker concurrently, one thread per copy.
class Threaded
  # @param workers [Integer] number of threads to start
  # @param work [#setup, #call, nil] the work; falls back to the block
  # @raise [ArgumentError] (from Worker.wrap) when no usable work is given
  def initialize(workers: 1, work: nil, &block)
    @workers = workers
    @work = Worker.wrap(work || block)
    @threads = []
  end

  # Accepts and ignores any arguments so a Threaded can itself be used as the
  # work of another runner.
  #
  # @return [Threaded] self
  def setup(*_args)
    self
  end

  # Starts the threads and blocks until they have all finished.
  #
  # Each thread calls +setup+ on the work with its 1-based index, runs the
  # returned worker, and always calls the worker's +shutdown+ afterwards.
  def run
    @threads = @workers.times.map do |idx|
      Thread.new(idx + 1) do |number|
        worker = @work.setup(number)
        begin
          worker.run
        ensure
          worker.shutdown
        end
      end
    end
    wait
  end

  # No-op; per-thread workers are shut down from within their own thread.
  def shutdown; end

  # Joins every thread. On Interrupt the threads are killed and then joined
  # again, so that each thread's +ensure+ (the worker's +shutdown+) has
  # completed before this method returns. A second Interrupt received while
  # waiting for those shutdowns is not rescued and propagates to the caller.
  def wait
    @threads.each(&:join)
  rescue Interrupt
    @threads.each(&:kill)
    @threads.each(&:join)
  end
end
# Runs several copies of a worker concurrently, one forked child process per
# copy, and forwards INT from the parent to the children.
class Forked
  # @param workers [Integer] number of child processes to fork
  # @param work [#setup, #call, nil] the work; falls back to the block
  # @raise [ArgumentError] (from Worker.wrap) when no usable work is given
  def initialize(workers: 1, work: nil, &block)
    @workers = workers
    @work = Worker.wrap(work || block)
    @pids = []
  end

  # Accepts and ignores any arguments so a Forked can itself be used as the
  # work of another runner.
  #
  # @return [Forked] self
  def setup(*_args)
    self
  end

  # Forks the children, then waits for them (see #wait, which ends by calling
  # +exit+).
  #
  # Each child calls +setup+ on the work with its 1-based index (the same
  # convention Threaded uses), runs the returned worker, swallows Interrupt,
  # and always calls the worker's +shutdown+.
  def run
    @pids = @workers.times.map do |idx|
      Process.fork do
        # Make the child the leader of its own process group.
        # NOTE(review): presumably so a terminal Ctrl-C is delivered only to
        # the parent, which forwards INT explicitly in #wait -- confirm.
        Process.setpgid(Process.pid, Process.pid)
        worker = @work.setup(idx + 1)
        begin
          worker.run
        rescue Interrupt
          # Interrupted on purpose: fall through to shutdown below.
        ensure
          worker.shutdown
        end
      end
    end
    wait
  end

  # No-op; each child shuts its own worker down.
  def shutdown; end

  # Blocks until every forked child has exited, then prints a farewell and
  # calls +exit+.
  #
  # While waiting, INT received by this process is forwarded to all children.
  # Children are reaped with a blocking wait on their own pids rather than
  # from a CHLD trap, so a child that exits before this method starts is still
  # accounted for, and unrelated child processes are never reaped here.
  #
  # @raise [SystemExit] always, once all children are gone
  def wait
    previous_int = Signal.trap(:INT) { signal_children(:INT) }
    begin
      @pids.each { |pid| reap(pid) }
      puts 'all children exited. Bye!'
      exit
    ensure
      # Put back whatever INT handler was installed before we started waiting.
      Signal.trap(:INT, previous_int || 'DEFAULT')
    end
  end

  private

  # Waits for one child and reports its exit.
  def reap(pid)
    Process.wait(pid)
    puts "child exited #{pid}"
  rescue Errno::ECHILD
    # Already reaped elsewhere; nothing left to wait for.
    nil
  end

  # Sends +signal+ to every child, ignoring children that are already gone.
  def signal_children(signal)
    @pids.each { |pid| signal_child(signal, pid) }
  end

  # Sends +signal+ to one child; a missing process is not an error.
  def signal_child(signal, pid)
    Process.kill(signal, pid)
  rescue Errno::ESRCH
    nil
  end
end
# Convenience runner: forks +workers+ processes, each of which runs the work
# in +threads+ threads (a Forked wrapping a Threaded).
class Hybrid
  # @param workers [Integer] number of processes handed to Forked
  # @param threads [Integer] number of threads handed to Threaded
  # @param work [#setup, #call, nil] the work; falls back to the block
  def initialize(workers: 1, threads: 1, work: nil, &block)
    threaded = Threaded.new(workers: threads, work: work || block)
    @worker = Forked.new(workers: workers, work: threaded)
  end

  # Accepts and ignores any arguments so a Hybrid can itself be used as the
  # work of another runner.
  #
  # @return [Hybrid] self
  def setup(*_args)
    self
  end

  # Delegates to the underlying Forked runner.
  def run
    @worker.run
  end

  # No-op; waiting is handled by the wrapped runners.
  def wait; end

  # No-op; shutdown is handled by the wrapped runners.
  def shutdown; end
end
# Minimal example of proc-based work: prints a line every two seconds, forever.
# It is not referenced by the runner built at the bottom of this file.
work = proc do
  loop do
    puts 'worky..'
    sleep 2
  end
end
# Example worker implementing the full protocol: the class responds to
# +setup+ and the instances respond to +run+ and +shutdown+.
class SomeWork
  # Builds a worker named after the current process id and the given index.
  #
  # @param idx [Object] worker index supplied by the runner
  # @return [SomeWork]
  def self.setup(idx)
    new("[#{Process.pid}:#{idx}]")
  end

  # @param name [String] prefix used on every log line
  def initialize(name)
    @name = name
  end

  # Logs 'start', then logs 'working' every two seconds until interrupted.
  def run
    log 'start'
    loop do
      log 'working'
      sleep 2
    end
  end

  # Logs, pauses for three seconds, then logs again.
  def shutdown
    log 'shutting down'
    sleep 3
    log 'bye'
  end

  private

  # Prints +msg+ prefixed with this worker's name.
  def log(msg)
    puts "#{@name} #{msg}"
  end
end
# Example compositions. Uncomment one, or use the manual composition below.

# 2 forked workers
# runner = Forked.new(workers: 2, work: SomeWork)

# 2 threaded workers
# runner = Threaded.new(workers: 2, work: SomeWork)

# 2 forked workers, 2 threads each
# runner = Hybrid.new(workers: 2, threads: 2, work: SomeWork)

# 2 forked workers, 2 forked children each, 2 threads each child (4 child processes, 8 threads total)
# runner = Hybrid.new(workers: 2, work: Hybrid.new(workers: 2, threads: 2, work: SomeWork))

# same as above, manual composition
runner = Forked.new(
  workers: 2,
  work: Forked.new(
    workers: 2,
    work: Threaded.new(workers: 2, work: SomeWork)
  )
)

runner.run
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment