Last active
August 29, 2015 14:02
-
-
Save seki/029fefbf940b30bfac35 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rinda/tuplespace' | |
class ChannelSpace | |
include DRbUndumped | |
class ChannelError < RuntimeError | |
def initialize(str, handle) | |
@channel = handle | |
super(str) | |
end | |
attr_reader :channel | |
end | |
class Readable | |
def ===(it) | |
it.readable? | |
end | |
end | |
class Any | |
def initialize(ary) | |
@ary = ary | |
end | |
def ===(it) | |
@ary.include?(it) | |
end | |
end | |
class Channel | |
def initialize(ts, size, handle) | |
@ts = ts | |
@handle = handle | |
@reader = [] | |
@writer = [] | |
@buf = [] | |
@size = size | |
@closed = false | |
end | |
def readable? | |
_readable? || @closed | |
end | |
def _readable? | |
@buf.size + @writer.size > 0 | |
end | |
def close | |
@closed = true | |
while writer = @writer.shift | |
@ts.write([:write, @handle, writer[1], self]) | |
end | |
while reader = @reader.shift | |
@ts.write([:read, @handle, reader, self]) | |
end | |
end | |
def writer_shift | |
return if @writer.empty? | |
value, key = @writer.shift | |
@buf << value | |
@ts.write([:write, @handle, key, value]) | |
end | |
def buf_push(value) | |
@writer << [value, Thread.current] | |
if @buf.size < @size | |
writer_shift | |
end | |
end | |
def buf_shift | |
writer_shift | |
@buf.shift | |
end | |
def req_read | |
if _readable? | |
@ts.write([:read, @handle, Thread.current, buf_shift]) | |
else | |
raise ChannelError.new('closed channel.', @handle) if @closed | |
@reader << Thread.current | |
end | |
end | |
def req_write(value) | |
raise ChannelError.new('closed channel.', @handle) if @closed | |
buf_push(value) | |
return if @reader.empty? | |
@ts.write([:read, @handle, @reader.shift, buf_shift]) | |
end | |
end | |
class Handle | |
include DRbUndumped | |
def initialize(cs) | |
@cs = cs | |
end | |
def write(value) | |
@cs.write(self, value) | |
end | |
def read | |
@cs.read(self) | |
end | |
def close | |
@cs.close(self) | |
end | |
end | |
def initialize | |
@ts = Rinda::TupleSpace.new | |
@readable = Readable.new | |
end | |
def open(size=0) | |
handle = Handle.new(self) | |
@ts.write([handle, Channel.new(@ts, size, handle)]) | |
handle | |
end | |
def close(handle) | |
begin | |
_, chan = @ts.take([handle, nil]) | |
chan.close | |
ensure | |
@ts.write([handle, chan]) | |
end | |
end | |
def write(handle, value) | |
begin | |
_, chan = @ts.take([handle, nil]) | |
chan.req_write(value) | |
ensure | |
@ts.write([handle, chan]) | |
end | |
_, _, _, value = @ts.take([:write, handle, Thread.current, nil]) | |
raise ChannelError.new('closed channel.', handle) if value == chan | |
value | |
end | |
def read(handle) | |
begin | |
_, chan = @ts.take([handle, nil]) | |
chan.req_read | |
ensure | |
@ts.write([handle, chan]) | |
end | |
_, _, _, value = @ts.take([:read, handle, Thread.current, nil]) | |
raise ChannelError.new('closed channel.', handle) if value == chan | |
value | |
end | |
def select(set) | |
handle, _ = @ts.read([Any.new(set), @readable]) | |
handle | |
end | |
def select_and_read(set) | |
begin | |
handle, chan = @ts.take([Any.new(set), @readable]) | |
chan.req_read | |
ensure | |
@ts.write([handle, chan]) | |
end | |
_, _, _, value = @ts.take([:read, handle, Thread.current, nil]) | |
raise ChannelError.new('closed channel.', handle) if value == chan | |
return handle, value | |
end | |
def after(sec, value=nil) | |
chan = open | |
Thread.new {sleep sec; chan.write(value)} | |
chan | |
end | |
@cs = self.new | |
def self.open(size=0) | |
@cs.open(size) | |
end | |
def self.after(sec, value=nil) | |
@cs.after(sec, value) | |
end | |
def self.select(set) | |
@cs.select(set) | |
end | |
def self.select_and_read(set) | |
@cs.select_and_read(set) | |
end | |
end | |
if __FILE__ == $0 | |
def f(chan, n) | |
sleep(rand * 10) | |
p [:f_begin, n] | |
chan.write([:f, n]) | |
p [:f_end, n] | |
chan.close | |
end | |
ary = (0..3).collect do |n| | |
chan = ChannelSpace.open(0) | |
Thread.new(n) {|x| loop {f(chan, x)}} | |
chan | |
end | |
timeout = ChannelSpace.after(10, :timeout) | |
ary << timeout | |
loop do | |
chan = ChannelSpace.select(ary) | |
begin | |
p chan.read | |
rescue ChannelSpace::ChannelError | |
p :closed | |
ary.delete(chan) | |
end | |
break if chan == timeout | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment