Created
May 18, 2012 18:59
-
-
Save havocp/2727054 to your computer and use it in GitHub Desktop.
Squashed patch with mapBehavior
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java | |
index 5125611..25268dd 100644 | |
--- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java | |
+++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java | |
@@ -8,6 +8,10 @@ import akka.routing.CurrentRoutees; | |
import akka.routing.FromConfig; | |
import akka.routing.NoRouter; | |
import akka.testkit.AkkaSpec; | |
+import static akka.pattern.Patterns.ask; | |
+import akka.dispatch.*; | |
+import akka.util.Duration; | |
+import akka.util.Timeout; | |
import org.junit.AfterClass; | |
import org.junit.BeforeClass; | |
@@ -68,4 +72,17 @@ public class JavaAPI { | |
ref.tell("hallo"); | |
ref.tell("hallo", ref); | |
} | |
+ | |
+ @Test | |
+ public void mustSupportPreAndPostReceive() throws Exception { | |
+ ActorRef ref = system.actorOf(new Props(JavaAPIPrePostActor.class)); | |
+ assertNotNull(ref); | |
+ Timeout timeout = new Timeout(Duration.parse("1 second")); | |
+ String pre = (String) Await.result(ask(ref, "onPreReceive", timeout), timeout.duration()); | |
+ String middle = (String) Await.result(ask(ref, "onReceivePartial", timeout), timeout.duration()); | |
+ String post = (String) Await.result(ask(ref, "onPostReceive", timeout), timeout.duration()); | |
+ assertEquals("onPreReceive", pre); // JUnit argument order is (expected, actual) | |
+ assertEquals("onReceivePartial", middle); // each handler replies with its own name | |
+ assertEquals("onPostReceive", post); | |
+ } | |
} | |
diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPIPrePostActor.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPIPrePostActor.java | |
new file mode 100644 | |
index 0000000..05dfc02 | |
--- /dev/null | |
+++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPIPrePostActor.java | |
@@ -0,0 +1,50 @@ | |
+package akka.actor; | |
+ | |
+import akka.japi.PartialProcedure; | |
+import akka.japi.Option; | |
+ | |
+public class JavaAPIPrePostActor extends UntypedActor { | |
+ @Override | |
+ public void onReceive(Object message) { | |
+ // this is not called since we override onReceivePartial instead | |
+ } | |
+ | |
+ @Override | |
+ public PartialProcedure<Object> onReceivePartial() { | |
+ PartialProcedure<Object> handler = new PartialProcedure<Object>() { | |
+ public void apply(Object o) { | |
+ getSender().tell("onReceivePartial"); | |
+ } | |
+ public boolean isDefinedAt(Object o) { | |
+ return (o instanceof String && ((String) o).equals("onReceivePartial")); | |
+ } | |
+ }; | |
+ return handler; | |
+ } | |
+ | |
+ @Override | |
+ public Option<PartialProcedure<Object>> onPreReceive() { | |
+ PartialProcedure<Object> handler = new PartialProcedure<Object>() { | |
+ public void apply(Object o) { | |
+ getSender().tell("onPreReceive"); | |
+ } | |
+ public boolean isDefinedAt(Object o) { | |
+ return (o instanceof String && ((String) o).equals("onPreReceive")); | |
+ } | |
+ }; | |
+ return Option.some(handler); | |
+ } | |
+ | |
+ @Override | |
+ public Option<PartialProcedure<Object>> onPostReceive() { | |
+ PartialProcedure<Object> handler = new PartialProcedure<Object>() { | |
+ public void apply(Object o) { | |
+ getSender().tell("onPostReceive"); | |
+ } | |
+ public boolean isDefinedAt(Object o) { | |
+ return (o instanceof String && ((String) o).equals("onPostReceive")); | |
+ } | |
+ }; | |
+ return Option.some(handler); | |
+ } | |
+} | |
diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala | |
index e8c667b..913e5cc 100644 | |
--- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala | |
+++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala | |
@@ -401,4 +401,81 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { | |
} | |
} | |
} | |
+ | |
+ "support mixin message handlers and execute in proper order" in { | |
+ // "pre" mixin that runs other handlers second | |
+ trait HandlesA1 extends Actor { | |
+ override def mapBehavior(behavior: Receive) = { | |
+ val handler: Receive = { | |
+ case "A1" ⇒ sender ! "HandlesA1" | |
+ } | |
+ super.mapBehavior(handler orElse behavior) | |
+ } | |
+ } | |
+ | |
+ // another "pre" mixin | |
+ trait HandlesA2 extends Actor { | |
+ override def mapBehavior(behavior: Receive) = { | |
+ val handler: Receive = { | |
+ case "A2" ⇒ sender ! "HandlesA2" | |
+ case "A1" ⇒ sender ! "HandlesA2" // not reached, HandlesA1 filters | |
+ } | |
+ super.mapBehavior(handler orElse behavior) | |
+ } | |
+ } | |
+ | |
+ // "post" mixin that runs other handlers first | |
+ trait HandlesB1 extends Actor { | |
+ override def mapBehavior(behavior: Receive) = { | |
+ val handler: Receive = { | |
+ case "B1" ⇒ sender ! "HandlesB1" | |
+ case "B2" ⇒ sender ! "HandlesB1" // not reached, HandlesB2 filters | |
+ case "C" ⇒ sender ! "HandlesB1" // not reached, HandlesC filters | |
+ } | |
+ super.mapBehavior(behavior orElse handler) | |
+ } | |
+ } | |
+ | |
+ // another "post" mixin; mixed in last, so its handler is chained before HandlesB1's | |
+ trait HandlesB2 extends Actor { | |
+ override def mapBehavior(behavior: Receive) = { | |
+ val handler: Receive = { | |
+ case "B2" ⇒ sender ! "HandlesB2" | |
+ case "C" ⇒ sender ! "HandlesB2" // not reached, HandlesC filters | |
+ } | |
+ super.mapBehavior(behavior orElse handler) | |
+ } | |
+ } | |
+ | |
+ // this is a completely unmodified actor other than | |
+ // mixing in the HandlesA1/A2 and HandlesB1/B2 traits; | |
+ // it doesn't have to worry about chaining up | |
+ // or anything like that. | |
+ class HandlesC extends Actor with HandlesA1 with HandlesA2 with HandlesB1 with HandlesB2 { | |
+ def receive = { | |
+ case "C" ⇒ sender ! "HandlesC" | |
+ case "A1" ⇒ sender ! "HandlesC" // not reached, HandlesA1 filters | |
+ case "A2" ⇒ sender ! "HandlesC" // not reached, HandlesA2 filters | |
+ } | |
+ } | |
+ | |
+ val timeout = Timeout(20000) | |
+ val ref = system.actorOf(Props(new HandlesC)) | |
+ | |
+ val a1 = (ref.ask("A1")(timeout)).mapTo[String] | |
+ val a2 = (ref.ask("A2")(timeout)).mapTo[String] | |
+ val c = (ref.ask("C")(timeout)).mapTo[String] | |
+ val b1 = (ref.ask("B1")(timeout)).mapTo[String] | |
+ val b2 = (ref.ask("B2")(timeout)).mapTo[String] | |
+ | |
+ ref ! PoisonPill | |
+ | |
+ Await.result(a1, timeout.duration) must be("HandlesA1") | |
+ Await.result(a2, timeout.duration) must be("HandlesA2") | |
+ Await.result(c, timeout.duration) must be("HandlesC") | |
+ Await.result(b1, timeout.duration) must be("HandlesB1") | |
+ Await.result(b2, timeout.duration) must be("HandlesB2") | |
+ | |
+ awaitCond(ref.isTerminated, 2000 millis) | |
+ } | |
} | |
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala | |
index 2499d42..b043aea 100644 | |
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala | |
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala | |
@@ -240,6 +240,30 @@ trait Actor { | |
final def sender: ActorRef = context.sender | |
/** | |
+ * This method allows traits and subclasses to mix in actor behavior. | |
+ * Whenever an actor pushes a new behavior, it will be mapped | |
+ * using `mapBehavior`. (The default behavior is | |
+ * from the `receive` method but it can be replaced using the | |
+ * `become` method.) | |
+ * <p/> | |
+ * To allow multiple mixin traits, implementations of this | |
+ * method should chain up to `super.mapBehavior` in order | |
+ * to apply the customizations from supertypes. | |
+ * <p/> | |
+ * The simplest usage is to run some handler before the | |
+ * actor's normal behavior: | |
+ * {{{ | |
+ * override def mapBehavior(behavior: Receive) = { | |
+ * val handler: Receive = { | |
+ * case "MyMessage" ⇒ | |
+ * } | |
+ * super.mapBehavior(handler orElse behavior) | |
+ * } | |
+ * }}} | |
+ */ | |
+ protected def mapBehavior(behavior: Receive): Receive = behavior | |
+ | |
+ /** | |
* This defines the initial actor behavior, it must return a partial function | |
* with the actor logic. | |
*/ | |
@@ -321,7 +345,7 @@ trait Actor { | |
* For Akka internal use only. | |
*/ | |
private[akka] def pushBehavior(behavior: Receive): Unit = { | |
- behaviorStack = behaviorStack.push(behavior) | |
+ behaviorStack = behaviorStack.push(mapBehavior(behavior)) | |
} | |
/** | |
@@ -339,6 +363,6 @@ trait Actor { | |
private[akka] def clearBehaviorStack(): Unit = | |
behaviorStack = Stack.empty[Receive].push(behaviorStack.last) | |
- private var behaviorStack: Stack[Receive] = Stack.empty[Receive].push(receive) | |
+ private var behaviorStack: Stack[Receive] = Stack.empty[Receive].push(mapBehavior(receive)) | |
} | |
diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala | |
index a5ebeb8..b66637d 100644 | |
--- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala | |
+++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala | |
@@ -5,6 +5,7 @@ | |
package akka.actor | |
import akka.japi.{ Creator } | |
+import akka.japi | |
/** | |
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': | |
@@ -98,6 +99,19 @@ abstract class UntypedActor extends Actor { | |
@throws(classOf[Exception]) | |
def onReceive(message: Any): Unit | |
+ /** | |
+ * By default, `onReceivePartial` forwards to `onReceive`; if you need to | |
+ * avoid handling some messages (for example to allow an `onPostReceive` handler | |
+ * to run) then you could override `onReceivePartial` rather than `onReceive`. | |
+ * If you override `onReceivePartial` then `onReceive` will not be called | |
+ * unless you call it yourself. | |
+ */ | |
+ @throws(classOf[Exception]) | |
+ def onReceivePartial: japi.PartialProcedure[Any] = new japi.PartialProcedure[Any]() { | |
+ override def apply(x: Any) = onReceive(x) | |
+ override def isDefinedAt(x: Any) = true | |
+ } | |
+ | |
def getContext(): UntypedActorContext = context.asInstanceOf[UntypedActorContext] | |
/** | |
@@ -150,9 +164,37 @@ abstract class UntypedActor extends Actor { | |
*/ | |
override def postRestart(reason: Throwable): Unit = super.postRestart(reason) | |
- final protected def receive = { | |
- case msg ⇒ onReceive(msg) | |
+ final protected def receive = onReceivePartial.asScala | |
+ | |
+ // this isn't final so mixins can work, but | |
+ // overriding it in Java is not expected. | |
+ override protected def mapBehavior(behavior: Receive): Receive = { | |
+ val chain = Seq(onPreReceive.asScala.map(_.asScala), | |
+ Some(behavior), | |
+ onPostReceive.asScala.map(_.asScala)).flatMap(_.toSeq) | |
+ super.mapBehavior(chain.reduce(_ orElse _)) | |
} | |
+ | |
+ /** | |
+ * User overridable callback: by default it returns None. | |
+ * <p/> | |
+ * If you provide a handler, it will filter messages before the | |
+ * regular `onReceivePartial` handler (by default, `onReceive`). | |
+ */ | |
+ protected def onPreReceive: japi.Option[japi.PartialProcedure[Any]] = japi.Option.none | |
+ | |
+ /** | |
+ * User overridable callback: by default it returns None. | |
+ * <p/> | |
+ * If you provide a handler, it will handle messages not matched by | |
+ * the regular `onReceivePartial` handler. Note that by default, | |
+ * `onReceivePartial` matches ALL messages by forwarding them to | |
+ * `onReceive`. Therefore, by default no `onPostReceive` handler | |
+ * will ever be used; only actors which override `onReceivePartial` | |
+ * to leave some messages unhandled can benefit from an | |
+ * `onPostReceive`. | |
+ */ | |
+ protected def onPostReceive: japi.Option[japi.PartialProcedure[Any]] = japi.Option.none | |
} | |
/** | |
diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala | |
index 47ce667..e7df118 100644 | |
--- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala | |
+++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala | |
@@ -35,6 +35,22 @@ trait Procedure2[T1, T2] { | |
} | |
/** | |
+ * A PartialProcedure abstract class. Used to create partial functions | |
+ * that return void in Java. | |
+ */ | |
+abstract class PartialProcedure[T] { | |
+ def apply(param: T): Unit | |
+ def isDefinedAt(param: T): Boolean | |
+ // Adapter exposing a PartialProcedure as a Scala PartialFunction; `A` avoids shadowing the outer `T`. | |
+ private class DelegatingPartialFunction[A](val delegate: PartialProcedure[A]) extends scala.PartialFunction[A, Unit] { | |
+ override def apply(param: A) = delegate.apply(param) | |
+ override def isDefinedAt(param: A) = delegate.isDefinedAt(param) | |
+ } | |
+ | |
+ def asScala: scala.PartialFunction[T, Unit] = new DelegatingPartialFunction(this) | |
+} | |
+ | |
+/** | |
* An executable piece of code that takes no parameters and doesn't return any value. | |
*/ | |
trait SideEffect { | |
diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst | |
index 5374c8a..d4413ef 100644 | |
--- a/akka-docs/scala/actors.rst | |
+++ b/akka-docs/scala/actors.rst | |
@@ -668,15 +668,24 @@ state of the failing actor instance is lost if you don't store and restore it in | |
``preRestart`` and ``postRestart`` callbacks. | |
-Extending Actors using PartialFunction chaining | |
-=============================================== | |
+Extending Actors using mapBehavior and PartialFunction chaining | |
+=============================================================== | |
-A bit advanced but very useful way of defining a base message handler and then | |
-extend that, either through inheritance or delegation, is to use | |
-``PartialFunction.orElse`` chaining. | |
+You can create "mixin" traits or abstract classes using the | |
+``mapBehavior`` method on ``Actor``. This method modifies the | |
+standard actor behavior as defined by ``receive`` or ``become``. | |
+To allow multiple traits to be mixed in to one actor, when you | |
+override ``mapBehavior`` you should always chain | |
+up and allow supertypes to run their ``mapBehavior`` as well. | |
-.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse | |
+.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-mapBehavior | |
+ | |
+Multiple traits that implement ``mapBehavior`` | |
+in this way can be mixed in to the same concrete class. The | |
+concrete class need not do anything special, it implements | |
+``receive`` as usual. | |
-Or: | |
+``PartialFunction.orElse`` chaining can also be used for more | |
+complex scenarios, like dynamic runtime registration of handlers: | |
-.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse2 | |
+.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse | |
diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala | |
index 0bc540f..048bae4 100644 | |
--- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala | |
+++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala | |
@@ -114,41 +114,58 @@ object SwapperApp extends App { | |
} | |
//#swapper | |
-//#receive-orElse | |
- | |
-abstract class GenericActor extends Actor { | |
- // to be defined in subclassing actor | |
- def specificMessageHandler: Receive | |
+//#receive-mapBehavior | |
+// trait providing a generic fallback message handler | |
+trait GenericActor extends Actor { | |
// generic message handler | |
- def genericMessageHandler: Receive = { | |
+ private def genericMessageHandler: Receive = { | |
case event ⇒ printf("generic: %s\n", event) | |
} | |
- def receive = specificMessageHandler orElse genericMessageHandler | |
+ // because we chain up to super.mapBehavior, | |
+ // multiple traits like this can be mixed in. | |
+ override def mapBehavior(behavior: Receive): Receive = | |
+ super.mapBehavior(behavior orElse genericMessageHandler) | |
} | |
class SpecificActor extends GenericActor { | |
- def specificMessageHandler = { | |
+ def receive = { | |
case event: MyMsg ⇒ printf("specific: %s\n", event.subject) | |
} | |
} | |
case class MyMsg(subject: String) | |
-//#receive-orElse | |
+//#receive-mapBehavior | |
-//#receive-orElse2 | |
+//#receive-orElse | |
trait ComposableActor extends Actor { | |
private var receives: List[Receive] = List() | |
+ private var composedReceives: Receive = Map.empty // in Scala 2.10, PartialFunction.empty | |
+ | |
protected def registerReceive(receive: Receive) { | |
+ // keep a list (allows unregistration) | |
receives = receive :: receives | |
+ // cache the composition of all receives | |
+ composedReceives = receives reduce { _ orElse _ } | |
} | |
- def receive = receives reduce { _ orElse _ } | |
+ // this indirection is because mapBehavior is only applied | |
+ // when a behavior is pushed, but we want to allow registration | |
+ // post-construct, so we need a constant Receive that forwards | |
+ // to our dynamic Receive | |
+ private def handleRegisteredReceives: Receive = new PartialFunction[Any, Unit]() { | |
+ override def apply(x: Any) = composedReceives.apply(x) | |
+ override def isDefinedAt(x: Any) = composedReceives.isDefinedAt(x) | |
+ } | |
+ | |
+ override def mapBehavior(behavior: Receive) = | |
+ super.mapBehavior(behavior orElse handleRegisteredReceives) | |
} | |
class MyComposableActor extends ComposableActor { | |
override def preStart() { | |
+ // register some handlers dynamically | |
registerReceive({ | |
case "foo" ⇒ /* Do something */ | |
}) | |
@@ -157,9 +174,15 @@ class MyComposableActor extends ComposableActor { | |
case "bar" ⇒ /* Do something */ | |
}) | |
} | |
+ | |
+ // Runs before the dynamically-registered handlers, | |
+ // which mapBehavior chains after this receive | |
+ def receive = { | |
+ case "baz" ⇒ | |
+ } | |
} | |
-//#receive-orElse2 | |
+//#receive-orElse | |
class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { | |
"import context" in { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment