Created
March 6, 2019 04:48
-
-
Save JoshCheek/6019e2e93ed729cea9982fba9112a6df to your computer and use it in GitHub Desktop.
Asynchronous Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# some of the syntax requires Ruby 2.5+ | |
module Async | |
refine BasicObject do | |
def async(&block) | |
Async::Context.new(&block).run! | |
end | |
def enqueue(&block) | |
::Fiber.yield [:append, block] | |
end | |
def pass | |
::Fiber.yield [:pass, nil] | |
end | |
def ensure_passes(&block) | |
events = ::Fiber.yield [:record_events] | |
block.call | |
ensure | |
pass unless events.any? { |type, payload| type == :pass } | |
end | |
end | |
refine Class do | |
def ensure_passes(method_name) | |
method = instance_method(method_name) | |
define_method method_name do |*args, &block| | |
ensure_passes { method.bind(self).call(*args, &block) } | |
end | |
end | |
end | |
end | |
class Async::Context | |
def initialize(&block) | |
@queue = [] | |
append block | |
end | |
# kinda meh, should probably create objects to wrap the fibers | |
# would probably remove a lot of these locals and nested control flow | |
def run! | |
until @queue.empty? | |
fiber = @queue.shift | |
type = payload = nil | |
# synchronous | |
event_recorders = [] | |
args = [] | |
loop do | |
event = fiber.resume *args | |
event_recorders.each { |recorder| recorder << event } | |
type, payload = event | |
case type | |
when :append | |
append payload | |
args = [] | |
when :record_events | |
recorder = [] | |
event_recorders << recorder | |
args = [recorder] | |
else | |
break | |
end | |
end | |
case type | |
when :return | |
# noop | |
when :pass | |
@queue << fiber | |
when :error | |
raise payload | |
else | |
raise "WAT: #{type.inspect}, #{payload.inspect}" | |
end | |
end | |
end | |
def append(block) | |
@queue << Fiber.new do | |
[:return, block.call] | |
rescue StandardError => err | |
Fiber.yield [:error, err] | |
end | |
end | |
end | |
class Async::IO | |
using Async | |
def self.pipe | |
IO.pipe.map { |io| new io } | |
end | |
def initialize(io) | |
@io = io | |
@read_buffer = "" | |
@write_buffer = "" | |
end | |
def puts(*messages) | |
messages.each { |message| write "#{message.to_s.chomp}\n" } | |
nil | |
end | |
ensure_passes def write(string) | |
@write_buffer << string | |
while 0 < @write_buffer.bytesize | |
begin | |
written = @io.write_nonblock(@write_buffer) | |
rescue IO::WaitReadable, IO::WaitWritable | |
pass | |
retry | |
end | |
@write_buffer = @write_buffer.byteslice(written..-1) | |
end | |
end | |
ensure_passes def gets | |
loop do | |
line, buffer = @write_buffer.split("\n", 2) | |
line && @write_buffer = buffer and | |
return line | |
@write_buffer << @io.read_nonblock(1000) | |
end | |
rescue IO::WaitReadable, IO::WaitWritable | |
pass | |
retry | |
end | |
end | |
using Async | |
async do | |
readable, writable = Async::IO.pipe | |
enqueue do | |
8.times.map { readable.gets } | |
# => ["1", "A", "2", "B", "3", "C", "4", "D"] | |
end | |
enqueue do | |
writable.puts "1" | |
writable.puts "2" | |
writable.puts "3" | |
writable.puts "4" | |
end | |
enqueue do | |
writable.puts "A" | |
writable.puts "B" | |
writable.puts "C" | |
writable.puts "D" | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment