Last active
March 28, 2017 02:16
-
-
Save ioquatix/5b1572079864031bb618bdfd6d85d8c8 to your computer and use it in GitHub Desktop.
Rough outline of fiber-based concurrency.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'fiber' | |
require 'nio' | |
# Namespace for the fiber-based asynchronous I/O helpers defined in this file.
module Async
  # Holds the wrapped I/O objects that belong to a single reactor task and
  # closes all of them together when the task is done.
  class Await
    # @return [Array] the wrapped I/O objects, in the order they were given.
    attr_reader :ios

    # @param ios [Array] raw I/O objects; each one is passed to Async::IO.wrap.
    # @param reactor [Object] handed to Async::IO.wrap unchanged for every io.
    def initialize(ios, reactor)
      @ios = ios.map { |io| Async::IO.wrap(io, reactor) }
    end

    # Placeholder for outgoing connections.
    #
    # @raise [NotImplementedError] always; no implementation exists yet.
    def connect(addr, port)
      raise NotImplementedError
    end

    # Calls #close on every wrapped I/O object.
    def close
      @ios.each(&:close)
    end
  end
end
module Async
  # Wraps an I/O object together with the reactor used to wait on it.
  #
  # The wait_* methods register interest with the reactor and suspend the
  # current fiber; whoever drives the reactor is expected to resume the fiber
  # stored in the monitor's +value+. Any method not defined here is forwarded
  # to the wrapped object.
  class IO
    # Picks the wrapper class that matches +io+.
    #
    # @param io [TCPServer, TCPSocket] the socket to wrap.
    # @param args extra arguments forwarded to the wrapper's constructor.
    # @return [Async::TCP::Server, Async::TCP::Socket]
    # @raise [ArgumentError] if +io+ is neither a TCPServer nor a TCPSocket.
    def self.wrap(io, *args)
      case io
      # TCPServer must be tested first: it is a subclass of TCPSocket.
      when TCPServer
        Async::TCP::Server.new(io, *args)
      when TCPSocket
        Async::TCP::Socket.new(io, *args)
      else
        raise ArgumentError, "Don't know how to wrap #{io.class}!"
      end
    end

    # @param io [#to_io] the object to wrap.
    # @param reactor [#register] object used to register interest in +io+.
    def initialize(io, reactor)
      @io = io
      @reactor = reactor
      @monitor = nil
    end

    # @return the result of calling +to_io+ on the wrapped object.
    def to_io
      @io.to_io
    end

    # Registers +interests+ with the reactor on first use; afterwards only the
    # interest set of the existing monitor is updated. The current fiber is
    # always recorded as the monitor's value so that the fiber which is
    # actually waiting is the one that gets resumed.
    #
    # @param interests [Symbol] interest set, e.g. :r, :w or :rw.
    def monitor(interests)
      if @monitor
        @monitor.interests = interests
      else
        @monitor = @reactor.register(to_io, interests)
      end

      @monitor.value = Fiber.current
    end

    # Suspends the current fiber after registering read interest.
    def wait_readable
      Fiber.yield(monitor(:r))
    end

    # Suspends the current fiber after registering write interest.
    def wait_writable
      Fiber.yield(monitor(:w))
    end

    # Suspends the current fiber after registering read and write interest.
    def wait_any
      Fiber.yield(monitor(:rw))
    end

    # Forwards unknown methods to the wrapped object.
    #
    # +public_send+ is used instead of +send+ because sockets define their own
    # #send (it transmits data), so +@io.send(name, ...)+ would not dispatch
    # the method on a wrapped socket.
    def method_missing(name, *args, &block)
      @io.public_send(name, *args, &block)
    end

    # Keeps +respond_to?+ consistent with the delegation in #method_missing.
    def respond_to_missing?(name, include_private = false)
      @io.respond_to?(name, include_private) || super
    end

    # Releases the reactor registration (if any) and then closes the wrapped
    # object, so the underlying descriptor is not leaked. Safe to call twice.
    def close
      if @monitor
        @monitor.close
        # Drop the reference so a later wait registers a fresh monitor.
        @monitor = nil
      end

      @io.close unless @io.closed?
    end
  end
end
# TCP-specific wrappers. Each operation first tries the non-blocking call on
# the wrapped socket and, when it would block, suspends the current fiber via
# the wait_* helpers inherited from Async::IO before trying again.
module Async::TCP
  # Wrapper around a listening socket.
  class Server < Async::IO
    # Accepts one connection without blocking the thread.
    #
    # @param args forwarded to +accept_nonblock+ on the wrapped socket.
    # @return whatever +accept_nonblock+ returns (the peer is not wrapped).
    def accept(*args)
      @io.accept_nonblock(*args)
    rescue ::IO::WaitReadable, Errno::EINTR
      wait_readable
      retry
    end
  end

  # Wrapper around a connected socket.
  class Socket < Async::IO
    # Reads without blocking the thread.
    #
    # @param args forwarded to +read_nonblock+ on the wrapped socket.
    # @return whatever +read_nonblock+ returns.
    # NOTE(review): read_nonblock presumably raises EOFError at end of stream
    # instead of returning nil like IO#read -- confirm callers expect that.
    def read(*args)
      @io.read_nonblock(*args)
    rescue ::IO::WaitReadable, Errno::EINTR
      wait_readable
      retry
    end

    # Writes without blocking the thread.
    #
    # A write that would block has to wait until the socket is writable; the
    # previous code waited for readability, which is the wrong condition and
    # also left IO::WaitWritable unhandled.
    #
    # @param args forwarded to +write_nonblock+ on the wrapped socket.
    # @return whatever +write_nonblock+ returns.
    # NOTE(review): a single write_nonblock may write only part of the data;
    # this method does not loop to flush the remainder.
    def write(*args)
      @io.write_nonblock(*args)
    rescue ::IO::WaitWritable, Errno::EINTR
      wait_writable
      retry
    rescue ::IO::WaitReadable
      # Kept for sockets that need to read before they can write.
      wait_readable
      retry
    end
  end
end
# Minimal event loop: owns an NIO selector and the fibers created by #async,
# and resumes fibers when the selector reports their I/O as ready.
class Reactor
  def initialize
    puts "Creating selector"
    @selector = NIO::Selector.new
    @fibers = []
  end

  # Schedules a task. The block runs inside a new fiber once #run_forever
  # starts it, and receives each wrapped io followed by the Async::Await that
  # owns them. The Await is closed when the block finishes or raises.
  #
  # @param ios raw I/O objects handed to Async::Await.
  # @return [Array<Fiber>] the reactor's list of fibers.
  def async(*ios)
    @fibers << Fiber.new do
      await = Async::Await.new(ios, self)

      begin
        yield(*await.ios, await)
      ensure
        await.close
      end
    end
  end

  # Registers an io with the selector; arguments are passed through as-is.
  def register(*args)
    @selector.register(*args)
  end

  # Runs until every scheduled fiber has finished.
  #
  # Each pass resumes all live fibers once (this starts new ones and gives
  # waiting ones a spurious wake-up), then services the selector until it
  # returns nothing within one second.
  def run_forever
    while @fibers.any?
      puts "Updating #{@fibers.count} fibers..."
      @fibers.each { |fiber| fiber.resume if fiber.alive? }
      # Prune by liveness, not by what the fiber happened to yield: a live
      # fiber that yields nil/false must stay scheduled so it can finish.
      @fibers.select!(&:alive?)

      puts "Selecting with #{@fibers.count} fibers..."
      while (monitors = @selector.select(1))
        monitors.each do |monitor|
          fiber = monitor.value
          # Resuming a finished fiber raises FiberError, so check first.
          fiber.resume if fiber && fiber.alive?
        end
      end
    end
  end

  # Accepts a connection on +server+ from inside a fiber, suspending the
  # fiber until the server is readable when no connection is pending.
  #
  # @param server [#accept_nonblock, #to_io] a listening socket.
  # @return the accepted peer as returned by +accept_nonblock+.
  # NOTE(review): registers its own temporary monitor, so +server+ should not
  # already be registered with the selector -- confirm against NIO's rules.
  def accept(server)
    loop do
      # With exception: false a would-block result is reported as
      # :wait_readable instead of being raised.
      peer = server.accept_nonblock(exception: false)
      return peer unless peer == :wait_readable

      wait_for_readable(server)
    end
  rescue Errno::EINTR
    retry
  end

  private

  # Suspends the current fiber until the selector reports +io+ as readable.
  # The temporary monitor is always released again.
  def wait_for_readable(io)
    monitor = register(io.to_io, :r)
    monitor.value = Fiber.current
    Fiber.yield
  ensure
    monitor.close if monitor
  end
end
# Demo: one server task that greets every peer, plus ten client tasks that
# each read the greeting, all driven by a single reactor.
reactor = Reactor.new

puts "Creating server"
server = TCPServer.new("localhost", 6777)

# Block parameters use their own names so they do not shadow the outer locals.
reactor.async(server) do |listener|
  loop do
    puts "Accepting peer on server #{listener}"
    peer = listener.accept

    begin
      puts "Sending data to peer"
      peer << "data\n"
      peer.shutdown
    ensure
      # shutdown alone does not release the descriptor; close it as well.
      peer.close
    end
  end
end

10.times do |i|
  puts "Creating client #{i}"
  client = TCPSocket.new("localhost", 6777)

  reactor.async(client) do |connection|
    puts "Reading data on client #{i}"
    puts connection.read(1024)
  end
end

reactor.run_forever
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment