Skip to content

Instantly share code, notes, and snippets.

@gabfssilva
Last active June 14, 2017 02:28
Show Gist options
  • Save gabfssilva/c43ce599b72bd8493bd55adf2550b987 to your computer and use it in GitHub Desktop.
Save gabfssilva/c43ce599b72bd8493bd55adf2550b987 to your computer and use it in GitHub Desktop.
event sourcing
import java.util.{Date, UUID}
import InMemoryEventStore._
import scala.language.dynamics
/**
* @author Gabriel Francisco <[email protected]>
*/
class DynamicData(var data: Map[String, Any] = Map()) extends Dynamic {
def selectDynamic[T](name: String) = data.get(name).orNull.as[T]
def updateDynamic(name: String)(value: Any): Unit = data = data + (name -> value)
def copyTo(dynamicData: DynamicData) = dynamicData.data = data map { x => x }
implicit class AnyImplicitCast(val any: Any) {
def as[T] = any.asInstanceOf[T]
}
override def toString = s"{${data.map({ case (k, v) => s"$k=$v" }).mkString(",")}}"
}
abstract class Event[T] extends DynamicData {
val eventDate = new Date()
override def toString: String = s"${this.getClass.getSimpleName}${super.toString}"
def applyTo(aggregateId: String, t: T): T
override def equals(obj: scala.Any): Boolean = super.equals(obj)
}
trait EventStore[T] {
def add(aggregateId: String, event: Event[T])
def get(aggregateId: String): Seq[Event[T]]
}
class InMemoryEventStore[T] extends EventStore[T] {
val events = new collection.mutable.HashMap[String, collection.mutable.Set[Event[T]]] with collection.mutable.MultiMap[String, Event[T]]
override def add(aggregateId: String, event: Event[T]): Unit = events.addBinding(aggregateId, event)
override def get(aggregateId: String): Seq[Event[T]] = events(aggregateId).toSeq.sortBy(_.eventDate)
}
case class BankAccountCreated() extends Event[BankAccount] {
def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
account.balance = 0
account.id = this.id
account.status = "ACTIVE"
account.owner = this.owner
account
}
}
case class DepositPerformed() extends Event[BankAccount] {
def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
val newAccount = BankAccount()
account copyTo newAccount
newAccount.balance = account.balance[Int] + this.amount[Int]
newAccount
}
}
case class OwnerChanged() extends Event[BankAccount] {
def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
val newAccount = BankAccount()
account copyTo newAccount
newAccount.owner = this.newOwner
newAccount
}
}
case class WithdrawalPerformed() extends Event[BankAccount] {
def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
val newAccount = BankAccount()
account copyTo newAccount
newAccount.balance = account.balance[Int] - this.amount[Int]
newAccount
}
}
case class BankAccountClosed() extends Event[BankAccount] {
def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
val newAccount = BankAccount()
account copyTo newAccount
newAccount.closeReason = this.closeReason
newAccount.status = "CLOSED"
newAccount
}
}
object BankAccount {
def apply(aggregateId: String, id: Int, owner: String): BankAccount = {
val event = BankAccountCreated()
event.id = id
event.owner = owner
event applyTo(aggregateId, BankAccount())
}
}
abstract class Command[R, T](implicit val eventStore: EventStore[T]) extends (((String, R, T)) => (T)) {
type ExecutionProduce = (String, R) => (T) => Event[T]
def execute: ExecutionProduce
implicit class CommandTupleImplicit(val tuple: (String, R, T)) {
lazy val aggregateId: String = tuple._1
lazy val request: R = tuple._2
lazy val actualState: T = tuple._3
}
override def apply(parameters: (String, R, T)): T = {
println("executing:" + this.getClass.getSimpleName)
val event: Event[T] = execute(parameters.aggregateId, parameters.request)(parameters.actualState)
eventStore.add(parameters.aggregateId, event)
event applyTo(parameters.aggregateId, parameters.actualState)
}
}
case class Request(d: Map[String, Any]) extends DynamicData(d)
object EventPlayer {
def play[T](initialObject: T, aggregateId: String, events: Seq[Event[T]]): T = {
var state = initialObject
for (event <- events) {
state = event.applyTo(aggregateId, state)
}
state
}
}
object BankAccountCommands {
class CreateAccountCommand extends Command[Request, BankAccount] {
override def execute: ExecutionProduce = (_, request) => (_) => {
val event = BankAccountCreated()
event.id = request.id
event.owner = request.owner
event
}
}
class DepositCommand extends Command[Request, BankAccount] {
override def execute: ExecutionProduce = (_, request) => (_) => {
val event = DepositPerformed()
event.amount = request.amount
event
}
}
class WithdrawalCommand extends Command[Request, BankAccount] {
override def execute: ExecutionProduce = (_, request) => (_) => {
val event = WithdrawalPerformed()
event.amount = request.amount
event
}
}
class ChangeOwnerCommand extends Command[Request, BankAccount] {
override def execute: ExecutionProduce = (_, request) => (_) => {
val event = OwnerChanged()
event.newOwner = request.newOwner
event
}
}
class CloseCommand extends Command[Request, BankAccount] {
override def execute: ExecutionProduce = (_, request) => (_) => {
val event = BankAccountClosed()
event.closeReason = request.reason
event
}
}
type ExecutionRequest = (String, Request) => (BankAccount) => BankAccount
val createAccount: ExecutionRequest = { (id, req) => (account) => new CreateAccountCommand().apply(id, req, account) }
val withdrawal: ExecutionRequest = { (id, req) => (account) => new WithdrawalCommand().apply(id, req, account) }
val deposit: ExecutionRequest = { (id, req) => (account) => new DepositCommand().apply(id, req, account) }
val changeOwner: ExecutionRequest = { (id, req) => (account) => new ChangeOwnerCommand().apply(id, req, account) }
val close: ExecutionRequest = { (id, req) => (account) => new CloseCommand().apply(id, req, account) }
}
object InMemoryEventStore {
implicit val eventStore: EventStore[BankAccount] = new InMemoryEventStore[BankAccount]
}
case class BankAccount() extends DynamicData
object Sample extends App {
import BankAccountCommands._
val aggregateId = UUID.randomUUID().toString
val f =
createAccount(aggregateId, Request(Map("owner" -> "John Doe", "id" -> 123)))
.andThen(deposit(aggregateId, Request(Map("amount" -> 20))))
.andThen(changeOwner(aggregateId, Request(Map("newOwer" -> "Jane Doe"))))
.andThen(withdrawal(aggregateId, Request(Map("amount" -> 10))))
.andThen(close(aggregateId, Request(Map("reason" -> "Unavailable address"))))
val actualState = f(BankAccount())
val events = eventStore.get(aggregateId)
val playedState = EventPlayer.play(BankAccount(), aggregateId, events)
println(s"actual state: $actualState")
println(s"events: $events")
println(s"played state: $playedState")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment