Created
February 1, 2011 15:05
-
-
Save viktorklang/805973 to your computer and use it in GitHub Desktop.
CQRS and EventSourcing using Akka Actors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.mutable.ListBuffer | |
import akka.actor.{Actor,ActorRef} | |
import akka.actor.Actor._ | |
import akka.routing.{ Listeners, Listen } | |
//Represents a domain event | |
trait Event | |
//A builder to create domain entities | |
trait EntityBuilder[Entity] { | |
def build: Entity | |
} | |
//An EventStorage loads and stores events | |
trait EventStorage { | |
protected def saveEvents(events: Event*): Unit | |
protected def loadEvents(): Seq[Event] | |
} | |
//An EventSource needs to be mixed in together with an EventStorage, | |
//its responsibility is to facilitate construction of an entity given its history of events, | |
//and also to apply new events to the entity | |
trait EventSource[Entity,Builder <: EntityBuilder[Entity]] { self: EventStorage => | |
private var cached: Tuple2[Builder, Entity] = cache(newBuilder) //What we cache here is an instance of the entity builder and what the entity currently looks like | |
protected def newBuilder: Builder //Has to be implemented, returns a newly constructed empty builder | |
protected def applyEvent(builder: Builder, event: Event): Builder //Has to be implemented, applies the supplied event to the supplied builder, yielding a new builder | |
//A unit of work is demarcation for atomically creating zero or more events that should be applied to the entity | |
//Add all Events created in the unit of work to the supplied ListBuffer, those events will then be applied to the entity and returned as the result of the unit of work | |
def unitOfWork(work: ListBuffer[Event] => Unit): Seq[Event] = { | |
val buffer = new ListBuffer[Event] | |
work(buffer) | |
applyEvents(buffer:_*) | |
buffer | |
} | |
//Returns the current instance of the eventsourced entity | |
def entity = cached._2 | |
//replay loads and applies all events for the entity and caches the results | |
def replay(): Unit = | |
cached = cache(load()) | |
protected def cache(builder: Builder): (Builder, Entity) = (builder, builder.build) | |
//Loads all events and applies them to a new builder and returns the resulting builder | |
protected def load(): Builder = loadEvents().foldLeft(newBuilder)(applyEvent) | |
//First saves the events and then applies the events towards the cached builder | |
protected def applyEvents(events: Event*) { | |
saveEvents(events:_*) | |
cached = cache(events.foldLeft(cached._1)(applyEvent _)) | |
} | |
} | |
trait InMemoryEventStorage extends EventStorage { | |
//All events in the order of occurence | |
private var eventHistory: List[Event] = Nil | |
/** | |
* Saves the events in the order they occurred | |
*/ | |
protected def saveEvents(events: Event*) { | |
eventHistory = eventHistory ::: events.toList | |
} | |
/** | |
* Loads the events in the order they occurred | |
*/ | |
protected def loadEvents(): Seq[Event] = { | |
eventHistory | |
} | |
} | |
/** | |
* EventStreamActor receives events and publishes it to it's listeners | |
*/ | |
class EventStreamActor(id: String) extends Actor with Listeners { | |
self.id = id | |
def receive = listenerManagement orElse { | |
case e: Event => gossip(e) | |
} | |
} | |
/** | |
* Represents an Actor that handles Commands and publishes Events while representing an Entity of type E with a corresponding EntityBuilder of type B | |
*/ | |
trait ActorEventSourcing[E,B <: EntityBuilder[E]] extends Actor with EventSource[E, B] { self: EventStorage => | |
def receive = commandHandling | |
//Publishes the sequence of Events to the listeners | |
def publish(events: Seq[Event]) { for(e <- events) eventStream ! e } | |
//Defines where events are published | |
def eventStream: ActorRef | |
//Defines the command handlers | |
def commandHandling: Receive | |
} | |
/** | |
* | |
* Alright, enough of handwaiving, here's some sample code | |
* | |
**/ | |
//Represents a command to change a customer name | |
case class ChangeCustomerNameCommand(customerId: Long, name: String) | |
//Represents the event that a customer name was changed | |
case class ChangedName(customerId: Long, name: String) extends Event | |
//Customer is an entity object (immutable) | |
case class Customer(customerId: Long, age: Int, name: String) | |
//A builder used to create Customers from a sequence of Events | |
case class CustomerBuilder(customerId: Long, age: Int, name: String) extends EntityBuilder[Customer] { | |
def build = Customer(customerId, age, name) | |
} | |
object CustomerAR { | |
val eventStream = actorOf(new EventStreamActor("eventstream:customerAR")).start | |
} | |
//An Actor that represents a Customer, which is an Actor that processed commands and generates events that is applied to it's Customer which is stored in memory | |
class CustomerAR(val customerId: Long) extends ActorEventSourcing[Customer,CustomerBuilder] with InMemoryEventStorage { | |
def newBuilder = CustomerBuilder(customerId, 0, "") | |
val eventStream = CustomerAR.eventStream | |
//This is how events are applied to a CustomerBuilder, it's used to progress state and enable replays of events | |
def applyEvent(builder: CustomerBuilder, event: Event) : CustomerBuilder = event match { | |
case e@ChangedName(`customerId`, newName) => log.slf4j.debug("applyingEvent: {}",e); builder.copy(name = newName) | |
} | |
//This method defines how it reacts to Commands | |
def commandHandling = { | |
case ChangeCustomerNameCommand(`customerId`, newName) => //When it gets a name change command targeted at the corred customer id | |
publish( //Publish the resulting events | |
unitOfWork { //of the following unit of work | |
events => { | |
//If you need to check something on the instance of the entity you can call "entity" | |
//if (entity.name == "Chuck Norris") ... | |
events += ChangedName(customerId, newName) //Create the name change event | |
} | |
} | |
) | |
} | |
} | |
//Test code: | |
val debugListener = actorOf( new Actor { def receive = { case event => log.slf4j.debug("busListener received event: {}", event) } } ).start | |
CustomerAR.eventStream ! Listen(debugListener) | |
val customer = actorOf( new CustomerAR(55) ).start | |
customer ! ChangeCustomerNameCommand(55, "Jesus") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Can you remember which version of Akka you used for this? Thanks.