Skip to content

Instantly share code, notes, and snippets.

@ksss
Last active August 29, 2015 14:06
Show Gist options
  • Save ksss/1500dede257f32e783be to your computer and use it in GitHub Desktop.
Save ksss/1500dede257f32e783be to your computer and use it in GitHub Desktop.
Multi Process [study]
$ ruby simple_worker.rb 4
[52341]
[52341, 52342]
[52341, 52342, 52343]
[52341, 52342, 52343, 52344]
after wait pid:52341, status:#<Process::Status: pid 52341 exit 0>
[52342, 52343, 52344, 52345]
after wait pid:52343, status:#<Process::Status: pid 52343 exit 0>
[52342, 52344, 52345, 52346]
after wait pid:52342, status:#<Process::Status: pid 52342 exit 0>
[52344, 52345, 52346, 52347]
after wait pid:52344, status:#<Process::Status: pid 52344 exit 0>
[52345, 52346, 52347, 52348]
after wait pid:52345, status:#<Process::Status: pid 52345 exit 0>
[52346, 52347, 52348, 52349]
after wait pid:52348, status:#<Process::Status: pid 52348 exit 0>
[52346, 52347, 52349, 52350]
after wait pid:52346, status:#<Process::Status: pid 52346 exit 0>
after wait pid:52347, status:#<Process::Status: pid 52347 exit 0>
after wait pid:52349, status:#<Process::Status: pid 52349 exit 0>
after wait pid:52350, status:#<Process::Status: pid 52350 exit 0>
runningtime = 3.646952s, concurrency = 4
#! /usr/bin/env ruby
class SimpleWorker
class Configure
attr_reader :set
CALLBACK_NAMES = %w{before after}.product(%w{fork wait}).map do |names|
names.join('_').to_sym
end
def initialize
@set = {}
CALLBACK_NAMES.each do |name|
@set[name] = ->{ }
end
end
# define methods :before_fork, :before_wait, :after_fork, :after_wait
CALLBACK_NAMES.each do |name|
define_method(name) do |&block|
@set[name] = block if block
@set[name]
end
end
end
attr_reader :tasks, :concurrency, :waiting_pids, :finished_pids, :config
def initialize (tasks, concurrency, &block)
@tasks = tasks.to_a.dup
@concurrency = concurrency.to_i
@waiting_pids = []
@finished_pids = []
@config = Configure.new
block.call(@config) if block
end
def run
@tasks.each do |task|
@config.before_fork.call
@waiting_pids << fork { task.call }
@config.after_fork.call @waiting_pids.last
wait if @concurrency <= @waiting_pids.length
end
wait until finished?
end
def wait
@config.before_wait.call
pid, status = Process.wait2
@config.after_wait.call pid, status
@finished_pids << @waiting_pids.delete(pid)
end
def finished?
@tasks.length == @finished_pids.length
end
end
def fibo(n)
if n < 2
n
else
fibo(n - 1) + fibo(n - 2)
end
end
n = (ARGV[0] || 2).to_i
tasks = []
10.times do
tasks << proc{ fibo(35) }
end
w = SimpleWorker.new(tasks, n) do |config|
config.after_fork do |pid|
p w.waiting_pids
end
config.after_wait do |pid, status|
puts "after wait pid:#{pid}, status:#{status.inspect}"
end
end
t = Time.now
w.run
puts "runningtime = #{Time.now - t}s, concurrency = #{n}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment