Skip to content

Instantly share code, notes, and snippets.

@ismasan
Last active May 14, 2021 09:01
Show Gist options
  • Save ismasan/c1c0f928bfc2a1e05099a9de659fcecd to your computer and use it in GitHub Desktop.
Save ismasan/c1c0f928bfc2a1e05099a9de659fcecd to your computer and use it in GitHub Desktop.
# frozen_string_literal: true
# Adapter that gives a bare callable the worker interface
# (#setup, #run, #shutdown).
class Worker
  # Coerces +callable+ into an object exposing the worker interface.
  #
  # @param callable [#setup, #call] an object that already responds to
  #   #setup (returned untouched), or anything responding to #call
  # @return [Object] +callable+ itself, or a Worker wrapping it
  # @raise [ArgumentError] if +callable+ responds to neither #setup nor #call
  def self.wrap(callable)
    return callable if callable.respond_to?(:setup)
    return new(callable) if callable.respond_to?(:call)

    # The message lists both accepted shapes; the previous text omitted the
    # plain-callable option that this method actually supports.
    raise ArgumentError, 'worker must implement #setup, #run, #shutdown or respond to #call'
  end

  # @param callable [#call] the job executed by #run
  def initialize(callable)
    @callable = callable
  end

  # Accepts and ignores any arguments.
  #
  # @return [Worker] self, so the wrapper acts as its own worker instance
  def setup(*_)
    self
  end

  # Invokes the wrapped callable with no arguments.
  #
  # @return [Object] whatever the callable returns
  def run
    @callable.call
  end

  # Prints a shutdown notice that includes the current process id.
  def shutdown
    puts "Shutting down #{Process.pid}"
  end
end
# Runs several copies of a unit of work concurrently, one Ruby thread per copy.
class Threaded
  # @param workers [Integer] number of threads to start
  # @param work [#setup, #call, nil] worker factory or plain callable;
  #   when nil the given block is used instead
  # @raise [ArgumentError] (from Worker.wrap) when neither a usable +work+
  #   nor a block is supplied
  def initialize(workers: 1, work: nil, &block)
    @workers = workers
    @work = Worker.wrap(work || block)
    @threads = []
  end

  # Accepts and ignores any arguments.
  #
  # @return [Threaded] self, so an instance can itself be passed as +work+
  def setup(*_)
    self
  end

  # Starts one thread per worker and blocks until they have all finished.
  #
  # Each thread asks the work object for a worker via +setup(n)+ with a
  # 1-based thread number, runs it, and always calls the worker's #shutdown.
  def run
    @threads = @workers.times.map do |idx|
      Thread.new(idx + 1) do |number|
        worker = @work.setup(number)
        begin
          worker.run
        ensure
          worker.shutdown
        end
      end
    end
    wait
  end

  # Nothing to release; present to satisfy the worker interface.
  def shutdown; end

  # Blocks until every thread has finished.
  #
  # On Interrupt the threads are killed and then joined. Killing a thread
  # unwinds through its +ensure+ clause, so joining afterwards lets each
  # worker's #shutdown run to completion before this method returns.
  # A second Interrupt during that final join is not rescued.
  def wait
    @threads.each(&:join)
  rescue Interrupt
    @threads.each(&:kill)
    @threads.each(&:join)
  end
end
# Runs several copies of a unit of work, each in its own forked child process.
class Forked
  # @param workers [Integer] number of child processes to fork
  # @param work [#setup, #call, nil] worker factory or plain callable;
  #   when nil the given block is used instead
  # @raise [ArgumentError] (from Worker.wrap) when neither a usable +work+
  #   nor a block is supplied
  def initialize(workers: 1, work: nil, &block)
    @workers = workers
    @work = Worker.wrap(work || block)
    @pids = []
  end

  # Accepts and ignores any arguments.
  #
  # @return [Forked] self, so an instance can itself be passed as +work+
  def setup(*_)
    self
  end

  # Forks the children, then blocks in #wait until they have all exited.
  # Note that #wait ends by calling +exit+, so this method does not return
  # normally.
  def run
    @pids = @workers.times.map { spawn_child }
    wait
  end

  # Nothing to release; present to satisfy the worker interface.
  def shutdown; end

  # Forwards INT to the children, reaps them, then exits the current process.
  #
  # Children are reaped with a blocking Process.wait instead of a CHLD trap
  # plus polling: a trap installed after forking misses children that have
  # already exited, which could leave the caller sleeping forever.
  def wait
    pending = @pids.dup
    Signal.trap(:INT) { forward(:INT, pending) }
    reap(pending)
    puts "all children exited. Bye!"
    exit
  end

  private

  # Forks one child that runs a worker and always shuts it down.
  #
  # @return [Integer] pid of the forked child
  def spawn_child
    Process.fork do
      # Put the child in its own process group; presumably so a terminal
      # Ctrl-C reaches only the parent, which forwards INT explicitly.
      Process.setpgid(Process.pid, Process.pid)
      worker = @work.setup(1) # every child is set up with index 1
      begin
        worker.run
      rescue Interrupt
        # Swallowed on purpose: fall through to shutdown and exit normally.
      ensure
        worker.shutdown
      end
    end
  end

  # Blocks until every pid in +pending+ has been reaped, reporting each exit.
  # Stops early if the OS reports there are no children left.
  def reap(pending)
    until pending.empty?
      pid = Process.wait
      puts "child exited #{pid}"
      pending.delete(pid)
    end
  rescue Errno::ECHILD
    nil
  end

  # Sends +signal+ to every child that has not been reaped yet.
  def forward(signal, pending)
    # Iterate over a copy: the list shrinks while children are being reaped.
    pending.dup.each do |pid|
      begin
        Process.kill(signal, pid)
      rescue Errno::ESRCH
        # The child exited between the check and the kill; nothing to do.
      end
    end
  end
end
# Combines the two runners: a Forked pool whose unit of work is a Threaded pool.
class Hybrid
  # @param workers [Integer] number of processes handed to Forked
  # @param threads [Integer] number of threads handed to Threaded
  # @param work [Object, nil] the job; when nil the given block is used
  def initialize(workers: 1, threads: 1, work: nil, &block)
    thread_pool = Threaded.new(workers: threads, work: work || block)
    @worker = Forked.new(workers: workers, work: thread_pool)
  end

  # Accepts and ignores any arguments.
  #
  # @return [Hybrid] self
  def setup(*_)
    self
  end

  # Delegates to the underlying Forked runner.
  def run
    @worker.run
  end

  # Intentionally empty; present to satisfy the worker interface.
  def wait; end

  # Intentionally empty; present to satisfy the worker interface.
  def shutdown; end
end
# Example job as a plain callable: prints a line every two seconds, forever.
work = proc do
  loop do
    puts('worky..')
    sleep(2)
  end
end
# Example job implementing the full worker interface. The class itself acts
# as the factory (.setup); its instances do the work.
class SomeWork
  class << self
    # Builds a worker labelled with the current process id and +idx+.
    #
    # @param idx [Object] index supplied by the runner
    # @return [SomeWork]
    def setup(idx)
      label = format('[%d:%s]', Process.pid, idx)
      new(label)
    end
  end

  # @param name [String] prefix put in front of every log line
  def initialize(name)
    @name = name
  end

  # Logs 'start', then logs 'working' every two seconds without end.
  def run
    log('start')
    loop do
      log('working')
      sleep(2)
    end
  end

  # Logs 'shutting down', pauses for three seconds, then logs 'bye'.
  def shutdown
    log('shutting down')
    sleep(3)
    log('bye')
  end

  private

  # Prints +msg+ prefixed with this worker's name.
  def log(msg)
    puts format('%s %s', @name, msg)
  end
end
# Alternative setups (uncomment one and remove the composition below):
#
# 2 forked workers
# runner = Forked.new(workers: 2, work: SomeWork)
# 2 threaded workers
# runner = Threaded.new(workers: 2, work: SomeWork)
# 2 forked workers, 2 threads each
# runner = Hybrid.new(workers: 2, threads: 2, work: SomeWork)
# 2 forked workers, 2 forked children each, 2 threads each child (4 child processes, 8 threads total)
# runner = Hybrid.new(workers: 2, work: Hybrid.new(workers: 2, threads: 2, work: SomeWork))

# Same as the last alternative, composed by hand from the inside out.
thread_pool = Threaded.new(workers: 2, work: SomeWork)
inner_pool = Forked.new(workers: 2, work: thread_pool)
runner = Forked.new(workers: 2, work: inner_pool)
runner.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment