Last active
December 17, 2015 23:58
-
-
Save krasserm/5692790 to your computer and use it in GitHub Desktop.
Eventsourced processors in a parent-child relationship
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.util.Timeout | |
import org.eligosource.eventsourced.core._ | |
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps | |
class Parent extends Actor { this: Receiver with Eventsourced => | |
implicit val recoveryTimeout = Timeout(10 seconds) | |
import context.dispatcher | |
var received: List[Int] = Nil | |
var child: Option[ActorRef] = None | |
def receive = { | |
case n: Int => { | |
if (n > 10) { | |
if (child.isEmpty) { child = Some(createChildProcessor(2, 2)) } | |
child.foreach(_ ! message /* current event message containing n */) | |
} else { | |
received = n :: received | |
println("parent received so far: " + received.reverse) | |
} | |
} | |
} | |
def createChildProcessor(pid: Int, cid: Int): ActorRef = { | |
val childActor = extension.processorOf(Props(new Child with Receiver with Confirm with Eventsourced { val id = pid } )) | |
val childChannel = extension.channelOf(DefaultChannelProps(cid, childActor)) | |
for { // asynchronous, non-blocking recovery | |
_ <- extension.replay(Seq(ReplayParams(pid))) | |
_ <- extension.deliver(cid) | |
} yield () | |
childChannel | |
} | |
} | |
class Child extends Actor { this: Receiver => | |
var received: List[Int] = Nil | |
def receive = { | |
case n: Int => { | |
received = n :: received | |
println("child received so far: " + received.reverse) | |
} | |
} | |
} | |
object Example extends App { | |
implicit val system = ActorSystem("example") | |
val journal: ActorRef = Journal(LeveldbJournalProps(new File("target/example"))) | |
val extension = EventsourcingExtension(system, journal) | |
val parent: ActorRef = extension.processorOf(Props(new Parent with Receiver with Eventsourced { val id = 1 } )) | |
extension.recover(Seq(ReplayParams(1))) | |
parent ! Message(6) | |
parent ! Message(14) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.util.Timeout | |
import org.eligosource.eventsourced.core._ | |
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps | |
class Parent extends Actor { this: Receiver with Eventsourced => | |
implicit val recoveryTimeout = Timeout(10 seconds) | |
import context.dispatcher | |
var received: List[Int] = Nil | |
var child: Option[ActorRef] = None | |
def receive = { | |
case n: Int => { | |
if (n > 10) { | |
if (child.isEmpty) { child = Some(createChildProcessor(2, 2)) } | |
child.foreach(_ ! message.copy((n, randomPin))) | |
} else { | |
received = n :: received | |
println("parent received so far: " + received.reverse) | |
} | |
} | |
} | |
def createChildProcessor(pid: Int, cid: Int): ActorRef = { | |
val childActor = extension.processorOf(Props(new Child with Receiver with Confirm with Eventsourced { val id = pid } )) | |
val childChannel = extension.channelOf(DefaultChannelProps(cid, childActor)) | |
for { // asynchronous, non-blocking recovery | |
_ <- extension.replay(Seq(ReplayParams(pid))) | |
_ <- extension.deliver(cid) | |
} yield () | |
childChannel | |
} | |
def randomPin: String = | |
System.currentTimeMillis.toString.takeRight(4) | |
} | |
class Child extends Actor { this: Receiver => | |
var received: List[Int] = Nil | |
def receive = { | |
case (n: Int, pin: String) => { | |
received = n :: received | |
println(s"child received so far: ${received.reverse}, sending out random pin: ${pin}") | |
} | |
} | |
} | |
object Example extends App { | |
implicit val system = ActorSystem("example") | |
val journal: ActorRef = Journal(LeveldbJournalProps(new File("target/example"))) | |
val extension = EventsourcingExtension(system, journal) | |
val parent: ActorRef = extension.processorOf(Props(new Parent with Receiver with Eventsourced { val id = 1 } )) | |
extension.recover(Seq(ReplayParams(1))) | |
parent ! Message(6) | |
parent ! Message(14) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.pattern.ask | |
import akka.util.Timeout | |
import org.eligosource.eventsourced.core._ | |
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps | |
class Parent extends Actor { this: Receiver with Eventsourced => | |
implicit val timeout = Timeout(2 seconds) | |
import context.dispatcher | |
var received: List[Int] = Nil | |
var child: Option[ActorRef] = None | |
def receive = { | |
case event => { | |
if (child.isEmpty) { child = Some(createDoorFSM(2, 2)) } | |
child.foreach { _ ? message onComplete { case r => println(s"response = ${r}") } } | |
} | |
} | |
def createDoorFSM(pid: Int, cid: Int): ActorRef = { | |
val childActor = extension.processorOf(Props(new Door with Receiver with Confirm with Eventsourced { val id = pid } )) | |
val childChannel = extension.channelOf(DefaultChannelProps(cid, childActor)) | |
for { // asynchronous, non-blocking recovery | |
_ <- extension.replay(Seq(ReplayParams(pid))) | |
_ <- extension.deliver(cid) | |
} yield () | |
childChannel | |
} | |
} | |
class Door extends Actor with FSM[String, Int] { this: Receiver => | |
val OPENED = "opened" | |
val CLOSED = "closed" | |
startWith(CLOSED, 0) | |
when(CLOSED) { | |
case Event("open", counter) => { | |
goto(OPENED) using(counter + 1) replying("door opened") | |
} | |
} | |
when(OPENED) { | |
case Event("close", counter) => { | |
goto(CLOSED) using(counter + 1) replying("door closed") | |
} | |
} | |
} | |
object Example extends App { | |
implicit val timeout = Timeout(10 seconds) | |
implicit val system = ActorSystem("example") | |
import system.dispatcher | |
val journal: ActorRef = Journal(LeveldbJournalProps(new File("target/example"))) | |
val extension = EventsourcingExtension(system, journal) | |
val parent: ActorRef = extension.processorOf(Props(new Parent with Receiver with Eventsourced { val id = 1 } )) | |
extension.recover(Seq(ReplayParams(1))) | |
parent ! Message("open") | |
parent ! Message("close") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.pattern.ask | |
import akka.util.Timeout | |
import org.eligosource.eventsourced.core._ | |
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps | |
class Parent extends Actor { this: Receiver with Eventsourced => | |
implicit val timeout = Timeout(2 seconds) | |
import context.dispatcher | |
var received: List[Int] = Nil | |
var child: Option[ActorRef] = None | |
val childProcessorId = 2 | |
val childChannelId = 2 | |
def receive = { | |
case event => { | |
if (child.isEmpty) { child = Some(createDoorFSM(childProcessorId, childChannelId)) } | |
child.foreach { c => | |
// Only ask (?) channel if current message is not a confirmed message | |
// because confirmed messages will be dropped by the channel and no | |
// reply will be made. | |
if (!message.acks.contains(childChannelId)) c ? message onComplete { case r => println(s"response = ${r}") } | |
} | |
} | |
} | |
def createDoorFSM(pid: Int, cid: Int): ActorRef = { | |
val childActor = extension.processorOf(Props(new Door with Receiver with Confirm with Eventsourced { val id = pid } )) | |
val childChannel = extension.channelOf(DefaultChannelProps(cid, childActor)) | |
for { // asynchronous, non-blocking recovery | |
_ <- extension.replay(Seq(ReplayParams(pid))) | |
_ <- extension.deliver(cid) | |
} yield () | |
childChannel | |
} | |
} | |
class Door extends Actor with FSM[String, Int] { this: Receiver => | |
val OPENED = "opened" | |
val CLOSED = "closed" | |
startWith(CLOSED, 0) | |
when(CLOSED) { | |
case Event("open", counter) => { | |
goto(OPENED) using(counter + 1) replying("door opened") | |
} | |
} | |
when(OPENED) { | |
case Event("close", counter) => { | |
goto(CLOSED) using(counter + 1) replying("door closed") | |
} | |
} | |
} | |
object Example extends App { | |
implicit val timeout = Timeout(10 seconds) | |
implicit val system = ActorSystem("example") | |
import system.dispatcher | |
val journal: ActorRef = Journal(LeveldbJournalProps(new File("target/example"))) | |
val extension = EventsourcingExtension(system, journal) | |
val parent: ActorRef = extension.processorOf(Props(new Parent with Receiver with Eventsourced { val id = 1 } )) | |
extension.recover(Seq(ReplayParams(1))) | |
parent ! Message("open") | |
parent ! Message("close") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import scala.concurrent.duration._ | |
import scala.concurrent.Future | |
import scala.util.{Success, Failure} | |
import akka.actor._ | |
import akka.pattern.{ask, pipe} | |
import akka.util.Timeout | |
import org.eligosource.eventsourced.core._ | |
import org.eligosource.eventsourced.journal.leveldb.LeveldbJournalProps | |
case class Initialized(child: ActorRef) | |
class Parent extends Actor { | |
import context.dispatcher | |
val extn = EventsourcingExtension(context.system) | |
val pid = 1 | |
var child: ActorRef = _ | |
// buffer for messages received during | |
// asynchronous recovery of child actor | |
var buffer: List[(Any, ActorRef)] = Nil | |
def receive = { | |
case msg => { | |
recoverDoorFSM(pid) onComplete { | |
case Success(c) => self ! Initialized(c) | |
case Failure(_) => // handle recovery failure ... | |
} | |
context.become(initializing) // or become(initializing) if this actor is Eventsourced | |
buffer = (msg, sender) :: buffer | |
} | |
} | |
val initializing: Receive = { | |
case Initialized(c) => { | |
context.become(active) // or become(active) if this actor is Eventsourced | |
child = c | |
buffer.reverse.foreach { case (msg, sdr) => child tell (Message(msg), sdr) } | |
buffer = Nil | |
} | |
case msg => buffer = (msg, sender) :: buffer | |
} | |
val active: Receive = { | |
case msg => child forward Message(msg) | |
} | |
def recoverDoorFSM(pid: Int): Future[ActorRef] = { | |
implicit val timeout = Timeout(10 seconds) | |
val childActor = extn.processorOf(ProcessorProps(pid, pid => new Door with Receiver with Eventsourced { val id = pid } )) | |
extn.replay(Seq(ReplayParams(pid))).map(_ => childActor) // asynchronous, non-blocking recovery (no channel activation needed here) | |
} | |
} | |
class Door extends Actor with FSM[String, Int] { this: Receiver with Eventsourced => | |
implicit val timeout = Timeout(3 seconds) | |
import context.dispatcher | |
val AWAITING = "awaiting" | |
val OPENED = "opened" | |
val CLOSED = "closed" | |
val extn = EventsourcingExtension(context.system) | |
var child: ActorRef = _ | |
startWith(CLOSED, 0) | |
when(CLOSED) { | |
case Event("open", counter) => { | |
val replyTo = sender | |
if (!message.acks.contains(id)) { // avoid sending confirmed message to channel | |
val ftr = for { result <- child ? message.copy("open") } yield Message((result, replyTo)) | |
pipe(ftr) to self | |
} | |
goto(AWAITING) | |
} | |
} | |
when(OPENED) { | |
case Event("close", counter) => { | |
val replyTo = sender | |
if (!message.acks.contains(id)) { // avoid sending confirmed message to channel | |
val ftr = for { result <- child ? message.copy("close") } yield Message((result, replyTo)) | |
pipe(ftr) to self | |
} | |
goto(AWAITING) | |
} | |
} | |
when(AWAITING) { | |
case Event(("open-result", replyTo: ActorRef), counter) => { | |
replyTo ! "door opened" | |
goto(OPENED) using(counter + 1) | |
} | |
case Event(("close-result", replyTo: ActorRef), counter) => { | |
replyTo ! "door closed" | |
goto(CLOSED) using(counter + 1) | |
} | |
} | |
override def preStart() { | |
// create child actor wrapped by channel (channel id == processor id in this example) | |
child = extn.channelOf(DefaultChannelProps(id, context.actorOf(Props(new Worker with Receiver with Confirm)))) | |
extn.deliver(id) // channel activation | |
} | |
} | |
class Worker extends Actor { | |
def receive = { | |
case event => sender ! s"${event}-result" | |
} | |
} | |
object Example extends App { | |
implicit val timeout = Timeout(10 seconds) | |
implicit val system = ActorSystem("example") | |
import system.dispatcher | |
val journal: ActorRef = LeveldbJournalProps(new File("target/example")).withNative(false).createJournal | |
val extension = EventsourcingExtension(system, journal) | |
val parent: ActorRef = system.actorOf(Props[Parent]) | |
// Door FSM will likely reject close command because | |
// it is still in AWAITING state for an open-result | |
//parent ! "open" | |
//parent ! "close" | |
// ... therefore, replies from state transitions have | |
// to be awaited (non-blocking) by application using | |
// a monadic composition of futures | |
for { | |
a <- parent ? "open" | |
b <- parent ? "close" | |
} println(s"answers = ${a}, ${b}") | |
Thread.sleep(3000) | |
system.shutdown() | |
system.awaitTermination() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment