Skip to content

Instantly share code, notes, and snippets.

@JoshCheek
Created March 6, 2019 04:48
Show Gist options
  • Save JoshCheek/6019e2e93ed729cea9982fba9112a6df to your computer and use it in GitHub Desktop.
Save JoshCheek/6019e2e93ed729cea9982fba9112a6df to your computer and use it in GitHub Desktop.
Asynchronous Ruby
# some of the syntax requires Ruby 2.5+
module Async
refine BasicObject do
def async(&block)
Async::Context.new(&block).run!
end
def enqueue(&block)
::Fiber.yield [:append, block]
end
def pass
::Fiber.yield [:pass, nil]
end
def ensure_passes(&block)
events = ::Fiber.yield [:record_events]
block.call
ensure
pass unless events.any? { |type, payload| type == :pass }
end
end
refine Class do
def ensure_passes(method_name)
method = instance_method(method_name)
define_method method_name do |*args, &block|
ensure_passes { method.bind(self).call(*args, &block) }
end
end
end
end
class Async::Context
def initialize(&block)
@queue = []
append block
end
# kinda meh, should probably create objects to wrap the fibers
# would probably remove a lot of these locals and nested control flow
def run!
until @queue.empty?
fiber = @queue.shift
type = payload = nil
# synchronous
event_recorders = []
args = []
loop do
event = fiber.resume *args
event_recorders.each { |recorder| recorder << event }
type, payload = event
case type
when :append
append payload
args = []
when :record_events
recorder = []
event_recorders << recorder
args = [recorder]
else
break
end
end
case type
when :return
# noop
when :pass
@queue << fiber
when :error
raise payload
else
raise "WAT: #{type.inspect}, #{payload.inspect}"
end
end
end
def append(block)
@queue << Fiber.new do
[:return, block.call]
rescue StandardError => err
Fiber.yield [:error, err]
end
end
end
class Async::IO
using Async
def self.pipe
IO.pipe.map { |io| new io }
end
def initialize(io)
@io = io
@read_buffer = ""
@write_buffer = ""
end
def puts(*messages)
messages.each { |message| write "#{message.to_s.chomp}\n" }
nil
end
ensure_passes def write(string)
@write_buffer << string
while 0 < @write_buffer.bytesize
begin
written = @io.write_nonblock(@write_buffer)
rescue IO::WaitReadable, IO::WaitWritable
pass
retry
end
@write_buffer = @write_buffer.byteslice(written..-1)
end
end
ensure_passes def gets
loop do
line, buffer = @write_buffer.split("\n", 2)
line && @write_buffer = buffer and
return line
@write_buffer << @io.read_nonblock(1000)
end
rescue IO::WaitReadable, IO::WaitWritable
pass
retry
end
end
using Async
async do
readable, writable = Async::IO.pipe
enqueue do
8.times.map { readable.gets }
# => ["1", "A", "2", "B", "3", "C", "4", "D"]
end
enqueue do
writable.puts "1"
writable.puts "2"
writable.puts "3"
writable.puts "4"
end
enqueue do
writable.puts "A"
writable.puts "B"
writable.puts "C"
writable.puts "D"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment