Created
April 10, 2012 20:34
-
-
Save jhosteny/2354274 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/dcell/celluloid_ext.rb b/lib/dcell/celluloid_ext.rb | |
index ff48c94..82b4c8a 100644 | |
--- a/lib/dcell/celluloid_ext.rb | |
+++ b/lib/dcell/celluloid_ext.rb | |
@@ -9,6 +9,50 @@ | |
# DCell overlay network back to the node where the actor actually exists | |
module Celluloid | |
+ class << self | |
+ def multicall(actors, responses, timeout, &block) | |
+ uuid = Celluloid.uuid | |
+ actors.each do |actor| | |
+ class << actor | |
+ alias_method :old_method_missing, :method_missing | |
+ end | |
+ actor.instance_eval %Q{ | |
+ def method_missing(meth, *args, &block) | |
+ meth = meth.to_s | |
+ raise RuntimeError, "can't call async method with multicall" if meth.to_s.match(/!$/) | |
+ Actor.multicall @mailbox, \"#{uuid}\", meth, *args, &block | |
+ end | |
+ } | |
+ end | |
+ begin | |
+ response = yield actors | |
+ ensure | |
+ actors.each do |actor| | |
+ class << actor | |
+ alias_method :method_missing, :old_method_missing | |
+ end | |
+ end | |
+ end | |
+ response = nil unless response.is_a? Proc | |
+ if actor? and not timeout | |
+ responses.times do | |
+ value = Task.suspend(:callwait).value | |
+ response.call(value) if response | |
+ end | |
+ else | |
+ receive(timeout) do |msg| | |
+ if msg.respond_to?(:uuid) and msg.uuid == uuid | |
+ response.call(msg.value) if response | |
+ responses -= 1 | |
+ responses > 0 ? false : true | |
+ else | |
+ false | |
+ end | |
+ end | |
+ end | |
+ end | |
+ end | |
+ | |
class ActorProxy | |
# Marshal uses respond_to? to determine if this object supports _dump so | |
# unfortunately we have to monkeypatch in _dump support as the proxy | |
@@ -40,6 +84,64 @@ module Celluloid | |
end | |
end | |
+ class Actor | |
+ class << self | |
+ # Invoke a method on the given actor via its mailbox | |
+ def multicall(mailbox, uuid, meth, *args, &block) | |
+ call = MultiCall.new(Thread.mailbox, uuid, meth, args, block) | |
+ begin | |
+ mailbox << call | |
+ rescue MailboxError | |
+ raise DeadActorError, "attempted to call a dead actor" | |
+ end | |
+ end | |
+ end | |
+ end | |
+ | |
+ # Don't derive from Response, since that gets handled | |
+ # automatically by the reactor. | |
+ class MultiResponse | |
+ attr_reader :uuid, :value | |
+ | |
+ def initialize(uuid, value) | |
+ @uuid, @value = uuid, value | |
+ end | |
+ end | |
+ | |
+ # Multi calls wait for N responses with an optional timeout | |
+ class MultiCall < Call | |
+ attr_reader :task | |
+ attr_reader :uuid | |
+ | |
+ def initialize(caller, uuid, method, arguments = [], block = nil, task = Fiber.current.task) | |
+ super(caller, method, arguments, block) | |
+ @uuid = uuid | |
+ @task = task | |
+ end | |
+ | |
+ def dispatch(obj) | |
+ begin | |
+ check_signature(obj) | |
+ rescue => ex | |
+ Logger.crash("#{obj.class}: multi call failed!", ex) | |
+ return | |
+ end | |
+ | |
+ begin | |
+ result = obj.send @method, *@arguments, &@block | |
+ rescue AbortError => ex | |
+ # Swallow aborted async calls, as they indicate the caller made a mistake | |
+ Logger.crash("#{obj.class}: async call aborted!", ex) | |
+ end | |
+ begin | |
+ @caller << MultiResponse.new(@uuid, result) | |
+ rescue MailboxError | |
+ # It's possible the caller exited or crashed before we could send a | |
+ # response to them. | |
+ end | |
+ end | |
+ end | |
+ | |
class Mailbox | |
# This custom dumper registers actors with the DCell registry so they can | |
# be reached remotely. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment