Last active
July 12, 2016 08:12
-
-
Save robhurring/76b4c9783475747be5aef2d775a823d4 to your computer and use it in GitHub Desktop.
Lightweight actors in ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
module Actor | |
class << self | |
def included(base) | |
base.extend(ClassMethods) | |
end | |
def current | |
Thread.current[:actor] | |
end | |
end | |
module ClassMethods | |
def new(*) | |
Proxy.new(super) | |
end | |
end | |
class Proxy | |
attr_reader :outbox | |
def initialize(target) | |
@target = target | |
@mailbox = Queue.new | |
@outbox = Queue.new | |
@mutex = Mutex.new | |
@async_proxy = AsyncProxy.new(self) | |
@thread = Thread.new do | |
Thread.current[:actor] = self | |
Thread.abort_on_exception = true | |
process_inbox | |
end | |
end | |
def await | |
@thread.join | |
end | |
def future | |
Future.new(self) | |
end | |
def terminate | |
Thread.kill(@thread) | |
end | |
def alive? | |
@thread && @thread.alive? | |
end | |
def async | |
@async_proxy | |
end | |
def method_missing(sym, *args, &block) | |
process_message(sym, *args, &block) | |
end | |
def send_later(sym, *args, &block) | |
@mailbox << [sym, args, block] | |
end | |
private | |
def process_inbox | |
while Thread.current.alive? | |
sym, args, block = @mailbox.pop | |
process_message sym, *args, &block | |
end | |
rescue Exception => e | |
puts "[#{Actor.current}] Exception! #{e}" | |
raise | |
end | |
def process_message(sym, *args, outbox: nil, &block) | |
@mutex.synchronize do | |
result = @target.public_send(sym, *args, &block) | |
outbox.push(result) if outbox | |
end | |
end | |
end | |
class AsyncProxy | |
def initialize(actor) | |
@actor = actor | |
end | |
def method_missing(sym, *args, &block) | |
@actor.send_later(sym, *args, &block) | |
end | |
end | |
class Future | |
def initialize(actor) | |
@actor = actor | |
@mailbox = Queue.new | |
end | |
def value | |
if @mailbox.empty? && @last_value | |
@last_value | |
else | |
@last_value = @mailbox.pop | |
end | |
end | |
def method_missing(sym, *args, &block) | |
@mailbox.clear | |
args.push(outbox: @mailbox) | |
@actor.send_later(sym, *args, &block) | |
self | |
end | |
end | |
end | |
class Producer | |
include Actor | |
def initialize | |
@i = 0 | |
@delay = 0.1 | |
end | |
def produce(queue) | |
while Actor.current.alive? | |
item = "item-#{@i += 1}" | |
puts "[#{Actor.current}] produced - #{item} -- #{queue.size}" | |
queue << item | |
sleep @delay | |
end | |
end | |
end | |
class Consumer | |
include Actor | |
def initialize(name) | |
@name = name | |
@delay = rand(2) | |
end | |
def consume(queue) | |
while Actor.current.alive? | |
item = queue.pop | |
puts "[#{Actor.current}] consumer-#{@name} - consumed - #{item} -- #{queue.size}" | |
sleep @delay | |
end | |
end | |
end | |
class App | |
def initialize | |
@queue = SizedQueue.new(100) | |
end | |
def start(workers = 10) | |
monitor_signals | |
build_consumer_pool(workers) | |
start_producer | |
start_consumers | |
wait | |
end | |
def terminate | |
stop_producer | |
stop_consumers | |
end | |
private | |
attr_reader :queue | |
def monitor_signals | |
Signal.trap('INT') do | |
shutdown | |
end | |
Signal.trap('QUIT') do | |
shutdown | |
end | |
end | |
def shutdown | |
puts 'shutting down...' | |
terminate | |
exit 0 | |
end | |
def wait | |
@producer.await | |
end | |
def start_producer | |
@producer = Producer.new | |
@producer.async.produce(queue) | |
end | |
def stop_producer | |
@producer.terminate | |
end | |
def build_consumer_pool(workers) | |
@consumers = workers.times.map do |i| | |
Consumer.new(i) | |
end | |
end | |
def start_consumers | |
@consumers.each do |consumer| | |
consumer.async.consume(queue) | |
end | |
end | |
def stop_consumers | |
@consumers.each(&:terminate) | |
end | |
end | |
App.new.start(2) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment