|
require 'fiber'
require 'monitor'
require 'thread'
|
|
|
# Threads block; fibers yield.
#
# A channel notifies a thread to resume the fiber.
|
|
|
# When any thread is aborted by an exception, re-raise that exception in the
# main thread, so a failing background task stops the whole script.
Thread.abort_on_exception = true
|
|
|
# A message channel with two kinds of consumers:
#
# * threads that block in #await until a message is in the mailbox, and
# * callbacks registered through #on_read, each invoked with a message
#   (this is how a fiber that yielded from #read gets resumed).
#
# The monitor guards the check-then-act sequences in #write and #on_read.
class Channel
  include MonitorMixin

  def initialize
    super() # run MonitorMixin's initializer; explicit () passes no arguments
    @mailbox = Queue.new   # messages written while no listener was registered
    @listeners = Queue.new # callbacks waiting for the next message
  end

  # Delivers +message+.
  #
  # With no registered listener the message is stored in the mailbox.
  # Otherwise every listener registered at the time of the call is removed
  # and invoked with the message (listeners are one-shot).
  #
  # @param message [Object] the value to deliver
  # @return [nil] always nil, so internal queue objects never leak to callers
  def write(message)
    synchronize do
      if @listeners.empty?
        @mailbox.push message
      else
        # Drain exactly the listeners present right now. Looping on a plain
        # Queue#pop would block forever once the queue is empty, while still
        # holding the monitor. A listener added re-entrantly by a callback is
        # left in place for the next message.
        @listeners.size.times { @listeners.pop.call message }
      end
    end

    nil
  end

  # Suspends the current fiber, yielding this channel to whoever resumed it.
  #
  # @return [Object] whatever value the fiber is later resumed with
  def read
    puts "Asynchronously waiting for any message to arrive: #{self}"
    Fiber.yield self
  end

  # Blocks the calling thread until a message is available in the mailbox.
  #
  # @return [Object] the oldest message in the mailbox
  def await
    @mailbox.pop
  end

  # Calls the block with the oldest mailbox message if there is one;
  # otherwise registers the block as a one-shot listener for a later #write.
  #
  # @yieldparam message [Object] the delivered message
  # @raise [ArgumentError] when no block is given
  def on_read(&block)
    raise ArgumentError, 'on_read requires a block' unless block

    synchronize do
      begin
        # Non-blocking pop: #await pops outside the monitor, so a separate
        # size check followed by a blocking pop could hang here.
        message = @mailbox.pop(true)
      rescue ThreadError
        @listeners.push block
      else
        block.call message
      end
    end
  end
end
|
|
|
# Executes submitted blocks on one dedicated background thread.
class ThreadExecutor
  def initialize
    @queue = Queue.new
    # Worker loop: take the next block off the queue (blocking while it is
    # empty) and call it, forever.
    @thread = Thread.new { loop { @queue.pop.call } }
  end

  # Enqueues +block+ to be called by the worker thread.
  def submit(&block)
    @queue << block
  end
end
|
|
|
# First step of a koroutine's life: turn it into a fiber on the executor and
# hand that fiber to StartFiber.
class Initialize
  # @param args [Hash] expects :koroutine (responds to #to_fiber) and
  #   :executor (responds to #submit)
  def initialize(args)
    @executor = args[:executor]
    @koroutine = args[:koroutine]
  end

  # Schedules the fiber creation and start-up on the executor.
  def run
    @executor.submit { launch }
  end

  private

  # Builds the fiber inside the submitted task, then starts it.
  def launch
    fiber = @koroutine.to_fiber
    StartFiber.new(fiber: fiber, executor: @executor).run
  end
end
|
|
|
# Resumes a fresh fiber for the first time and, if it suspends on a Channel,
# hands it over to ResumeFiber.
class StartFiber
  # @param args [Hash] expects :fiber (a Fiber) and :executor (responds to
  #   #submit)
  # @raise [RuntimeError] when :fiber is not a Fiber
  def initialize(args)
    @fiber = args[:fiber]
    @executor = args[:executor]
    raise "Unexpected class of fiber: #{@fiber.class}" unless @fiber.is_a? Fiber
  end

  # Schedules the first resume on the executor.
  def run
    @executor.submit { start }
  end

  private

  # Performs the first resume and decides what happens next.
  #
  # Whether the fiber finished is decided with Fiber#alive?, not by the class
  # of the value it produced: a finished fiber returns the value of its last
  # expression, which can be any object.
  #
  # @raise [RuntimeError] when a still-running fiber yielded something other
  #   than a Channel, since nothing could ever resume it
  def start
    puts "Resuming for the first time"

    yielded = @fiber.resume

    unless @fiber.alive?
      puts "The fiber should have been exited"
      return
    end

    raise "Unexpected class of channel: #{yielded.class}" unless yielded.is_a? Channel

    puts "Resumed with #{yielded}"

    ResumeFiber.new(fiber: @fiber, channel: yielded, executor: @executor).run
  end
end
|
|
|
# Resumes a suspended fiber with the next message from the channel it is
# waiting on, repeating for every further channel the fiber yields.
class ResumeFiber
  # @param args [Hash] expects :fiber (a suspended Fiber), :channel (responds
  #   to #on_read) and :executor (responds to #submit)
  def initialize(args)
    @fiber = args[:fiber]
    @channel = args[:channel]
    @executor = args[:executor]
  end

  # Registers for the channel's next message; when it arrives, the resume is
  # scheduled on the executor.
  def run
    @channel.on_read do |message|
      @executor.submit { advance(message) }
    end
  end

  private

  # Resumes the fiber with +message+ and decides what happens next.
  #
  # Whether the fiber finished is decided with Fiber#alive?, not by the class
  # of the value it produced: a finished fiber returns the value of its last
  # expression, which can be any object.
  #
  # @raise [RuntimeError] when a still-running fiber yielded something other
  #   than a Channel, since nothing could ever resume it
  def advance(message)
    yielded = @fiber.resume message

    unless @fiber.alive?
      puts "The fiber should have been exited with value: #{yielded}"
      return
    end

    raise "Unexpected class of channel: #{yielded.class}" unless yielded.is_a? Channel

    @executor.submit do
      ResumeFiber.new(fiber: @fiber, channel: yielded, executor: @executor).run
    end
  end
end
|
|
|
# Entry point of the runtime: owns one ThreadExecutor and starts koroutines
# on it.
class KoroutineSystem
  def initialize
    @executor = ThreadExecutor.new
  end

  # Wraps +block+ in a Koroutine called +name+ and schedules its
  # initialization on the executor.
  def ko(name, &block)
    routine = Koroutine.new(name: name, &block)
    @executor.submit { Initialize.new(koroutine: routine, executor: @executor).run }
  end
end
|
|
|
# A named block of code that can be turned into a Fiber on demand.
class Koroutine
  # @param args [Hash] expects :name
  # @param block [Proc] the body the fiber will run
  def initialize(args, &block)
    @block = block
    @name = args[:name]
  end

  # Builds a new Fiber around the stored block on every call.
  def to_fiber
    Fiber.new(&@block)
  end
end
|
|
|
# Demo: two koroutines exchange messages over channels while the main thread
# blocks on the result channel.
runtime = KoroutineSystem.new

first_in, second_in, result_out = Array.new(3) { Channel.new }

# Written before any koroutine exists, so it is already pending when the
# first koroutine reads from this channel.
first_in.write "mikoto"

# Koroutine :one reads one message from each input channel and publishes the
# combination on the result channel.
runtime.ko(:one) do
  puts "The fiber resumed at first time"
  msg1 = first_in.read
  puts "The fiber resumed with the first message: #{msg1}"
  msg2 = second_in.read
  puts "The fiber resumed with the second message: #{msg2}"
  result_out.write "#{msg1}x#{msg2}"
end

# Koroutine :two supplies the second message.
runtime.ko(:two) do
  puts "The second fiber resumed at first time"
  second_in.write "kuroko"
  puts "The second fiber wrote to ch2"
end

# Block the main thread until the combined message is available.
puts result_out.await