Created
May 30, 2012 23:32
-
-
Save havocp/2839564 to your computer and use it in GitHub Desktop.
some ActorCell hacking
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala | |
index 120caa3..00ef54e 100644 | |
--- a/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala | |
+++ b/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala | |
@@ -5,6 +5,7 @@ | |
package akka.actor | |
import akka.testkit._ | |
+import akka.util.duration._ | |
object HotSwapSpec { | |
abstract class Becomer extends Actor { | |
@@ -35,6 +36,38 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender { | |
expectMsg("4:pigdog") | |
} | |
+ "be able to unbecome back down to receive in its constructor" in { | |
+ val a = system.actorOf(Props(new Becomer { | |
+ for (i ← 1 to 4) context.become({ case always ⇒ sender ! i + ":" + always }) | |
+ for (i ← 1 to 4) context.unbecome() | |
+ def receive = { case always ⇒ sender ! "SUCCESS" } | |
+ })) | |
+ a ! "pigdog" | |
+ expectMsg("SUCCESS") | |
+ } | |
+ | |
+ "be able to unbecome back down to receive in its constructor after restart" in { | |
+ val restarter = system.actorOf(Props(new Actor { | |
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 1)(List(classOf[Exception])) | |
+ val a = context.actorOf(Props(new Becomer { | |
+ for (i ← 1 to 4) context.become({ case always ⇒ sender ! i + ":" + always }) | |
+ for (i ← 1 to 4) context.unbecome() | |
+ def receive = { case always ⇒ sender ! "SUCCESS" } | |
+ })) | |
+ | |
+ def receive = { | |
+ case "killChild" ⇒ a ! Kill | |
+ case always ⇒ a forward always | |
+ } | |
+ })) | |
+ | |
+ restarter ! "pigdog" | |
+ expectMsg("SUCCESS") | |
+ restarter ! "killChild" | |
+ restarter ! "pigdog" | |
+ expectMsg("SUCCESS") | |
+ } | |
+ | |
"be able to become with stacking in its constructor" in { | |
val a = system.actorOf(Props(new Becomer { | |
context.become({ case always ⇒ sender ! "pigdog:" + always; context.unbecome() }, false) | |
@@ -124,4 +157,49 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender { | |
expectMsg("0") | |
} | |
} | |
+ | |
+ "all behaviors removed after failed constructor" in { | |
+ val a = system.actorOf(Props(new Becomer { | |
+ context.become { case always ⇒ sender ! always } | |
+ throw new RuntimeException("expected exception: not creating Becomer") | |
+ def receive = { case always ⇒ sender ! "FAILURE" } | |
+ })) | |
+ a ! "pigdog" | |
+ expectNoMsg(1 second) | |
+ } | |
+ | |
+ "all behaviors removed after failed preStart" in { | |
+ val a = system.actorOf(Props(new Becomer { | |
+ context.become { case always ⇒ sender ! always } | |
+ override def preStart = throw new RuntimeException("expected exception: failing to preStart") | |
+ def receive = { case always ⇒ sender ! "FAILURE" } | |
+ })) | |
+ a ! "pigdog" | |
+ expectNoMsg(1 second) | |
+ } | |
+ | |
+ "all behaviors removed after failed postRestart" in { | |
+ val restarter = system.actorOf(Props(new Actor { | |
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 1)(List(classOf[Exception])) | |
+ // create the Becomer via context so it is a child of this actor and the strategy above decides its restart | |
+ val a = context.actorOf(Props(new Becomer { | |
+ context.become { case always ⇒ sender ! always } | |
+ def receive = { case always ⇒ sender ! "FAILURE" } | |
+ override def postRestart(cause: Throwable): Unit = { | |
+ throw new RuntimeException("expected exception: failing to postRestart") | |
+ } | |
+ })) | |
+ | |
+ def receive = { | |
+ case "killChild" ⇒ a ! Kill | |
+ case always ⇒ a forward always | |
+ } | |
+ })) | |
+ | |
+ restarter ! "pigdog" | |
+ expectMsg("pigdog") | |
+ restarter ! "killChild" | |
+ restarter ! "pigdog" | |
+ expectNoMsg(1 second) | |
+ } | |
} | |
diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala | |
index 197e749..539d955 100644 | |
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala | |
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala | |
@@ -8,6 +8,7 @@ import akka.dispatch.{ PinnedDispatcher, Dispatchers, Await } | |
import java.util.concurrent.{ TimeUnit, CountDownLatch } | |
import akka.testkit.AkkaSpec | |
import akka.testkit.DefaultTimeout | |
+import akka.testkit.ImplicitSender | |
import akka.pattern.ask | |
import akka.util.duration._ | |
import akka.util.NonFatal | |
@@ -24,7 +25,7 @@ object SupervisorMiscSpec { | |
} | |
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) | |
-class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with DefaultTimeout { | |
+class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with DefaultTimeout with ImplicitSender { | |
"A Supervisor" must { | |
@@ -142,5 +143,118 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul | |
expectMsg("green") | |
} | |
+ class DelegatingStrategy(val delegate: SupervisorStrategy) extends SupervisorStrategy { | |
+ import java.util.concurrent.atomic.AtomicInteger | |
+ | |
+ val supervisorFailingCount = new AtomicInteger(0) | |
+ val supervisorRestartedCount = new AtomicInteger(0) | |
+ override def decider = delegate.decider | |
+ override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = | |
+ delegate.handleChildTerminated(context, child, children) | |
+ override def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = | |
+ delegate.processFailure(context, restart, child, cause, stats, children) | |
+ override def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { | |
+ supervisorFailingCount.incrementAndGet() | |
+ delegate.handleSupervisorFailing(supervisor, children) | |
+ } | |
+ | |
+ override def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { | |
+ supervisorRestartedCount.incrementAndGet() | |
+ delegate.handleSupervisorRestarted(cause, supervisor, children) | |
+ } | |
+ | |
+ override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = | |
+ delegate.handleFailure(context, child, cause, stats, children) | |
+ } | |
+ | |
+ "send handleSupervisorFailing and handleSupervisorRestarted to the right Actor instance on postRestart fail" in { | |
+ var strategies = Seq.empty[DelegatingStrategy] | |
+ | |
+ val restarter = system.actorOf(Props(new Actor { | |
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])) | |
+ val a = context.actorOf(Props(new Actor { | |
+ override val supervisorStrategy = { | |
+ val s = new DelegatingStrategy(OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))) | |
+ strategies = strategies :+ s | |
+ s | |
+ } | |
+ | |
+ override def postRestart(cause: Throwable): Unit = | |
+ throw new RuntimeException("expected exception: postRestart failing") | |
+ | |
+ def receive = { | |
+ case anything ⇒ sender ! anything + " from supervised" | |
+ } | |
+ }), name = "child") | |
+ def receive = { | |
+ case "killChild" ⇒ a ! Kill | |
+ case always ⇒ a forward always | |
+ } | |
+ }), name = "restarterPostRestart") | |
+ | |
+ restarter ! "pigdog" | |
+ expectMsg("pigdog from supervised") | |
+ restarter ! "killChild" | |
+ restarter ! "pigdog" | |
+ expectNoMsg(1 second) | |
+ restarter ! "killChild" | |
+ restarter ! "pigdog" | |
+ expectNoMsg(1 second) | |
+ restarter ! "killChild" | |
+ restarter ! "pigdog" | |
+ expectNoMsg(1 second) | |
+ | |
+ strategies.size must be(4) | |
+ val failCounts = strategies.map(_.supervisorFailingCount.get) | |
+ val restartedCounts = strategies.map(_.supervisorRestartedCount.get) | |
+ // because each actor constructs OK then fails in postRestart, | |
+ // the SupervisorStrategy for the actor itself should be used. | |
+ (failCounts, restartedCounts) must be((Seq(1, 1, 1, 1), Seq(1, 1, 1, 0))) | |
+ | |
+ system.stop(restarter) | |
+ } | |
+ | |
+ "send handleSupervisorFailing and handleSupervisorRestarted to the right Actor instance on constructor fail" in { | |
+ var strategies = Seq.empty[DelegatingStrategy] | |
+ | |
+ val restarter = system.actorOf(Props(new Actor { | |
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])) | |
+ val a = context.actorOf(Props(new Actor { | |
+ override val supervisorStrategy = { | |
+ val s = new DelegatingStrategy(OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))) | |
+ strategies = strategies :+ s | |
+ s | |
+ } | |
+ | |
+ // fail the constructor after the first run | |
+ if (strategies.size > 1) | |
+ throw new RuntimeException("expected exception: constructor failing") | |
+ | |
+ def receive = { | |
+ case anything ⇒ sender ! anything + " from supervised" | |
+ } | |
+ }), name = "child") | |
+ def receive = { | |
+ case "killChild" ⇒ a ! Kill | |
+ case always ⇒ a forward always | |
+ } | |
+ }), name = "restarterConstructor") | |
+ | |
+ restarter ! "pigdog" | |
+ expectMsg("pigdog from supervised") | |
+ restarter ! "killChild" | |
+ restarter ! "pigdog" | |
+ expectNoMsg(1 second) | |
+ | |
+ strategies.size must be(4) | |
+ val failCounts = strategies.map(_.supervisorFailingCount.get) | |
+ val restartedCounts = strategies.map(_.supervisorRestartedCount.get) | |
+ // original Actor should get its own failure plus 3 constructor failures, | |
+ // and 3 restart attempts; because no later attempt constructs successfully. | |
+ (failCounts, restartedCounts) must be((Seq(4, 0, 0, 0), Seq(3, 0, 0, 0))) | |
+ | |
+ system.stop(restarter) | |
+ } | |
+ | |
} | |
} | |
diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala | |
index d295e6d..ddb5974 100644 | |
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala | |
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala | |
@@ -354,7 +354,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende | |
val dyingActor = Await.result((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration) | |
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), | |
- EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { | |
+ EventFilter[IllegalStateException]("error while recreating actor", occurrences = 1)) { | |
intercept[RuntimeException] { | |
Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout) | |
} | |
diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala | |
index d4d5239..eebb451 100644 | |
--- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala | |
+++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala | |
@@ -78,12 +78,14 @@ trait ActorContext extends ActorRefFactory { | |
/** | |
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. | |
* Puts the behavior on top of the hotswap stack. | |
- * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack | |
+ * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack. | |
*/ | |
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit | |
/** | |
* Reverts the Actor behavior to the previous one in the hotswap stack. | |
+ * Never removes the original `receive` behavior (once all `become()` have | |
+ * been reversed, this becomes a no-op). | |
*/ | |
def unbecome(): Unit | |
@@ -185,12 +187,18 @@ private[akka] object ActorCell { | |
final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable) | |
- final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior) | |
+ // when a "dead" Actor instance is kept after a failed recreate | |
+ // or a terminate, we don't want to use its actual behavior, so we | |
+ // install this stack instead, maintaining the invariant that the | |
+ // behaviorStack is only empty while actor is null. | |
+ // TODO it's probably a bug if we receive a message while | |
+ // this is the behavior stack, so we can probably drop this. | |
+ final val deadActorBehaviorStack: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior) | |
sealed trait SuspendReason | |
case object UserRequest extends SuspendReason | |
- case class Recreation(cause: Throwable) extends SuspendReason | |
- case object Termination extends SuspendReason | |
+ case class Recreation(cause: Throwable, failed: Actor) extends SuspendReason | |
+ case class Termination(terminatedOption: Option[Actor]) extends SuspendReason | |
trait ChildrenContainer { | |
def add(child: ActorRef): ChildrenContainer | |
@@ -283,8 +291,8 @@ private[akka] object ActorCell { | |
def remove(child: ActorRef): ChildrenContainer = { | |
val t = toDie - child | |
if (t.isEmpty) reason match { | |
- case Termination ⇒ TerminatedChildrenContainer | |
- case _ ⇒ NormalChildrenContainer(c - child.path.name) | |
+ case _: Termination ⇒ TerminatedChildrenContainer | |
+ case _ ⇒ NormalChildrenContainer(c - child.path.name) | |
} | |
else copy(c - child.path.name, t) | |
} | |
@@ -357,12 +365,12 @@ private[akka] class ActorCell( | |
var childrenRefs: ChildrenContainer = EmptyChildrenContainer | |
private def isTerminating = childrenRefs match { | |
- case TerminatingChildrenContainer(_, _, Termination) ⇒ true | |
+ case TerminatingChildrenContainer(_, _, _: Termination) ⇒ true | |
case TerminatedChildrenContainer ⇒ true | |
case _ ⇒ false | |
} | |
private def isNormal = childrenRefs match { | |
- case TerminatingChildrenContainer(_, _, Termination | _: Recreation) ⇒ false | |
+ case TerminatingChildrenContainer(_, _, _: Termination | _: Recreation) ⇒ false | |
case _ ⇒ true | |
} | |
@@ -409,8 +417,23 @@ private[akka] class ActorCell( | |
var currentMessage: Envelope = null | |
+ // invariant: actor eq null whenever our actor never existed or is in the | |
+ // process of becoming nonexistent; currently these windows of time: | |
+ // - from ActorCell construction until first Actor instance constructor completes | |
+ // - from recreate request until recreated Actor instance constructor completes or fails | |
+ // - from terminate request until we terminate or fail to terminate | |
+ // After successful termination or failed recreation, set to a "dead" actor | |
+ // and the actor's real behavior is removed from behaviorStack. | |
+ // "dead" actors are kept around just to use their supervisorStrategy. | |
var actor: Actor = _ | |
+ // behaviorStack has three states: | |
+ // - set to empty anytime actor is set to null, and remains empty | |
+ // until start of Actor instance constructor | |
+ // - from start of Actor instance constructor to end of constructor, | |
+ // collects become() behaviors called by constructor | |
+ // - when actor is set to non-null post-construction, behaviorStack | |
+ // must be set to valid and ready-to-use behaviors | |
private var behaviorStack: Stack[Actor.Receive] = Stack.empty | |
@volatile //This must be volatile since it isn't protected by the mailbox status | |
@@ -491,24 +514,33 @@ private[akka] class ActorCell( | |
case _ ⇒ system.deadLetters | |
} | |
+ protected def clearActor(): Unit = { | |
+ behaviorStack = Stack.empty | |
+ actor = null | |
+ } | |
+ | |
//This method is in charge of setting up the contextStack and create a new instance of the Actor | |
- protected def newActor(): Actor = { | |
+ protected def createActor(): Unit = { | |
+ require(behaviorStack.isEmpty) | |
+ require(actor eq null) | |
+ | |
contextStack.set(contextStack.get.push(this)) | |
try { | |
- import ActorCell.behaviorStackPlaceHolder | |
- | |
- behaviorStack = behaviorStackPlaceHolder | |
val instance = props.creator.apply() | |
if (instance eq null) | |
throw new ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'") | |
- behaviorStack = behaviorStack match { | |
- case `behaviorStackPlaceHolder` ⇒ Stack.empty.push(instance.receive) | |
- case newBehaviors ⇒ Stack.empty.push(instance.receive).pushAll(newBehaviors.reverse.drop(1)) | |
+ if (behaviorStack.isEmpty) { | |
+ behaviorStack = Stack.empty.push(instance.receive) | |
+ } else { | |
+ behaviorStack = Stack.empty.push(instance.receive).pushAll(behaviorStack.reverse) | |
} | |
- instance | |
+ actor = instance | |
} finally { | |
+ if (actor eq null) | |
+ behaviorStack = Stack.empty // remove any become() from failed create | |
+ | |
val stackAfter = contextStack.get | |
if (stackAfter.nonEmpty) | |
contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context | |
@@ -520,11 +552,12 @@ private[akka] class ActorCell( | |
def create(): Unit = if (isNormal) { | |
try { | |
- val created = newActor() | |
- actor = created | |
- created.preStart() | |
+ require(actor eq null) | |
+ require(behaviorStack.isEmpty) | |
+ createActor() | |
+ actor.preStart() | |
checkReceiveTimeout | |
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) | |
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "started (" + actor + ")")) | |
} catch { | |
case NonFatal(i: InstantiationException) ⇒ | |
throw new ActorInitializationException(self, | |
@@ -539,21 +572,21 @@ private[akka] class ActorCell( | |
def recreate(cause: Throwable): Unit = if (isNormal) { | |
try { | |
val failedActor = actor | |
+ clearActor() | |
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting")) | |
- if (failedActor ne null) { | |
- val c = currentMessage //One read only plz | |
- try { | |
- if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) | |
- } finally { | |
- clearActorFields(failedActor) | |
- } | |
+ require(failedActor ne null) | |
+ val c = currentMessage //One read only plz | |
+ try { | |
+ if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) | |
+ } finally { | |
+ clearActorFields(failedActor) | |
} | |
childrenRefs match { | |
case ct: TerminatingChildrenContainer ⇒ | |
- childrenRefs = ct.copy(reason = Recreation(cause)) | |
+ childrenRefs = ct.copy(reason = Recreation(cause, failedActor)) | |
dispatcher suspend this | |
case _ ⇒ | |
- doRecreate(cause, failedActor) | |
+ finishRecreate(cause, failedActor) | |
} | |
} catch { | |
case NonFatal(e) ⇒ throw new ActorInitializationException(self, "exception during creation", e match { | |
@@ -583,16 +616,19 @@ private[akka] class ActorCell( | |
setReceiveTimeout(None) | |
cancelReceiveTimeout | |
+ val terminated = actor | |
+ clearActor() | |
+ | |
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) | |
children foreach stop | |
childrenRefs match { | |
case ct: TerminatingChildrenContainer ⇒ | |
- childrenRefs = ct.copy(reason = Termination) | |
+ childrenRefs = ct.copy(reason = Termination(Option(terminated))) | |
// do not process normal messages while waiting for all children to terminate | |
dispatcher suspend this | |
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) | |
- case _ ⇒ doTerminate() | |
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(terminated), "stopping")) | |
+ case _ ⇒ finishTerminate(Option(terminated)) | |
} | |
} | |
@@ -662,9 +698,17 @@ private[akka] class ActorCell( | |
become({ case msg ⇒ behavior.apply(msg) }: Actor.Receive, discardOld) | |
def unbecome(): Unit = { | |
- val original = behaviorStack | |
- val popped = original.pop | |
- behaviorStack = if (popped.isEmpty) original else popped | |
+ // actor.receive at the base of the stack can never be popped, | |
+ // but during actor construction we can pop down to empty. | |
+ if (actor ne null) { | |
+ require(behaviorStack.nonEmpty) | |
+ val original = behaviorStack | |
+ val popped = original.pop | |
+ behaviorStack = if (popped.isEmpty) original else popped | |
+ } else { | |
+ if (behaviorStack.nonEmpty) | |
+ behaviorStack = behaviorStack.pop | |
+ } | |
} | |
def autoReceiveMessage(msg: Envelope): Unit = { | |
@@ -682,16 +726,22 @@ private[akka] class ActorCell( | |
} | |
final def receiveMessage(msg: Any): Unit = { | |
+ require(actor ne null) | |
+ require(behaviorStack.nonEmpty) | |
//FIXME replace with behaviorStack.head.applyOrElse(msg, unhandled) + "-optimize" | |
val head = behaviorStack.head | |
if (head.isDefinedAt(msg)) head.apply(msg) else actor.unhandled(msg) | |
} | |
- private def doTerminate() { | |
- val a = actor | |
+ private def finishTerminate(terminatedOption: Option[Actor]) { | |
+ require(actor eq null) | |
+ require(behaviorStack.isEmpty) | |
try { | |
try { | |
- if (a ne null) a.postStop() | |
+ for (terminated ← terminatedOption) { | |
+ if (terminated.context ne null) | |
+ terminated.postStop() | |
+ } | |
} finally { | |
dispatcher.detach(this) | |
} | |
@@ -700,35 +750,66 @@ private[akka] class ActorCell( | |
parent.sendSystemMessage(ChildTerminated(self)) | |
system.deathWatch.publish(Terminated(self)) | |
if (system.settings.DebugLifecycle) | |
- system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) | |
+ system.eventStream.publish(Debug(self.path.toString, clazz(terminatedOption.orNull), "stopped")) | |
} finally { | |
- behaviorStack = ActorCell.behaviorStackPlaceHolder | |
- clearActorFields(a) | |
- actor = null | |
+ for (terminated ← terminatedOption) { | |
+ clearActorFields(terminated) | |
+ actor = terminated | |
+ behaviorStack = ActorCell.deadActorBehaviorStack | |
+ } | |
} | |
} | |
} | |
- private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try { | |
- // after all killed children have terminated, recreate the rest, then go on to start the new instance | |
- actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) | |
- val freshActor = newActor() | |
- actor = freshActor // this must happen before postRestart has a chance to fail | |
- if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. | |
- | |
- freshActor.postRestart(cause) | |
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) | |
- | |
- dispatcher.resume(this) | |
+ // if we have the Actor instance cleared pending termination | |
+ // or recreation, this guards against failure to complete | |
+ // the termination or recreation | |
+ private def handlingCleanupFailure(deadOption: Option[Actor], stage: String)(body: ⇒ Unit): Unit = try { | |
+ body | |
} catch { | |
case NonFatal(e) ⇒ try { | |
- dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) | |
- // prevent any further messages to be processed until the actor has been restarted | |
+ dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(deadOption.orNull), "error while " + stage + " actor"), e)) | |
+ | |
dispatcher.suspend(this) | |
- actor.supervisorStrategy.handleSupervisorFailing(self, children) // FIXME Should this be called on actor or failedActor? | |
- clearActorFields(actor) // If this fails, we need to ensure that preRestart isn't called. | |
+ | |
+ for (dead ← deadOption) { | |
+ dead.supervisorStrategy.handleSupervisorFailing(self, children) | |
+ // we won't call preRestart or postStop again with fields cleared | |
+ clearActorFields(dead) | |
+ } | |
} finally { | |
- parent.tell(Failed(new ActorInitializationException(self, "exception during re-creation", e)), self) | |
+ // put the failed actor back so we have its supervisorStrategy | |
+ for (dead ← deadOption) { | |
+ actor = dead | |
+ behaviorStack = ActorCell.deadActorBehaviorStack | |
+ } | |
+ parent.tell(Failed(new ActorInitializationException(self, "exception while " + stage + " actor", e)), self) | |
+ } | |
+ } | |
+ | |
+ // after all killed children have terminated, recreate the rest, then go on to start the new instance | |
+ private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = { | |
+ require(failedActor ne null) | |
+ require(failedActor.context eq null) | |
+ require(actor eq null) | |
+ require(behaviorStack.isEmpty) | |
+ // no try/catch at this level: failures in the steps below are routed through handlingCleanupFailure | |
+ // failures are handled by the "current" actor's supervisorStrategy, | |
+ // which means failedActor until actor ne null and then actor after | |
+ | |
+ handlingCleanupFailure(Some(failedActor), "recreating") { | |
+ failedActor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) | |
+ | |
+ createActor() | |
+ | |
+ handlingCleanupFailure(Some(actor), "postRestart-ing") { | |
+ if (actor eq failedActor) setActorFields(actor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. | |
+ | |
+ actor.postRestart(cause) | |
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "restarted")) | |
+ | |
+ dispatcher.resume(this) | |
+ } | |
} | |
} | |
@@ -737,29 +818,37 @@ private[akka] class ActorCell( | |
case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) | |
} | |
- final def handleChildTerminated(child: ActorRef): Unit = try { | |
+ private def invokeHandleChildTerminated(supervisorOption: Option[Actor], child: ActorRef): Unit = { | |
+ for (supervisor ← supervisorOption) { | |
+ handlingCleanupFailure(supervisorOption, "child-terminated-notifying") { | |
+ supervisor.supervisorStrategy.handleChildTerminated(this, child, children) | |
+ } | |
+ } | |
+ } | |
+ | |
+ final def handleChildTerminated(child: ActorRef): Unit = { | |
childrenRefs match { | |
case tc @ TerminatingChildrenContainer(_, _, reason) ⇒ | |
val n = tc.remove(child) | |
childrenRefs = n | |
- actor.supervisorStrategy.handleChildTerminated(this, child, children) | |
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { | |
- case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate" | |
- case Termination ⇒ doTerminate() | |
- case _ ⇒ | |
+ case Recreation(cause, failedActor) ⇒ | |
+ invokeHandleChildTerminated(Some(failedActor), child) | |
+ finishRecreate(cause, failedActor) | |
+ case Termination(terminatedActorOption) ⇒ | |
+ invokeHandleChildTerminated(terminatedActorOption, child) | |
+ // TODO should handlingCleanupFailure be moved inside | |
+ // finishTerminate as it is for finishRecreate? | |
+ handlingCleanupFailure(terminatedActorOption, "terminating") { | |
+ finishTerminate(terminatedActorOption) | |
+ } | |
+ case _ ⇒ | |
+ invokeHandleChildTerminated(Option(actor), child) | |
} | |
case _ ⇒ | |
childrenRefs = childrenRefs.remove(child) | |
- actor.supervisorStrategy.handleChildTerminated(this, child, children) | |
+ invokeHandleChildTerminated(Option(actor), child) | |
} | |
- } catch { | |
- case NonFatal(e) ⇒ | |
- try { | |
- dispatcher suspend this | |
- actor.supervisorStrategy.handleSupervisorFailing(self, children) | |
- } finally { | |
- parent.tell(Failed(e), self) | |
- } | |
} | |
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment