Last active
December 25, 2015 04:29
-
-
Save krasserm/6917790 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.persistence._ | |
/**
 * Small runnable demo of an event-sourced processor.
 *
 * Every incoming command is turned into two events (`<cmd>-1` and
 * `<cmd>-2`). Each event is wrapped in a `Persistent` message and sent to
 * the processor itself; the event handler runs only once that `Persistent`
 * message is received again. While an event is awaited, all other messages
 * are stashed.
 */
object EventsourcingExample extends App {

  class EventsourcedProcessor extends Processor {
    // Processor state: every applied event, most recent first.
    var events: List[Any] = Nil

    // Behaviors that are still waiting for "their" persisted event,
    // most recently registered first. The LAST element is therefore the
    // oldest one and is the one sitting on top of the actor's behavior stack.
    var awaitStack: List[Actor.Receive] = Nil

    def receive = {
      // Replayed events only update state; the handler is not run.
      case Persistent(event, _) if recoveryRunning ⇒ update(event)
      // Anything else is treated as a command producing two events.
      case cmd ⇒ {
        handle(s"${cmd}-1")(handler)
        handle(s"${cmd}-2")(handler)
      }
    }

    /** Prepends `event` to the processor state. */
    def update(event: Any): Unit = {
      events = event :: events
    }

    /** Event handler used by the demo: prints the event and applies it. */
    val handler: Actor.Receive = {
      case event ⇒ {
        println(s"applying ${event} to current state = ${events.reverse}")
        update(event)
      }
    }

    /**
     * Sends `event` to this processor wrapped in a `Persistent` message and
     * runs `handler` when that message is received back.
     *
     * @param event   the event to persist
     * @param handler callback invoked with `event` once it has been received
     *                back as a `Persistent` message
     */
    def handle[A](event: A)(handler: A ⇒ Unit): Unit = {
      val awaitingPersistence: Receive = {
        case Persistent(`event`, _) ⇒
          context.unbecome()
          // The behavior that just fired is the one on top of the behavior
          // stack, i.e. the LAST entry of awaitStack. Dropping the head
          // (as `tail` would) removes a behavior that is still waiting and
          // keeps this finished one, which breaks the bookkeeping as soon
          // as a handler calls `handle` again while events are pending.
          awaitStack = awaitStack.dropRight(1)
          unstashAll()
          handler(event)
        case _ ⇒ stash()
      }
      // Pop all awaiting behaviors, register the new one and push them back
      // in list order (newest first) so that the oldest ends up on top and
      // events are handled in the order they were submitted.
      awaitStack.foreach(_ ⇒ context.unbecome())
      awaitStack = awaitingPersistence :: awaitStack
      awaitStack.foreach(context.become(_, discardOld = false))
      self forward Persistent(event)
    }
  }

  val system = ActorSystem("example")
  val processor = system.actorOf(Props[EventsourcedProcessor])

  processor ! "foo"
  processor ! "bar"

  // Crude wait so the messages above can be processed before shutdown.
  Thread.sleep(1000)
  system.shutdown()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Support for eventsourced processors. Will become
 * part of akka-persistence ...
 *
 * Implementors provide `recoveryBehavior` (applied to replayed event
 * payloads) and `defaultBehavior` (applied to everything else) and call
 * `persist` from `defaultBehavior` to store events.
 */
trait EventsourcedProcessor extends Processor {
  // Behaviors still waiting for "their" persisted event, most recently
  // registered first. The LAST element is the oldest one and is the one
  // on top of the actor's behavior stack.
  private var awaitStack: List[Actor.Receive] = Nil

  /**
   * Sends `event` to this processor wrapped in a `Persistent` message and
   * runs `handler` once that message is received back. Until then every
   * other message is stashed.
   *
   * @param event   the event to persist
   * @param handler callback invoked with `event` after it has been received
   *                back as a `Persistent` message
   * @throws Throwable the cause carried by a `PersistenceFailure` for `event`
   */
  final def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
    val awaitingPersistence: Receive = {
      case PersistenceFailure(`event`, _, cause) ⇒ {
        throw cause // TODO: refine
      }
      case Persistent(`event`, _) ⇒
        context.unbecome()
        // The behavior that just fired is the top of the behavior stack,
        // i.e. the LAST entry of awaitStack. Dropping the head (as `tail`
        // would) removes a behavior that is still waiting and keeps this
        // finished one; a handler that calls `persist` again while other
        // events are pending would then re-install the finished behavior
        // and stash every message forever.
        awaitStack = awaitStack.dropRight(1)
        unstashAll()
        handler(event)
      case _ ⇒ stash()
    }
    // Revert awaitingPersistence ordering on
    // the processor actor's behavior stack.
    // Needed to handle persisted events in
    // correct order: behaviors are pushed newest-first,
    // leaving the oldest one on top.
    awaitStack.foreach(_ ⇒ context.unbecome())
    awaitStack = awaitingPersistence :: awaitStack
    awaitStack.foreach(context.become(_, discardOld = false))
    self forward Persistent(event)
  }

  /**
   * Dispatches replayed `Persistent` payloads to `recoveryBehavior` while
   * recovery is running and everything else to `defaultBehavior`.
   */
  final def receive: Receive = {
    // Applied directly: a replayed payload that recoveryBehavior does not
    // define raises a MatchError instead of being dropped.
    case Persistent(payload, _) if recoveryRunning ⇒ recoveryBehavior(payload)
    // The catch-all pattern accepts every message, so messages that
    // defaultBehavior does not define are passed on to `unhandled`
    // rather than failing with a MatchError.
    case msg ⇒ defaultBehavior.applyOrElse(msg, unhandled)
  }

  /** Behavior applied to replayed event payloads during recovery. */
  def recoveryBehavior: Actor.Receive

  /** Behavior applied to all messages that are not replayed events. */
  def defaultBehavior: Actor.Receive
}
// ------------------------------------------------------------------ | |
// Usage examples | |
// ------------------------------------------------------------------ | |
/** Command message sent to the example processors; `data` is the command payload. */
final case class Cmd(data: Any)
/** Event persisted by the example processors; `data` is the event payload. */
final case class Evt(data: Any)
/**
 * Common base of the example processors: holds the payloads of all
 * handled events and uses the same state update for recovery.
 */
trait ExampleProcessor extends EventsourcedProcessor {
  // Processor state: payload of every handled event, most recent first.
  var events: List[Any] = List.empty

  // Applies an event to the state by prepending its payload.
  // (The identifier is spelled `updateSate`; subclasses refer to it
  // by exactly this name.)
  val updateSate: Actor.Receive = {
    case Evt(payload) ⇒ events ::= payload
  }

  // Behavior run for replayed events during recovery.
  def recoveryBehavior = updateSate
}
/**
 * Basic flow: receive a command, persist the resulting event,
 * then handle that event.
 */
class ExampleProcessor1 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    // Cmd("foo") is transient, i.e. the command itself is not persisted.
    //
    // Steps:
    //   1. build Evt("foo-1")
    //   2. hand it to persist (asynchronous)
    //   3. once persisted, the handler (updateSate) is invoked with the
    //      event; the handler may close over processor state
    //
    // No other message is processed between the persist call and the
    // execution of the handler.
    case Cmd("foo") ⇒
      val event = Evt("foo-1")
      persist(event)(updateSate)
  }
}
/**
 * Publishes the event to listeners once it has been persisted and handled.
 */
class ExampleProcessor2 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    case Cmd("foo") ⇒
      persist(Evt("foo-1")) { persisted ⇒
        updateSate(persisted)
        // The handler is not run during recovery, which makes this
        // a suitable place for notifying listeners.
        context.system.eventStream.publish(persisted)
      }
  }
}
/**
 * One command, several events: each is persisted and handled in turn.
 */
class ExampleProcessor3 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    case Cmd("foo") ⇒
      // Persist the events in this order: foo-1 first, then foo-2.
      List("foo-1", "foo-2").foreach { data ⇒
        persist(Evt(data))(updateSate)
      }
  }
}
/**
 * Switches the processor's behavior from inside an event handler.
 */
class ExampleProcessor4 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    case Cmd("foo") ⇒
      persist(Evt("foo-1"))(updateSate)
      // After the second event has been applied, continue with barBehavior.
      persist(Evt("foo-2")) { persisted ⇒
        updateSate(persisted)
        context.become(barBehavior)
      }
  }

  val barBehavior: Actor.Receive = {
    case Cmd("bar") ⇒
      persist(Evt("bar-1")) { persisted ⇒
        updateSate(persisted)
        // leave barBehavior again (back to defaultBehavior)
        context.unbecome()
      }
  }
}
/**
 * Combines event sourcing with command sourcing
 * (works, but is not the recommended approach ...)
 */
class ExampleProcessor5 extends ExampleProcessor {
  val defaultBehavior: Actor.Receive = {
    // a transient command produces a persistent event
    case Cmd("foo") ⇒
      val event = Evt("foo-1")
      persist(event)(updateSate)
    // a command that arrives as a Persistent message
    case Persistent(command: Cmd, _) ⇒
      recoveryBehavior(command)
  }

  // Replayed commands must be handled too, hence the override:
  // events go to updateSate, commands to the extra case below.
  override def recoveryBehavior = updateSate orElse {
    case Cmd(_) ⇒ () // ...
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment