Skip to content

Instantly share code, notes, and snippets.

@j5ik2o
Created April 11, 2016 05:42
Show Gist options
  • Save j5ik2o/2a5459d1baf482954d949c4c7272b180 to your computer and use it in GitHub Desktop.
Save j5ik2o/2a5459d1baf482954d949c4c7272b180 to your computer and use it in GitHub Desktop.
/**
 * Shared vocabulary used by the actors in this file: the base FSM state and
 * data types, the domain-event marker, and the generic command outcomes.
 */
object Base {

  object Events {
    /** Marker trait for domain events; it declares no members. */
    trait Event
  }

  object Commands {
    /** Positive command outcome; note that it is also an [[Events.Event]]. */
    case class CommandSucceeded(id: UUID, requestId: UUID) extends Events.Event

    /** Negative command outcome carrying the cause; it is not an [[Events.Event]]. */
    case class CommandFailed(id: UUID, requestId: UUID, ex: Throwable)
  }

  /** Base type of the data carried alongside an FSM state. */
  trait Data

  /** Data value for "nothing yet". */
  case object Empty extends Data

  /** Base type of FSM states; sealed, so every state lives in this source file. */
  sealed trait State extends FSMState

  case object Started extends State {
    override def identifier: String = "Started"
  }

  case object Stopped extends State {
    override def identifier: String = "Stopped"
  }
}
/** Companion of [[Money]]. */
object Money {
  /**
   * Zero amount in the currency of the JVM default locale (`Locale.getDefault`),
   * so its currency differs between environments. Because `plus`/`minus` require
   * equal currencies, it can only be combined with amounts in that same currency.
   */
  val Zero = Money(Currency.getInstance(Locale.getDefault), 0)
}

/**
 * An amount of money in a single currency.
 *
 * @param currency currency the amount is expressed in
 * @param amount   signed quantity held as a `Long`
 */
case class Money(currency: Currency, amount: Long) {

  /**
   * Returns the sum of this amount and `money`.
   *
   * @throws IllegalArgumentException if the two currencies differ
   * @throws ArithmeticException      if the sum does not fit in a `Long`
   */
  def plus(money: Money): Money = {
    requireSameCurrency(money)
    // addExact traps overflow instead of silently wrapping to a wrong balance.
    copy(amount = Math.addExact(amount, money.amount))
  }

  /** Operator alias for [[plus]]. */
  def +(money: Money): Money = plus(money)

  /**
   * Returns this amount reduced by `money`; the result may be negative.
   *
   * @throws IllegalArgumentException if the two currencies differ
   * @throws ArithmeticException      if the difference does not fit in a `Long`
   */
  def minus(money: Money): Money = {
    requireSameCurrency(money)
    // subtractExact traps overflow instead of silently wrapping to a wrong balance.
    copy(amount = Math.subtractExact(amount, money.amount))
  }

  /** Operator alias for [[minus]]. */
  def -(money: Money): Money = minus(money)

  /** Rejects arithmetic across currencies, naming both sides in the failure message. */
  private def requireSameCurrency(other: Money): Unit =
    require(currency == other.currency, s"currency mismatch: $currency vs ${other.currency}")
}
/** Message protocol, state type and `Props` factory for the [[BankAccount]] actor. */
object BankAccount {

  /** Builds the `Props` that create a [[BankAccount]] actor for `id`. */
  def props(id: UUID): Props = Props(new BankAccount(id))

  /** Messages an account accepts. */
  object Commands {
    case class Increase(id: UUID, money: Money)
    case class Decrease(id: UUID, money: Money)
    case class GetBalanceRequest(id: UUID)
  }

  /** Messages an account emits. */
  object Events {
    case class Increased(id: UUID, money: Money) // extends Base.Events.Event
    case class Decreased(id: UUID, money: Money) // extends Base.Events.Event
    case class GetBalanceResponse(id: UUID, balance: Money)
  }

  /** Immutable account state; each operation returns an updated copy. */
  case class Context(id: UUID, balance: Money) extends Data {

    /** Copy of this context with `money` added to the balance. */
    def add(money: Money): Context = {
      val raised = balance.plus(money)
      copy(balance = raised)
    }

    /** Copy of this context with `money` subtracted from the balance. */
    def remove(money: Money): Context = {
      val lowered = balance.minus(money)
      copy(balance = lowered)
    }
  }
}
/**
 * Persistent actor that keeps one account balance.
 *
 * `Increase` / `Decrease` commands are persisted as `Increased` / `Decreased`
 * events. Once an event is stored the balance is updated, the sender receives a
 * `CommandSucceeded` whose `requestId` is the stored event's id, and the event
 * is published on the system event stream. The `id` field of incoming commands
 * is ignored.
 */
class BankAccount(id: UUID) extends PersistentActor with ActorLogging {

  override def persistenceId: String = id.toString

  // Current account state: rebuilt from the journal during recovery and
  // replaced after every successfully persisted event.
  var state = Context(UUID.randomUUID(), Money.Zero)

  /** Replays journaled events onto `state`. */
  override def receiveRecover: Receive = {
    case Increased(_, amount) => credit(amount)
    case Decreased(_, amount) => debit(amount)
  }

  /** Answers balance queries and persists balance changes. */
  override def receiveCommand: Receive = {
    case GetBalanceRequest(_) =>
      val reply = GetBalanceResponse(UUID.randomUUID(), state.balance)
      sender() ! reply

    case Increase(_, amount) =>
      persist(Increased(UUID.randomUUID(), amount)) { stored =>
        credit(stored.money)
        log.debug(s"Increase = $state")
        sender() ! CommandSucceeded(UUID.randomUUID(), stored.id)
        context.system.eventStream.publish(stored)
      }

    case Decrease(_, amount) =>
      persist(Decreased(UUID.randomUUID(), amount)) { stored =>
        debit(stored.money)
        log.debug(s"Decrease = $state")
        sender() ! CommandSucceeded(UUID.randomUUID(), stored.id)
        context.system.eventStream.publish(stored)
      }
  }

  /** Replaces `state` with a copy whose balance is raised by `amount`. */
  private def credit(amount: Money): Unit = {
    state = state.add(amount)
  }

  /** Replaces `state` with a copy whose balance is lowered by `amount`. */
  private def debit(amount: Money): Unit = {
    state = state.remove(amount)
  }
}
/** States, data, message protocol and `Props` factory for [[TransferDomainService]]. */
object TransferDomainService {

  /** Builds the `Props` that create a [[TransferDomainService]] actor for `id`. */
  def props(id: UUID): Props = Props(new TransferDomainService(id))

  object Commands {
    /** Request to move `money` from the `from` actor to the `to` actor. */
    case class Transfer(id: UUID, money: Money, from: ActorRef, to: ActorRef)
  }

  object Events {
    case class Transferring(id: UUID, money: Money, from: ActorRef, to: ActorRef) extends Base.Events.Event
    case class Transferred(id: UUID, money: Money, from: ActorRef, to: ActorRef) extends Base.Events.Event
  }

  /** FSM data describing one transfer. */
  case class TransferData(id: UUID, money: Money, from: ActorRef, to: ActorRef) extends Base.Data

  // Additional FSM states, on top of those declared in Base.
  case object ToIncreased extends Base.State {
    override def identifier: String = "ToIncreased"
  }

  case object FromDecreased extends Base.State {
    override def identifier: String = "FromDecreased"
  }
}
/**
 * Persistent FSM that moves money between two account actors in two steps.
 *
 * State flow:
 * {{{
 * Stopped --Transfer--> Started --Decreased--> FromDecreased --Increased--> Stopped
 * }}}
 *
 * `Decreased` / `Increased` are received through the system event stream, to
 * which this actor subscribes only while it is waiting for the respective event.
 */
class TransferDomainService(id: UUID)
  extends PersistentFSM[Base.State, Data, Base.Events.Event] {

  override def persistenceId: String = id.toString

  override def domainEventClassTag: ClassTag[Base.Events.Event] = classTag[Base.Events.Event]

  /**
   * Derives the FSM data from a persisted event; the previous data is ignored.
   * Only `Transferring` and `Transferred` are handled, so any other event
   * reaching this method fails with a `MatchError`.
   */
  override def applyEvent(domainEvent: Base.Events.Event, currentData: Data): Data = domainEvent match {
    case Transferred(eventId, money, from, to) =>
      TransferData(eventId, money, from, to)
    case Transferring(eventId, money, from, to) =>
      TransferData(eventId, money, from, to)
  }

  startWith(Stopped, Empty)

  when(Stopped) {
    case Event(Transfer(requestId, money, from, to), _) =>
      // Subscribe before the debit is requested so the resulting event is not missed.
      context.system.eventStream.subscribe(self, classOf[Decreased])
      val ev = Transferring(UUID.randomUUID(), money, from, to)
      goto(Started) applying ev andThen {
        case _: TransferData =>
          from ! Decrease(UUID.randomUUID(), money)
          sender() ! CommandSucceeded(UUID.randomUUID(), requestId)
          context.system.eventStream.publish(ev)
      }
  }

  when(Started) {
    // NOTE(review): this matches every Decreased seen on the event stream; the
    // pattern does not check which account produced it.
    case Event(Decreased(_, _), _) =>
      goto(FromDecreased) andThen {
        case TransferData(_, money, _, to) =>
          context.system.eventStream.unsubscribe(self, classOf[Decreased])
          context.system.eventStream.subscribe(self, classOf[Increased])
          to ! Increase(UUID.randomUUID(), money)
      }
    // Acknowledgements are accepted and otherwise ignored.
    case Event(CommandSucceeded(_, _), _) =>
      stay()
  }

  when(FromDecreased) {
    case Event(Increased(_, _), TransferData(_, money, from, to)) =>
      // Build the event once so the persisted event and the published one are
      // the same value (same id), as in the Stopped handler.
      val ev = Transferred(UUID.randomUUID(), money, from, to)
      goto(Stopped) applying ev andThen {
        case _: TransferData =>
          context.system.eventStream.unsubscribe(self, classOf[Increased])
          context.system.eventStream.publish(ev)
      }
    // Acknowledgements are accepted and otherwise ignored.
    case Event(CommandSucceeded(_, _), _) =>
      stay()
  }

  initialize()
}
/**
 * Demo entry point: creates two accounts holding 10 JPY each, asks a
 * [[TransferDomainService]] to move 10 JPY from `from` to `to`, prints both
 * balances and shuts the actor system down.
 */
object Main extends App {
  val system = ActorSystem("PM", ConfigFactory.parseString(
    """
      |akka {
      | loglevel = DEBUG
      | persistence.journal.plugin = "akka.persistence.journal.inmem"
      |}
      |""".stripMargin
  ))
  implicit val timeout: Timeout = Timeout(5.seconds)
  import system.dispatcher

  val JPY = Currency.getInstance("JPY")

  val to = system.actorOf(Props(new BankAccount(UUID.randomUUID())))
  to ! Increase(UUID.randomUUID(), Money(JPY, 10))

  val from = system.actorOf(Props(new BankAccount(UUID.randomUUID())))
  from ! Increase(UUID.randomUUID(), Money(JPY, 10))

  val actor = system.actorOf(Props(new TransferDomainService(UUID.randomUUID())))
  // Named arguments keep the debit and credit sides from being swapped.
  actor ! Transfer(UUID.randomUUID(), Money(JPY, 10), from = from, to = to)

  try {
    // NOTE(review): the transfer above is fire-and-forget, so these queries may
    // be answered before it has completed and can show intermediate balances.
    println(Await.result((to ? GetBalanceRequest(UUID.randomUUID())).mapTo[GetBalanceResponse], Duration.Inf))
    println(Await.result((from ? GetBalanceRequest(UUID.randomUUID())).mapTo[GetBalanceResponse], Duration.Inf))
  } finally {
    // Without this the actor system keeps running and the JVM does not exit.
    system.terminate()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment