Created
August 23, 2012 21:12
-
-
Save gseitz/3441818 to your computer and use it in GitHub Desktop.
Akka FSM Supervision
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.actor | |
import collection.mutable | |
import akka.actor.FSM.{ CurrentState, Transition, UnsubscribeTransitionCallBack, SubscribeTransitionCallBack } | |
import akka.routing.{ Deafen, Listen } | |
/** Message a [[ParentNotification]] actor sends to its parent from its `preRestart` hook. */
case object PreRestart

/** Message a [[ParentNotification]] actor sends to its parent from its `postRestart` hook. */
case object PostRestart

/**
 * Mixin that reports the restart hooks of an actor to that actor's parent.
 *
 * Both hooks are overridden and neither delegates to `super`, so whatever the
 * inherited implementations would do is not executed.
 * NOTE(review): confirm that skipping the inherited hooks is intended.
 */
trait ParentNotification { thisActor: Actor ⇒

  /** Sends [[PreRestart]] to the parent; `reason` and `message` are ignored. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit =
    context.parent ! PreRestart

  /** Sends [[PostRestart]] to the parent; `reason` is ignored. */
  override def postRestart(reason: Throwable): Unit =
    context.parent ! PostRestart
}
/**
 * Parent actor that fronts a single child created from `targetProps` and
 * republishes that child's FSM state notifications under its own `ActorRef`.
 *
 * Listeners register with this actor (via `SubscribeTransitionCallBack` or
 * `Listen`) instead of with the child. Every `CurrentState` / `Transition`
 * received from the child is re-sent to the listeners with `self` as the
 * reported FSM reference. Any message that is not part of this protocol is
 * forwarded to the child unchanged.
 *
 * @param targetProps recipe for the supervised child actor
 */
class FSMSupervisor(targetProps: Props) extends Actor {

  /** Listeners currently registered with this supervisor. */
  private val transitionCallbacks = mutable.Set[ActorRef]()

  /** Last state reported by `target`; `None` until the first report arrives. */
  private var currentState: Option[Any] = None

  /** The supervised child. */
  private val target = context.actorOf(targetProps)

  /** Subscribes this actor to the child's state notifications. */
  override def preStart(): Unit = {
    target ! SubscribeTransitionCallBack(self)
  }

  def receive = {
    case SubscribeTransitionCallBack(listener)   ⇒ subscribe(listener)
    case Listen(listener)                        ⇒ subscribe(listener)
    case UnsubscribeTransitionCallBack(listener) ⇒ unsubscribe(listener)
    case Deafen(listener)                        ⇒ unsubscribe(listener)

    case Transition(`target`, from, to) ⇒
      currentState = Some(to)
      notifyListeners(Transition(self, from, to))

    case CurrentState(`target`, state) ⇒
      // When a state is already known, the report is published as a transition
      // from the known state to the reported one; otherwise it is passed on as
      // the initial CurrentState.
      val toPublish = currentState
        .map(known ⇒ Transition(self, known, state))
        .getOrElse(CurrentState(self, state))
      currentState = Some(state)
      notifyListeners(toPublish)

    // The child announced a restart: subscribe again.
    // NOTE(review): presumably the restarted instance no longer knows this
    // subscriber - confirm against the FSM implementation in use.
    case PreRestart ⇒ target ! SubscribeTransitionCallBack(self)

    // Consumed here so that it is not forwarded to the child by the catch-all.
    case PostRestart ⇒ ()

    case msg ⇒ target forward msg
  }

  /** Registers `listener` and, if a state is already known, tells it that state right away. */
  private def subscribe(listener: ActorRef): Unit = {
    transitionCallbacks += listener
    currentState.foreach(st ⇒ listener ! CurrentState(self, st))
  }

  /** Removes `listener`; a no-op if it was not registered. */
  private def unsubscribe(listener: ActorRef): Unit = {
    transitionCallbacks -= listener
  }

  /**
   * Sends `msg` to every live listener and drops the terminated ones.
   *
   * The set is split into terminated and live listeners before anything is
   * removed, so the mutable set is never modified while it is being traversed.
   */
  private def notifyListeners(msg: AnyRef): Unit = {
    val (terminated, alive) = transitionCallbacks.partition(_.isTerminated)
    transitionCallbacks --= terminated
    alive.foreach(_ ! msg)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> | |
*/ | |
package akka.actor | |
import language.postfixOps | |
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } | |
import akka.testkit._ | |
import TestEvent.Mute | |
import scala.concurrent.util.duration._ | |
import akka.event._ | |
import com.typesafe.config.ConfigFactory | |
import scala.concurrent.Await | |
import akka.util.Timeout | |
import scala.concurrent.util.Duration | |
/** Fixtures shared by the tests in the `FSMSupervisionSpec` class. */
object FSMSupervisionSpec {

  /** Upper bound used when awaiting the latches below. */
  val timeout = Timeout(2 seconds)

  /** One latch per event the tests want to observe. */
  class Latches(implicit system: ActorSystem) {
    val unlockedLatch = TestLatch()
    val lockedLatch = TestLatch()
    val unhandledLatch = TestLatch()
    val terminatedLatch = TestLatch()
    val transitionLatch = TestLatch()
    val initialStateLatch = TestLatch()
    val transitionCallBackLatch = TestLatch()
  }

  /** States of the [[Lock]] FSM. */
  sealed trait LockState
  case object Locked extends LockState
  case object Open extends LockState

  /** State data of the [[Lock]] FSM: the digits entered so far and the code to match. */
  case class CodeState(soFar: String, code: String)

  /**
   * Code lock FSM that also reports its restarts to its parent.
   *
   * @param code    character sequence that opens the lock
   * @param timeout state timeout applied to the `Open` state
   * @param latches latches opened as the lock goes through its life cycle
   */
  class Lock(code: String, timeout: Duration, latches: Latches)
    extends Actor with FSM[LockState, CodeState] with ParentNotification {

    import latches._

    startWith(Locked, CodeState("", code))

    when(Locked) {
      case Event(digit: Char, CodeState(soFar, expected)) ⇒
        val attempt = soFar + digit
        if (attempt.length < expected.length) {
          // Not enough digits yet: remember them and keep waiting.
          stay using CodeState(attempt, expected)
        } else if (attempt == expected) {
          doUnlock()
          goto(Open) using CodeState("", expected) forMax timeout
        } else {
          // Wrong code: discard the digits entered so far.
          stay using CodeState("", expected)
        }
      case Event("hello", _) ⇒ stay replying "world"
      case Event("bye", _)   ⇒ stop(FSM.Shutdown)
    }

    when(Open) {
      case Event(StateTimeout, _) ⇒
        doLock()
        goto(Locked)
    }

    whenUnhandled {
      case Event(msg, _) ⇒
        log.warning("unhandled event " + msg + " in state " + stateName + " with data " + stateData)
        unhandledLatch.open
        stay
    }

    onTransition {
      case Locked -> Open ⇒ transitionLatch.open
    }

    // verify that old-style does still compile
    onTransition(transitionHandler _)

    /** Intentionally empty; only exists to be registered through the function-style `onTransition`. */
    def transitionHandler(from: LockState, to: LockState): Unit = {
      // dummy
    }

    onTermination {
      case StopEvent(FSM.Shutdown, Locked, _) ⇒
        // stop is called from lockstate with shutdown as reason...
        terminatedLatch.open
    }

    // initialize the lock
    initialize

    /** Signals that the lock has been locked again. */
    private def doLock(): Unit = {
      lockedLatch.open
    }

    /** Signals that the lock has been opened. */
    private def doUnlock(): Unit = {
      unlockedLatch.open
    }
  }
}
/**
 * Exercises an FSM actor that runs as the child of an `FSMSupervisor`:
 * messages are sent to the supervisor and state notifications are expected to
 * arrive with the supervisor as the reported FSM reference.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FSMSupervisionSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with ImplicitSender {
  import FSMSupervisionSpec._

  "An FSM Actor Supervisor" must {

    "let the FSM Actor unlock the lock" in {
      import FSM.{ Transition, CurrentState, SubscribeTransitionCallBack }

      val latches = new Latches
      import latches._

      // lock that locks itself again after being open for 1 second
      lazy val target = Props(new Lock("33221", 1 second, latches))
      val lock = system.actorOf(Props(new FSMSupervisor(target)))

      val transitionTester = system.actorOf(Props(new Actor {
        def receive = {
          case Transition(_, _, _)     ⇒ transitionCallBackLatch.open
          case CurrentState(_, Locked) ⇒ initialStateLatch.open
        }
      }))

      lock ! SubscribeTransitionCallBack(transitionTester)
      Await.ready(initialStateLatch, timeout.duration)

      lock ! '3'
      lock ! '3'
      lock ! '2'
      lock ! '2'
      lock ! '1'

      Await.ready(unlockedLatch, timeout.duration)
      Await.ready(transitionLatch, timeout.duration)
      Await.ready(transitionCallBackLatch, timeout.duration)
      Await.ready(lockedLatch, timeout.duration)

      EventFilter.warning(start = "unhandled event", occurrences = 1) intercept {
        lock ! "not_handled"
        Await.ready(unhandledLatch, timeout.duration)
      }

      val answerLatch = TestLatch()
      object Hello
      object Bye
      val tester = system.actorOf(Props(new Actor {
        def receive = {
          case Hello   ⇒ lock ! "hello"
          case "world" ⇒ answerLatch.open
          case Bye     ⇒ lock ! "bye"
        }
      }))
      tester ! Hello
      Await.ready(answerLatch, timeout.duration)

      tester ! Bye
      Await.ready(terminatedLatch, timeout.duration)
    }

    "run onTermination upon ActorRef.stop() on the supervisor" in {
      val started = TestLatch(1)
      lazy val fsm = new Actor with FSM[Int, Null] with ParentNotification {
        override def preStart = { started.countDown }
        startWith(1, null)
        when(1) { FSM.NullFunction }
        onTermination {
          case x ⇒ testActor ! x
        }
      }
      val supervisor = system.actorOf(Props(new FSMSupervisor(Props(fsm))))
      Await.ready(started, timeout.duration)
      system.stop(supervisor)
      expectMsg(1 second, fsm.StopEvent(FSM.Shutdown, 1, null))
    }

    // The enclosing `must` clause already contributes the word "must" to the
    // reported test name, so the name itself must not start with it.
    "keep subscriptions in case of a restart of the FSM actor" in {
      import FSM._
      val started = TestLatch(1)
      val supervisor = system.actorOf(Props(new FSMSupervisor(Props(new Actor with FSM[Int, Null] with ParentNotification {
        override def preStart = { started.countDown }
        startWith(1, null)
        when(1) { case Event(2, _) ⇒ goto(2) }
        // Any message received in state 2 makes the handler throw.
        when(2) { case Event(_, _) ⇒ sys.error("Not accepting any messages") }
      }))))
      Await.ready(started, timeout.duration)
      supervisor ! SubscribeTransitionCallBack(testActor)
      supervisor ! 2
      supervisor ! 3 // arrives in state 2 and triggers the failure
      expectMsg(1 second, CurrentState(supervisor, 1))
      expectMsg(1 second, Transition(supervisor, 1, 2))
      // After the failure the listener is still notified, now with state 1 again.
      expectMsg(1 second, Transition(supervisor, 2, 1))
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
akka > akka-actor-tests/test:test-only akka.actor.FSMSupervisionSpec | |
[info] FSMSupervisionSpec: | |
[info] An FSM Actor Supervisor | |
[info] - must let the FSM Actor unlock the lock (1 second, 178 milliseconds) | |
[info] - must run onTermination upon ActorRef.stop() on the supervisor (13 milliseconds) | |
[ERROR] [08/23/2012 23:11:15.918] [FSMSupervisionSpec-akka.actor.default-dispatcher-3] [akka://FSMSupervisionSpec/user/$e/$a] Not accepting any messages | |
java.lang.RuntimeException: Not accepting any messages | |
at scala.sys.package$.error(package.scala:27) | |
at akka.actor.FSMSupervisionSpec$$anonfun$4$$anonfun$apply$mcV$sp$3$$anonfun$15$$anonfun$apply$2$$anon$2$$anonfun$9.applyOrElse(FSMSupervisionSpec.scala:190) | |
at akka.actor.FSMSupervisionSpec$$anonfun$4$$anonfun$apply$mcV$sp$3$$anonfun$15$$anonfun$apply$2$$anon$2$$anonfun$9.applyOrElse(FSMSupervisionSpec.scala:190) | |
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:37) | |
at akka.actor.FSM$class.processEvent(FSM.scala:568) | |
at akka.actor.FSMSupervisionSpec$$anonfun$4$$anonfun$apply$mcV$sp$3$$anonfun$15$$anonfun$apply$2$$anon$2.processEvent(FSMSupervisionSpec.scala:186) | |
at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:562) | |
at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:556) | |
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:388) | |
at akka.actor.ActorCell.invoke(ActorCell.scala:364) | |
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:230) | |
at akka.dispatch.Mailbox.run(Mailbox.scala:212) | |
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516) | |
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262) | |
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975) | |
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478) | |
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104) | |
[info] - must must keep subscriptions in case of a restart of the FSM actor (10 milliseconds) | |
[info] Passed: : Total 3, Failed 0, Errors 0, Passed 3, Skipped 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment