Skip to content

Instantly share code, notes, and snippets.

@jhosteny
Created April 10, 2012 20:34
Show Gist options
  • Save jhosteny/2354274 to your computer and use it in GitHub Desktop.
Save jhosteny/2354274 to your computer and use it in GitHub Desktop.
diff --git a/lib/dcell/celluloid_ext.rb b/lib/dcell/celluloid_ext.rb
index ff48c94..82b4c8a 100644
--- a/lib/dcell/celluloid_ext.rb
+++ b/lib/dcell/celluloid_ext.rb
@@ -9,6 +9,50 @@
# DCell overlay network back to the node where the actor actually exists
module Celluloid
+ class << self
+ def multicall(actors, responses, timeout, &block)
+ uuid = Celluloid.uuid
+ actors.each do |actor|
+ class << actor
+ alias_method :old_method_missing, :method_missing
+ end
+ actor.instance_eval %Q{
+ def method_missing(meth, *args, &block)
+ meth = meth.to_s
+ raise RuntimeError, "can't call async method with multicall" if meth.to_s.match(/!$/)
+ Actor.multicall @mailbox, \"#{uuid}\", meth, *args, &block
+ end
+ }
+ end
+ begin
+ response = yield actors
+ ensure
+ actors.each do |actor|
+ class << actor
+ alias_method :method_missing, :old_method_missing
+ end
+ end
+ end
+ response = nil unless response.is_a? Proc
+ if actor? and not timeout
+ responses.times do
+ value = Task.suspend(:callwait).value
+ response.call(value) if response
+ end
+ else
+ receive(timeout) do |msg|
+ if msg.respond_to?(:uuid) and msg.uuid == uuid
+ response.call(msg.value) if response
+ responses -= 1
+ responses > 0 ? false : true
+ else
+ false
+ end
+ end
+ end
+ end
+ end
+
class ActorProxy
# Marshal uses respond_to? to determine if this object supports _dump so
# unfortunately we have to monkeypatch in _dump support as the proxy
@@ -40,6 +84,64 @@ module Celluloid
end
end
+ class Actor
+ class << self
+ # Invoke a method on the given actor via its mailbox
+ def multicall(mailbox, uuid, meth, *args, &block)
+ call = MultiCall.new(Thread.mailbox, uuid, meth, args, block)
+ begin
+ mailbox << call
+ rescue MailboxError
+ raise DeadActorError, "attempted to call a dead actor"
+ end
+ end
+ end
+ end
+
+ # Don't derive from Response, since that gets handled
+ # automatically by the reactor.
+ class MultiResponse
+ attr_reader :uuid, :value
+
+ def initialize(uuid, value)
+ @uuid, @value = uuid, value
+ end
+ end
+
+ # Multi calls wait for N responses with an optional timeout
+ class MultiCall < Call
+ attr_reader :task
+ attr_reader :uuid
+
+ def initialize(caller, uuid, method, arguments = [], block = nil, task = Fiber.current.task)
+ super(caller, method, arguments, block)
+ @uuid = uuid
+ @task = task
+ end
+
+ def dispatch(obj)
+ begin
+ check_signature(obj)
+ rescue => ex
+ Logger.crash("#{obj.class}: multi call failed!", ex)
+ return
+ end
+
+ begin
+ result = obj.send @method, *@arguments, &@block
+ rescue AbortError => ex
+ # Swallow aborted async calls, as they indicate the caller made a mistake
+ Logger.crash("#{obj.class}: async call aborted!", ex)
+ end
+ begin
+ @caller << MultiResponse.new(@uuid, result)
+ rescue MailboxError
+ # It's possible the caller exited or crashed before we could send a
+ # response to them.
+ end
+ end
+ end
+
class Mailbox
# This custom dumper registers actors with the DCell registry so they can
# be reached remotely.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment