Created
October 9, 2008 09:47
-
-
Save nkpart/15737 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# http://github.com/evanphx/rubinius/tree/master/lib/actor.rb | |
# Stripped ends as a `rong` prototype | |
class Actor | |
class DeadActorError < RuntimeError | |
attr_reader :actor | |
attr_reader :reason | |
def initialize(actor, reason) | |
super(reason) | |
@actor = actor | |
@reason = reason | |
ANY = Object.new | |
def ANY.===(other) | |
true | |
class << self | |
alias_method :private_new, :new | |
private :private_new | |
@@registered_lock = Channel.new | |
@@registered = {} | |
@@registered_lock << nil | |
def current | |
Thread.current[:__current_actor__] ||= private_new | |
# Spawn a new Actor that will run in its own thread | |
def spawn(*args, &block) | |
raise ArgumentError, "no block given" unless block | |
spawned = Channel.new | |
Thread.new do | |
private_new do |actor| | |
Thread.current[:__current_actor__] = actor | |
spawned << actor | |
block.call *args | |
spawned.receive | |
alias_method :new, :spawn | |
# Atomically spawn an actor and link it to the current actor | |
def spawn_link(*args, &block) | |
current = self.current | |
link_complete = Channel.new | |
spawn do | |
begin | |
Actor.link(current) | |
ensure | |
link_complete << Actor.current | |
block.call *args | |
link_complete.receive | |
# Polls for exit notifications | |
def check_for_interrupt | |
current._check_for_interrupt | |
self | |
# Waits until a matching message is received in the current actor's | |
# mailbox, and executes the appropriate action. May be interrupted by | |
# exit notifications. | |
def receive #:yields: filter | |
filter = Filter.new | |
if block_given? | |
yield filter | |
else | |
filter.when(ANY) { |m| m } | |
current._receive(filter) | |
# Send a "fake" exit notification to another actor, as if the current | |
# actor had exited with +reason+ | |
def send_exit(recipient, reason) | |
recipient.notify_exited(current, reason) | |
self | |
# Link the current Actor to another one. | |
def link(actor) | |
current = self.current | |
current.notify_link actor | |
actor.notify_link current | |
self | |
# Unlink the current Actor from another one | |
def unlink(actor) | |
current = self.current | |
current.notify_unlink actor | |
actor.notify_unlink current | |
self | |
# Actors trapping exit do not die when an error occurs in an Actor they | |
# are linked to. Instead the exit message is sent to their regular | |
# mailbox in the form [:exit, actor, reason]. This allows certain | |
# Actors to supervise sets of others and restart them in the event | |
# of an error. Setting the trap flag may be interrupted by pending | |
# exit notifications. | |
# | |
def trap_exit=(value) | |
current._trap_exit = value | |
self | |
# Is the Actor trapping exit? | |
def trap_exit | |
current._trap_exit | |
alias_method :trap_exit?, :trap_exit | |
# Lookup a locally named service | |
def lookup(name) | |
raise ArgumentError, "name must be a symbol" unless Symbol === name | |
@@registered_lock.receive | |
begin | |
@@registered[name] | |
ensure | |
@@registered_lock << nil | |
alias_method :[], :lookup | |
# Register an Actor locally as a named service | |
def register(name, actor) | |
raise ArgumentError, "name must be a symbol" unless Symbol === name | |
unless actor.nil? or actor.is_a?(Actor) | |
raise ArgumentError, "only actors may be registered" | |
@@registered_lock.receive | |
begin | |
if actor.nil? | |
@@registered.delete(name) | |
else | |
@@registered[name] = actor | |
ensure | |
@@registered_lock << nil | |
alias_method :[]=, :register | |
def _unregister(actor) #:nodoc: | |
@@registered_lock.receive | |
begin | |
@@registered.delete_if { |n, a| actor.equal? a } | |
ensure | |
@@registered_lock << nil | |
def initialize | |
@lock = Channel.new | |
@filter = nil | |
@ready = Channel.new | |
@action = nil | |
@message = nil | |
@mailbox = [] | |
@interrupts = [] | |
@links = [] | |
@alive = true | |
@exit_reason = nil | |
@trap_exit = false | |
@thread = Thread.current | |
@lock << nil | |
if block_given? | |
watchdog { yield self } | |
else | |
Thread.new { watchdog { @thread.join } } | |
def send(message) | |
@lock.receive | |
begin | |
return self unless @alive | |
if @filter | |
@action = @filter.action_for(message) | |
if @action | |
@filter = nil | |
@message = message | |
@ready << nil | |
else | |
@mailbox << message | |
else | |
@mailbox << message | |
ensure | |
@lock << nil | |
self | |
alias_method :<<, :send | |
def _check_for_interrupt #:nodoc: | |
check_thread | |
@lock.receive | |
begin | |
raise @interrupts.shift unless @interrupts.empty? | |
ensure | |
@lock << nil | |
def _receive(filter) #:nodoc: | |
check_thread | |
action = nil | |
message = nil | |
timed_out = false | |
@lock.receive | |
begin | |
raise @interrupts.shift unless @interrupts.empty? | |
for i in 0...(@mailbox.size) | |
message = @mailbox[i] | |
action = filter.action_for(message) | |
if action | |
@mailbox.delete_at(i) | |
break | |
unless action | |
if filter.timeout? | |
timeout_id = Scheduler.send_in_seconds(@ready, filter.timeout, true) | |
else | |
timeout_id = nil | |
@filter = filter | |
@lock << nil | |
begin | |
timed_out = @ready.receive | |
ensure | |
@lock.receive | |
if timeout_id | |
Scheduler.cancel(timeout_id) | |
@ready << nil | |
@ready = Channel.new if @ready.receive | |
if not timed_out and @interrupts.empty? | |
action = @action | |
message = @message | |
else | |
@mailbox << @message if @action | |
@action = nil | |
@message = nil | |
raise @interrupts.shift unless @interrupts.empty? | |
ensure | |
@lock << nil | |
if timed_out | |
filter.timeout_action.call | |
else | |
action.call message | |
# Notify this actor that it's now linked to the given one; this is not | |
# intended to be used directly except by actor implementations. Most | |
# users will want to use Actor.link instead. | |
# | |
def notify_link(actor) | |
@lock.receive | |
alive = nil | |
exit_reason = nil | |
begin | |
alive = @alive | |
exit_reason = @exit_reason | |
@links << actor if alive and not @links.include? actor | |
ensure | |
@lock << nil | |
actor.notify_exited(self, exit_reason) unless alive | |
self | |
# Notify this actor that it's now unlinked from the given one; this is | |
# not intended to be used directly except by actor implementations. Most | |
# users will want to use Actor.unlink instead. | |
# | |
def notify_unlink(actor) | |
@lock.receive | |
begin | |
return self unless @alive | |
@links.delete(actor) | |
ensure | |
@lock << nil | |
self | |
# Notify this actor that one of the Actors it's linked to has exited; | |
# this is not intended to be used directly except by actor implementations. | |
# Most users will want to use Actor.send_exit instead. | |
# | |
def notify_exited(actor, reason) | |
@lock.receive | |
begin | |
return self unless @alive | |
@links.delete(actor) | |
if @trap_exit | |
send [:exit, actor, reason] | |
elsif reason | |
@interrupts << DeadActorError.new(actor, reason) | |
if @filter | |
@filter = nil | |
@ready << nil | |
ensure | |
@lock << nil | |
self | |
def watchdog | |
reason = nil | |
begin | |
yield | |
rescue Exception => reason | |
ensure | |
links = nil | |
Actor._unregister(self) | |
@lock.receive | |
begin | |
@alive = false | |
@mailbox = nil | |
@interrupts = nil | |
@exit_reason = reason | |
links = @links | |
@links = nil | |
ensure | |
@lock << nil | |
links.each do |actor| | |
begin | |
actor.notify_exited(self, reason) | |
rescue Exception | |
nil | |
private :watchdog | |
def check_thread | |
unless Thread.current == @thread | |
raise ThreadError, "illegal cross-actor call" | |
private :check_thread | |
def _trap_exit=(value) #:nodoc: | |
check_thread | |
@lock.receive | |
begin | |
raise @interrupts.shift unless @interrupts.empty? | |
@trap_exit = !!value | |
ensure | |
@lock << nil | |
def _trap_exit #:nodoc: | |
check_thread | |
@lock.receive | |
begin | |
@trap_exit | |
ensure | |
@lock << nil | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment