Skip to content

Instantly share code, notes, and snippets.

@mpage
Created January 23, 2012 23:44
Show Gist options
  • Save mpage/1666516 to your computer and use it in GitHub Desktop.
Save mpage/1666516 to your computer and use it in GitHub Desktop.
Channel implementation (futures)
require "thread"
require "axon/future"
module Axon
  # A channel for handing values from sending threads to receiving threads.
  #
  # Senders and receivers each enqueue a future. Every time one is enqueued,
  # the channel (under a single channel-wide mutex) pairs pending receivers
  # with buffered values or pending senders, then tops the buffer up from any
  # senders that are still waiting.
  class Channel
    # @return [Integer] maximum number of values the channel buffers
    attr_reader :buffer_size

    # Receive-side stand-in for the channel buffer: "finishing" it appends the
    # value to the buffer array and always reports success.
    class BufferReceiveFuture < Axon::Future
      # @param buffer [Array] the array that finished values are appended to
      def initialize(buffer)
        super()
        @buffer = buffer
      end

      # Stores +result+ in the buffer.
      #
      # @return [true] always
      def finish(result)
        @buffer << result
        true
      end
    end

    # Future representing one pending send of a single value.
    class SendFuture < Axon::Future
      # @return [Object] the value waiting to be delivered
      attr_reader :value

      # @param value [Object] the value to deliver
      def initialize(value)
        super()
        @value = value
      end

      # Tries to hand the value to +recv_future+. Note that, unlike
      # Axon::Future#finish, the argument is the receiving future rather than
      # a result.
      #
      # @param recv_future [#finish] receiver; its +finish+ must return a
      #   truthy value when it accepted the value
      # @return [nil] when the value was delivered (this future is then
      #   deactivated and waiters are woken)
      # @return [SendFuture] +self+ when this send future was already inactive
      # @return [Object] +recv_future+ when the receiver refused the value
      def finish(recv_future)
        @result_lock.synchronize do
          if !@active
            self
          elsif recv_future.finish(@value)
            @active = false
            @result_cond.broadcast
            nil
          else
            recv_future
          end
        end
      end
    end

    # @param buffer_size [Integer] number of values that may be accepted
    #   without a waiting receiver (0 means every send waits for a receiver)
    def initialize(buffer_size=0)
      @buffer_size = buffer_size
      @buffer = []
      @buf_recv_future = BufferReceiveFuture.new(@buffer)
      @send_futures = []
      @recv_futures = []
      @chan_lock = Mutex.new
    end

    # Sends +value+, blocking until it has been accepted by a receiver or by
    # the buffer.
    #
    # NOTE: this shadows Object#send for Channel instances; use +__send__+
    # for dynamic dispatch on a channel.
    #
    # @param value [Object]
    # @return [nil]
    def send(value)
      future = SendFuture.new(value)
      fsend(future)
      future.resolve
      nil
    end

    # Blocks until a value is available and returns it.
    #
    # @return [Object] the received value
    def receive
      future = Axon::Future.new
      freceive(future)
      future.resolve
    end

    # Enqueues a send future without blocking on its completion.
    #
    # @param future [SendFuture] presumably any object following the
    #   SendFuture#finish return protocol (nil / itself / the receiver)
    # @return [nil]
    def fsend(future)
      @chan_lock.synchronize do
        @send_futures << future
        process_futures
      end
      nil
    end

    # Enqueues a receive future without blocking on its completion.
    #
    # @param future [Axon::Future] completed via +finish(value)+ once a value
    #   is available
    # @return [nil]
    def freceive(future)
      @chan_lock.synchronize do
        @recv_futures << future
        process_futures
      end
      nil
    end

    private

    # Pairs pending futures. Both callers invoke this while holding
    # @chan_lock, which is what protects the queues and the buffer.
    def process_futures
      # Serve receivers first: from the buffer when it has data, otherwise
      # straight from the oldest pending sender.
      until @recv_futures.empty? || (@buffer.empty? && @send_futures.empty?)
        recv_future = @recv_futures.shift
        if @buffer.empty?
          deliver_from_senders(recv_future)
        elsif recv_future.finish(@buffer[0])
          # Only consume the buffered value if the receiver accepted it; a
          # receiver that refuses (e.g. cancelled) is simply dropped.
          @buffer.shift
        end
      end

      # Move waiting senders into whatever buffer space remains.
      while (@buffer.size < @buffer_size) && (send_future = @send_futures.shift)
        send_future.finish(@buf_recv_future)
      end
    end

    # Offers +recv_future+ to pending senders, oldest first, discarding
    # senders that report themselves inactive.
    #
    # If every pending sender turns out to be inactive, the receiver is put
    # back at the head of the receive queue so a later sender can still reach
    # it (previously it was silently dropped and would wait forever).
    def deliver_from_senders(recv_future)
      until @send_futures.empty?
        send_future = @send_futures[0]
        case send_future.finish(recv_future)
        when nil
          # Delivered: this sender is done.
          @send_futures.shift
          return
        when send_future
          # Sender was inactive: discard it and try the next one.
          @send_futures.shift
        when recv_future
          # Receiver refused the value: drop the receiver, keep the sender.
          return
        end
      end
      @recv_futures.unshift(recv_future)
    end
  end
end
require "thread"
module Axon
  # A one-shot, thread-safe result holder.
  #
  # A future starts out active. It becomes inactive exactly once, either by
  # #finish (which stores a result) or by #cancel (which stores nothing).
  # Threads can block in #resolve until that happens.
  #
  # Subclasses in this codebase (e.g. Channel::SendFuture) access
  # @result_lock, @result_cond and @active directly, so those names are part
  # of the subclass contract.
  class Future
    def initialize
      @result_lock = Mutex.new
      @result_cond = ConditionVariable.new
      @active = true
      @result = nil
    end

    # Completes the future with +result+ and wakes all threads in #resolve.
    #
    # @param result [Object] value to hand to resolvers
    # @return [Boolean] true if this call completed the future, false if it
    #   was already finished or cancelled (the stored result is left as is)
    def finish(result)
      @result_lock.synchronize do
        if @active
          @result = result
          @active = false
          @result_cond.broadcast
          true
        else
          false
        end
      end
    end

    # Cancels the future if it is still active.
    #
    # Waiters are woken as well; without the broadcast a thread already
    # blocked in #resolve would never return after a cancel.
    #
    # @return [Boolean] true if this call cancelled the future, false if it
    #   was already finished or cancelled
    def cancel
      @result_lock.synchronize do
        if @active
          @active = false
          @result_cond.broadcast
          true
        else
          false
        end
      end
    end

    # Blocks until the future is no longer active.
    #
    # @return [Object, nil] the finished result, or nil if the future was
    #   cancelled before any result was stored
    def resolve
      @result_lock.synchronize do
        # Re-check the predicate after every wakeup: a wakeup alone does not
        # prove the future has completed.
        @result_cond.wait(@result_lock) while @active
        @result
      end
    end
  end
end
> ruby example/benchmark.rb
user system total real
20k msg (buf= 0), 1 snd, 1 rcv 0.820000 0.180000 1.000000 ( 0.897158)
20k msg (buf= 1), 1 snd, 1 rcv 0.750000 0.150000 0.900000 ( 0.809571)
20k msg (buf= 10), 1 snd, 1 rcv 0.410000 0.040000 0.450000 ( 0.448189)
20k msg (buf= 0), 10 snd, 1 rcv 1.060000 1.350000 2.410000 ( 1.666484)
20k msg (buf= 1), 10 snd, 1 rcv 1.320000 1.280000 2.600000 ( 1.755258)
20k msg (buf= 10), 10 snd, 1 rcv 0.960000 1.120000 2.080000 ( 1.313997)
20k msg (buf= 0), 1 snd, 10 rcv 1.110000 1.130000 2.240000 ( 1.469244)
20k msg (buf= 1), 1 snd, 10 rcv 1.270000 1.190000 2.460000 ( 1.624531)
20k msg (buf= 10), 1 snd, 10 rcv 0.820000 0.950000 1.770000 ( 1.164383)
20k msg (buf= 0), 10 snd, 10 rcv 1.810000 5.450000 7.260000 ( 2.953382)
20k msg (buf= 1), 10 snd, 10 rcv 1.970000 5.130000 7.100000 ( 2.872956)
>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment