Skip to content

Instantly share code, notes, and snippets.

@nkpart
Created October 9, 2008 09:47
Show Gist options
  • Save nkpart/15737 to your computer and use it in GitHub Desktop.
Save nkpart/15737 to your computer and use it in GitHub Desktop.
# http://github.com/evanphx/rubinius/tree/master/lib/actor.rb
# Stripped ends as a `rong` prototype
class Actor
class DeadActorError < RuntimeError
attr_reader :actor
attr_reader :reason
def initialize(actor, reason)
super(reason)
@actor = actor
@reason = reason
ANY = Object.new
def ANY.===(other)
true
class << self
alias_method :private_new, :new
private :private_new
@@registered_lock = Channel.new
@@registered = {}
@@registered_lock << nil
def current
Thread.current[:__current_actor__] ||= private_new
# Spawn a new Actor that will run in its own thread
def spawn(*args, &block)
raise ArgumentError, "no block given" unless block
spawned = Channel.new
Thread.new do
private_new do |actor|
Thread.current[:__current_actor__] = actor
spawned << actor
block.call *args
spawned.receive
alias_method :new, :spawn
# Atomically spawn an actor and link it to the current actor
def spawn_link(*args, &block)
current = self.current
link_complete = Channel.new
spawn do
begin
Actor.link(current)
ensure
link_complete << Actor.current
block.call *args
link_complete.receive
# Polls for exit notifications
def check_for_interrupt
current._check_for_interrupt
self
# Waits until a matching message is received in the current actor's
# mailbox, and executes the appropriate action. May be interrupted by
# exit notifications.
def receive #:yields: filter
filter = Filter.new
if block_given?
yield filter
else
filter.when(ANY) { |m| m }
current._receive(filter)
# Send a "fake" exit notification to another actor, as if the current
# actor had exited with +reason+
def send_exit(recipient, reason)
recipient.notify_exited(current, reason)
self
# Link the current Actor to another one.
def link(actor)
current = self.current
current.notify_link actor
actor.notify_link current
self
# Unlink the current Actor from another one
def unlink(actor)
current = self.current
current.notify_unlink actor
actor.notify_unlink current
self
# Actors trapping exit do not die when an error occurs in an Actor they
# are linked to. Instead the exit message is sent to their regular
# mailbox in the form [:exit, actor, reason]. This allows certain
# Actors to supervise sets of others and restart them in the event
# of an error. Setting the trap flag may be interrupted by pending
# exit notifications.
#
def trap_exit=(value)
current._trap_exit = value
self
# Is the Actor trapping exit?
def trap_exit
current._trap_exit
alias_method :trap_exit?, :trap_exit
# Lookup a locally named service
def lookup(name)
raise ArgumentError, "name must be a symbol" unless Symbol === name
@@registered_lock.receive
begin
@@registered[name]
ensure
@@registered_lock << nil
alias_method :[], :lookup
# Register an Actor locally as a named service
def register(name, actor)
raise ArgumentError, "name must be a symbol" unless Symbol === name
unless actor.nil? or actor.is_a?(Actor)
raise ArgumentError, "only actors may be registered"
@@registered_lock.receive
begin
if actor.nil?
@@registered.delete(name)
else
@@registered[name] = actor
ensure
@@registered_lock << nil
alias_method :[]=, :register
def _unregister(actor) #:nodoc:
@@registered_lock.receive
begin
@@registered.delete_if { |n, a| actor.equal? a }
ensure
@@registered_lock << nil
def initialize
@lock = Channel.new
@filter = nil
@ready = Channel.new
@action = nil
@message = nil
@mailbox = []
@interrupts = []
@links = []
@alive = true
@exit_reason = nil
@trap_exit = false
@thread = Thread.current
@lock << nil
if block_given?
watchdog { yield self }
else
Thread.new { watchdog { @thread.join } }
def send(message)
@lock.receive
begin
return self unless @alive
if @filter
@action = @filter.action_for(message)
if @action
@filter = nil
@message = message
@ready << nil
else
@mailbox << message
else
@mailbox << message
ensure
@lock << nil
self
alias_method :<<, :send
def _check_for_interrupt #:nodoc:
check_thread
@lock.receive
begin
raise @interrupts.shift unless @interrupts.empty?
ensure
@lock << nil
def _receive(filter) #:nodoc:
check_thread
action = nil
message = nil
timed_out = false
@lock.receive
begin
raise @interrupts.shift unless @interrupts.empty?
for i in 0...(@mailbox.size)
message = @mailbox[i]
action = filter.action_for(message)
if action
@mailbox.delete_at(i)
break
unless action
if filter.timeout?
timeout_id = Scheduler.send_in_seconds(@ready, filter.timeout, true)
else
timeout_id = nil
@filter = filter
@lock << nil
begin
timed_out = @ready.receive
ensure
@lock.receive
if timeout_id
Scheduler.cancel(timeout_id)
@ready << nil
@ready = Channel.new if @ready.receive
if not timed_out and @interrupts.empty?
action = @action
message = @message
else
@mailbox << @message if @action
@action = nil
@message = nil
raise @interrupts.shift unless @interrupts.empty?
ensure
@lock << nil
if timed_out
filter.timeout_action.call
else
action.call message
# Notify this actor that it's now linked to the given one; this is not
# intended to be used directly except by actor implementations. Most
# users will want to use Actor.link instead.
#
def notify_link(actor)
@lock.receive
alive = nil
exit_reason = nil
begin
alive = @alive
exit_reason = @exit_reason
@links << actor if alive and not @links.include? actor
ensure
@lock << nil
actor.notify_exited(self, exit_reason) unless alive
self
# Notify this actor that it's now unlinked from the given one; this is
# not intended to be used directly except by actor implementations. Most
# users will want to use Actor.unlink instead.
#
def notify_unlink(actor)
@lock.receive
begin
return self unless @alive
@links.delete(actor)
ensure
@lock << nil
self
# Notify this actor that one of the Actors it's linked to has exited;
# this is not intended to be used directly except by actor implementations.
# Most users will want to use Actor.send_exit instead.
#
def notify_exited(actor, reason)
@lock.receive
begin
return self unless @alive
@links.delete(actor)
if @trap_exit
send [:exit, actor, reason]
elsif reason
@interrupts << DeadActorError.new(actor, reason)
if @filter
@filter = nil
@ready << nil
ensure
@lock << nil
self
def watchdog
reason = nil
begin
yield
rescue Exception => reason
ensure
links = nil
Actor._unregister(self)
@lock.receive
begin
@alive = false
@mailbox = nil
@interrupts = nil
@exit_reason = reason
links = @links
@links = nil
ensure
@lock << nil
links.each do |actor|
begin
actor.notify_exited(self, reason)
rescue Exception
nil
private :watchdog
def check_thread
unless Thread.current == @thread
raise ThreadError, "illegal cross-actor call"
private :check_thread
def _trap_exit=(value) #:nodoc:
check_thread
@lock.receive
begin
raise @interrupts.shift unless @interrupts.empty?
@trap_exit = !!value
ensure
@lock << nil
def _trap_exit #:nodoc:
check_thread
@lock.receive
begin
@trap_exit
ensure
@lock << nil
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment