Created
August 7, 2013 09:33
-
-
Save vire/6172604 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package examples | |
import akka.actor._ | |
import scala.concurrent.duration._ | |
import scala.collection.immutable | |
// Master case classes

/** Tells `MasterActor` to create a `Buncher` child and feed it a target plus a few queued items. */
case object Start

// received events

/** Registers `ref` as the actor that will receive batches; handled by `Buncher` while uninitialized. */
final case class SetTarget(ref: ActorRef)

/** Asks `Buncher` to append `obj` to its pending queue. */
final case class Queue(obj: Any)

/** Asks an active `Buncher` to hand over everything queued so far. */
case object Flush

// sent events

/** The queued objects, delivered by `Buncher` to its target when it goes from `Active` to `Idle`. */
final case class Batch(obj: immutable.Seq[Any])

// states

/** The FSM states of `Buncher`. */
sealed trait State
case object Idle extends State
case object Active extends State

/** The FSM state data of `Buncher`. */
sealed trait Data

/** No target has been registered yet. */
case object Uninitialized extends Data

/** A registered `target` together with the objects queued for it so far. */
final case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data
/**
 * FSM actor that accumulates queued objects and delivers them to a target as one [[Batch]].
 *
 * - `Idle` + `Uninitialized`: waits for a [[SetTarget]] to learn where batches go.
 * - Any state with a [[Todo]]: a [[Queue]] message appends to the queue and moves to `Active`.
 * - `Active`: a [[Flush]] or a one-second state timeout moves back to `Idle`; the
 *   `Active -> Idle` transition sends the accumulated queue to the target.
 *
 * The string message `"exceptionCase"` makes the actor throw a `RuntimeException`
 * from its event handler.
 */
class Buncher extends Actor with FSM[State, Data] {

  startWith(Idle, Uninitialized)

  when(Idle) {
    // The first SetTarget registers the receiver; the queue starts out empty.
    case Event(SetTarget(ref), Uninitialized) =>
      stay using Todo(ref, Vector.empty)
  }

  // transition elided ...

  // `1.second` instead of the postfix form `1 second`, which needs scala.language.postfixOps.
  when(Active, stateTimeout = 1.second) {
    case Event(Flush | StateTimeout, t: Todo) =>
      goto(Idle) using t.copy(queue = Vector.empty)
  }

  whenUnhandled {
    // common code for both states
    case Event(Queue(obj), t @ Todo(_, v)) =>
      goto(Active) using t.copy(queue = v :+ obj)
    case Event("exceptionCase", _) =>
      throw new RuntimeException
    // case Event(e, s) =>
    //   log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
    //   stay
  }

  onTransition {
    case Active -> Idle =>
      // Inside onTransition, stateData is the data from before the transition, i.e. the
      // queue that has not yet been replaced by the empty one set in the Active handler.
      stateData match {
        case Todo(ref, queue) => ref ! Batch(queue)
        // Active is only entered with a Todo, so nothing to send here; the case keeps the
        // match exhaustive over the sealed Data hierarchy.
        case Uninitialized => ()
      }
  }

  // unhandled elided ...

  initialize()

  /**
   * Runs the default pre-restart handling, then prints the message being processed
   * (if any) together with the failure that caused the restart.
   *
   * @param reason  the exception that triggered the restart
   * @param message the message that was being processed when the failure occurred, if any
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    // Explicit tuple: same printed output as before, without relying on argument auto-tupling.
    println((message, reason))
  }
}
/** Actor that prints every message it receives to standard output. */
class ExampleActor extends Actor {

  def receive = {
    case received =>
      println(received)
  }
}
/**
 * Parent actor for the example.
 *
 * - [[Start]]: creates a [[Buncher]] child, registers itself as the batch target and
 *   queues the values 10, 11 and 12.
 * - [[Batch]]: prints the received queue.
 * - Anything else: passed on to the first child, or reported as unhandled when no
 *   child exists yet.
 */
class MasterActor extends Actor {

  override def supervisorStrategy: SupervisorStrategy = super.supervisorStrategy

  def receive = {
    case Start =>
      /** defines child Actors */
      val buncherAkkEx = context.actorOf(Props[Buncher])
      buncherAkkEx ! SetTarget(self)
      buncherAkkEx ! Queue(10)
      buncherAkkEx ! Queue(11)
      buncherAkkEx ! Queue(12)

    case Batch(queue) =>
      println(s"The queue is: $queue")

    case m =>
      // `children.head` would throw NoSuchElementException if this arrives before any
      // Start has created a child; fall back to the standard unhandled-message path.
      context.children.headOption match {
        case Some(child) => child ! m
        case None        => unhandled(m)
      }
  }
}
/**
 * Entry point of the example: starts an actor system, creates a [[MasterActor]] and sends
 * it [[Start]] followed by the string `"exceptionCase"`.
 */
object FSMExample extends App {

  val system = ActorSystem("FSMExample")

  // Earlier stand-alone variant, kept for reference:
  //   val exampleAkk = system.actorOf(Props[ExampleActor])
  //   val buncherAkkEx = system.actorOf(Props[Buncher])
  //   buncherAkkEx ! SetTarget(exampleAkk)
  //   buncherAkkEx ! Flush

  val masterAkk = system.actorOf(Props[MasterActor])

  // Sent in this order: first set everything up, then trigger the failure path.
  List[Any](Start, "exceptionCase").foreach(msg => masterAkk ! msg)

  // Shutdown is left disabled, as in the original:
  //   Thread.sleep(2000)
  //   system.shutdown()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment