Last active
June 14, 2017 02:28
-
-
Save gabfssilva/c43ce599b72bd8493bd55adf2550b987 to your computer and use it in GitHub Desktop.
event sourcing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.{Date, UUID} | |
import InMemoryEventStore._ | |
import scala.language.dynamics | |
/**
 * A bag of dynamically named fields backed by an immutable `Map`.
 *
 * Field reads (`obj.name`) are routed to [[selectDynamic]] and field writes
 * (`obj.name = value`) to [[updateDynamic]] through Scala's `Dynamic` support.
 *
 * @author Gabriel Francisco <[email protected]>
 * @param data the current field name -> value mapping; reassigned on every write
 */
class DynamicData(var data: Map[String, Any] = Map()) extends Dynamic {

  /** Adds an unchecked `as[T]` cast to any value. */
  implicit class AnyImplicitCast(val any: Any) {
    def as[T]: T = any.asInstanceOf[T]
  }

  /**
   * Looks up the field `name` and casts it (unchecked) to `T`.
   * A missing field yields `null` cast to `T`.
   */
  def selectDynamic[T](name: String): T =
    data.getOrElse(name, null).asInstanceOf[T]

  /** Stores `value` under `name`, replacing any previous value for that name. */
  def updateDynamic(name: String)(value: Any): Unit =
    data = data.updated(name, value)

  /** Replaces the fields of `dynamicData` with a copy of this instance's fields. */
  def copyTo(dynamicData: DynamicData): Unit =
    dynamicData.data = data.map { case (key, value) => key -> value }

  /** Renders the fields as `{k1=v1,k2=v2}`. */
  override def toString: String =
    data
      .map { case (key, value) => s"$key=$value" }
      .mkString("{", ",", "}")
}
/**
 * Base type for events that can be applied to an aggregate state of type `T`.
 *
 * The event payload is held in the dynamic fields inherited from `DynamicData`.
 * `equals`, `hashCode` and `toString` are all defined here, so case-class
 * subclasses inherit these definitions instead of receiving synthesized ones.
 *
 * @tparam T the aggregate state type this event transforms
 */
abstract class Event[T] extends DynamicData {

  /** Timestamp captured when the event instance is constructed. */
  val eventDate: Date = new Date()

  /** Renders the event as its simple class name followed by its payload fields. */
  override def toString: String = s"${this.getClass.getSimpleName}${super.toString}"

  /**
   * Applies this event to an aggregate state.
   *
   * @param aggregateId identifier of the aggregate the event belongs to
   * @param t           the state to apply the event to
   * @return the state after the event has been applied
   */
  def applyTo(aggregateId: String, t: T): T

  // Delegating to the superclass keeps equality from being replaced by the
  // structural equals a case-class subclass would otherwise synthesize.
  override def equals(obj: scala.Any): Boolean = super.equals(obj)

  // Must be overridden together with equals: otherwise a case-class subclass
  // still gets a synthesized structural hashCode, which is the same value for
  // every instance of a parameterless event class and therefore collides in
  // hash-based collections.
  override def hashCode(): Int = super.hashCode()
}
/**
 * Storage abstraction for the events of aggregates whose state type is `T`.
 *
 * @tparam T the aggregate state type the stored events apply to
 */
trait EventStore[T] {

  /**
   * Records `event` for the aggregate identified by `aggregateId`.
   *
   * The result type is spelled out explicitly; procedure syntax (a `def`
   * without a result type) is deprecated in Scala 2.13 and removed in Scala 3.
   */
  def add(aggregateId: String, event: Event[T]): Unit

  /** Returns the events recorded for the aggregate identified by `aggregateId`. */
  def get(aggregateId: String): Seq[Event[T]]
}
/**
 * [[EventStore]] implementation that keeps all events in a mutable in-memory multi-map.
 *
 * @tparam T the aggregate state type the stored events apply to
 */
class InMemoryEventStore[T] extends EventStore[T] {

  /**
   * Events grouped by aggregate id.
   *
   * Each per-aggregate set is a `LinkedHashSet` so that iteration follows
   * insertion order. With the default hash set the insertion order is lost, and
   * events created within the same millisecond (equal `eventDate`) could be
   * returned, and therefore replayed, in an arbitrary order.
   */
  val events: collection.mutable.HashMap[String, collection.mutable.Set[Event[T]]]
    with collection.mutable.MultiMap[String, Event[T]] =
    new collection.mutable.HashMap[String, collection.mutable.Set[Event[T]]]
      with collection.mutable.MultiMap[String, Event[T]] {
      override protected def makeSet: collection.mutable.Set[Event[T]] =
        new collection.mutable.LinkedHashSet[Event[T]]
    }

  /** Appends `event` to the events recorded for `aggregateId`. */
  override def add(aggregateId: String, event: Event[T]): Unit =
    events.addBinding(aggregateId, event)

  /**
   * Returns the events of `aggregateId` ordered by `eventDate`; because the
   * sort is stable, events with equal dates keep their insertion order.
   * An aggregate with no recorded events yields an empty sequence instead of
   * throwing `NoSuchElementException`.
   */
  override def get(aggregateId: String): Seq[Event[T]] =
    events
      .get(aggregateId)
      .fold(Seq.empty[Event[T]])(_.toSeq.sortBy(_.eventDate))
}
/**
 * Event recording the creation of a bank account.
 *
 * Reads the dynamic payload fields `id` and `owner` from this event.
 */
case class BankAccountCreated() extends Event[BankAccount] {

  /**
   * Produces an account initialised from this event.
   *
   * The incoming `account` is copied rather than mutated in place, matching the
   * other bank-account events in this file; the returned account carries the
   * same fields the in-place version produced.
   *
   * @param aggregateId identifier of the aggregate (not used by this event)
   * @param account     the state to build upon; left untouched
   * @return a new account with balance `0`, status `"ACTIVE"` and this event's `id` and `owner`
   */
  def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
    val newAccount = BankAccount()
    account copyTo newAccount
    newAccount.balance = 0
    newAccount.id = this.id
    newAccount.status = "ACTIVE"
    newAccount.owner = this.owner
    newAccount
  }
}
/**
 * Event recording a deposit; the deposited amount is read from the dynamic
 * payload field `amount`.
 */
case class DepositPerformed() extends Event[BankAccount] {

  /**
   * Returns a copy of `account` whose `balance` is increased by this event's `amount`.
   *
   * @param aggregateId identifier of the aggregate (not used by this event)
   * @param account     the state before the deposit
   */
  def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
    val credited = BankAccount()
    account.copyTo(credited)
    val previousBalance = account.balance[Int]
    val depositAmount = this.amount[Int]
    credited.balance = previousBalance + depositAmount
    credited
  }
}
/**
 * Event recording a change of account owner; the new owner is read from the
 * dynamic payload field `newOwner`.
 */
case class OwnerChanged() extends Event[BankAccount] {

  /**
   * Returns a copy of `account` whose `owner` is replaced by this event's `newOwner`.
   *
   * @param aggregateId identifier of the aggregate (not used by this event)
   * @param account     the state before the change
   */
  def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
    val reassigned = BankAccount()
    account.copyTo(reassigned)
    reassigned.owner = this.newOwner
    reassigned
  }
}
/**
 * Event recording a withdrawal; the withdrawn amount is read from the dynamic
 * payload field `amount`.
 */
case class WithdrawalPerformed() extends Event[BankAccount] {

  /**
   * Returns a copy of `account` whose `balance` is decreased by this event's `amount`.
   *
   * @param aggregateId identifier of the aggregate (not used by this event)
   * @param account     the state before the withdrawal
   */
  def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
    val debited = BankAccount()
    account.copyTo(debited)
    val previousBalance = account.balance[Int]
    val withdrawnAmount = this.amount[Int]
    debited.balance = previousBalance - withdrawnAmount
    debited
  }
}
/**
 * Event recording the closing of an account; the reason is read from the
 * dynamic payload field `closeReason`.
 */
case class BankAccountClosed() extends Event[BankAccount] {

  /**
   * Returns a copy of `account` carrying this event's `closeReason` and the
   * status `"CLOSED"`.
   *
   * @param aggregateId identifier of the aggregate (not used by this event)
   * @param account     the state before closing
   */
  def applyTo(aggregateId: String, account: BankAccount): BankAccount = {
    val closed = BankAccount()
    account.copyTo(closed)
    closed.closeReason = this.closeReason
    closed.status = "CLOSED"
    closed
  }
}
/** Companion offering a convenience factory for [[BankAccount]]. */
object BankAccount {

  /**
   * Builds an account by applying a `BankAccountCreated` event, populated with
   * `id` and `owner`, to an empty account.
   *
   * @param aggregateId identifier passed through to the event's `applyTo`
   * @param id          value stored in the event's `id` field
   * @param owner       value stored in the event's `owner` field
   */
  def apply(aggregateId: String, id: Int, owner: String): BankAccount = {
    val creation = BankAccountCreated()
    creation.id = id
    creation.owner = owner
    val blank = new BankAccount()
    creation.applyTo(aggregateId, blank)
  }
}
/**
 * A command turns a request into an event, stores that event and applies it.
 *
 * A command is itself a function from `(aggregateId, request, currentState)`
 * to the resulting state.
 *
 * @param eventStore store that receives every event the command produces
 * @tparam R the request type
 * @tparam T the aggregate state type
 */
abstract class Command[R, T](implicit val eventStore: EventStore[T])
    extends (((String, R, T)) => (T)) {

  /** Shape of [[execute]]: `(aggregateId, request) => currentState => event`. */
  type ExecutionProduce = (String, R) => (T) => Event[T]

  /** Builds the event describing what this command does. */
  def execute: ExecutionProduce

  /** Named accessors for the parts of the command's input tuple. */
  implicit class CommandTupleImplicit(val tuple: (String, R, T)) {
    lazy val aggregateId: String = tuple._1
    lazy val request: R = tuple._2
    lazy val actualState: T = tuple._3
  }

  /**
   * Runs the command: prints the command name, produces the event, adds it to
   * the event store and returns the state after applying it.
   */
  override def apply(parameters: (String, R, T)): T = {
    println(s"executing:${this.getClass.getSimpleName}")
    val targetId: String = parameters._1
    val currentState: T = parameters._3
    val produced: Event[T] = execute(targetId, parameters._2)(currentState)
    eventStore.add(targetId, produced)
    produced.applyTo(targetId, currentState)
  }
}
/**
 * Command input whose fields are read dynamically by name.
 *
 * @param d the initial field name -> value mapping handed to `DynamicData`
 */
case class Request(d: Map[String, Any])
    extends DynamicData(d)
/** Rebuilds aggregate state by replaying events. */
object EventPlayer {

  /**
   * Applies `events` in sequence order, starting from `initialObject`, and
   * returns the final state. An empty sequence yields `initialObject`.
   *
   * @param initialObject the state to start from
   * @param aggregateId   identifier passed to each event's `applyTo`
   * @param events        the events to replay, in the order they should be applied
   */
  def play[T](initialObject: T, aggregateId: String, events: Seq[Event[T]]): T =
    events.foldLeft(initialObject) { (current, next) =>
      next.applyTo(aggregateId, current)
    }
}
/**
 * Commands for bank accounts, plus curried function values that run them.
 *
 * Each command copies the relevant request fields into a freshly created event.
 */
object BankAccountCommands {

  /** Produces a `BankAccountCreated` event carrying the request's `id` and `owner`. */
  class CreateAccountCommand extends Command[Request, BankAccount] {
    override def execute: ExecutionProduce = (_, request) => _ => {
      val created = BankAccountCreated()
      created.id = request.id
      created.owner = request.owner
      created
    }
  }

  /** Produces a `DepositPerformed` event carrying the request's `amount`. */
  class DepositCommand extends Command[Request, BankAccount] {
    override def execute: ExecutionProduce = (_, request) => _ => {
      val deposited = DepositPerformed()
      deposited.amount = request.amount
      deposited
    }
  }

  /** Produces a `WithdrawalPerformed` event carrying the request's `amount`. */
  class WithdrawalCommand extends Command[Request, BankAccount] {
    override def execute: ExecutionProduce = (_, request) => _ => {
      val withdrawn = WithdrawalPerformed()
      withdrawn.amount = request.amount
      withdrawn
    }
  }

  /** Produces an `OwnerChanged` event carrying the request's `newOwner`. */
  class ChangeOwnerCommand extends Command[Request, BankAccount] {
    override def execute: ExecutionProduce = (_, request) => _ => {
      val ownerChanged = OwnerChanged()
      ownerChanged.newOwner = request.newOwner
      ownerChanged
    }
  }

  /** Produces a `BankAccountClosed` event whose `closeReason` is the request's `reason`. */
  class CloseCommand extends Command[Request, BankAccount] {
    override def execute: ExecutionProduce = (_, request) => _ => {
      val closed = BankAccountClosed()
      closed.closeReason = request.reason
      closed
    }
  }

  /** `(aggregateId, request) => currentState => newState`. */
  type ExecutionRequest = (String, Request) => (BankAccount) => BankAccount

  // A new command instance is created on every invocation of the inner function.
  val createAccount: ExecutionRequest =
    (aggregateId, request) => state => new CreateAccountCommand().apply((aggregateId, request, state))

  val withdrawal: ExecutionRequest =
    (aggregateId, request) => state => new WithdrawalCommand().apply((aggregateId, request, state))

  val deposit: ExecutionRequest =
    (aggregateId, request) => state => new DepositCommand().apply((aggregateId, request, state))

  val changeOwner: ExecutionRequest =
    (aggregateId, request) => state => new ChangeOwnerCommand().apply((aggregateId, request, state))

  val close: ExecutionRequest =
    (aggregateId, request) => state => new CloseCommand().apply((aggregateId, request, state))
}
/** Holds the implicit event store instance used for bank-account commands. */
object InMemoryEventStore {

  /** Single in-memory store for `BankAccount` events, exposed as an implicit. */
  implicit val eventStore: EventStore[BankAccount] =
    new InMemoryEventStore[BankAccount]()
}
/**
 * Bank account aggregate state; all of its fields are stored dynamically via
 * `DynamicData`.
 */
case class BankAccount()
    extends DynamicData
/**
 * Demo entry point: runs a chain of bank-account commands against a new
 * aggregate, then rebuilds the state by replaying the stored events and prints
 * both results.
 */
object Sample extends App {

  import BankAccountCommands._

  val aggregateId = UUID.randomUUID().toString

  // Composition of the five commands; nothing runs until `f` is applied below.
  val f =
    createAccount(aggregateId, Request(Map("owner" -> "John Doe", "id" -> 123)))
      .andThen(deposit(aggregateId, Request(Map("amount" -> 20))))
      // The key must be "newOwner": that is the field ChangeOwnerCommand reads.
      // The previous misspelling "newOwer" left the owner as null.
      .andThen(changeOwner(aggregateId, Request(Map("newOwner" -> "Jane Doe"))))
      .andThen(withdrawal(aggregateId, Request(Map("amount" -> 10))))
      .andThen(close(aggregateId, Request(Map("reason" -> "Unavailable address"))))

  // State obtained by executing the commands.
  val actualState = f(BankAccount())

  // State obtained by replaying the stored events onto an empty account.
  val events = eventStore.get(aggregateId)
  val playedState = EventPlayer.play(BankAccount(), aggregateId, events)

  println(s"actual state: $actualState")
  println(s"events: $events")
  println(s"played state: $playedState")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment