Skip to content

Instantly share code, notes, and snippets.

@patriknw
Last active May 23, 2018 09:51
Show Gist options
  • Save patriknw/84af016a991aef8576db47593da0cf96 to your computer and use it in GitHub Desktop.
Save patriknw/84af016a991aef8576db47593da0cf96 to your computer and use it in GitHub Desktop.
Alternative implementation of PR 25051
/**
 * Bank account example where the state is `Option[Account]`: `None` means the
 * account has not been created yet, `Some(OpenedAccount)` is an account that
 * accepts deposits and withdrawals, and `Some(ClosedAccount)` accepts nothing.
 */
object AccountExample1 {

  /** Commands understood by the account. */
  sealed trait AccountCommand
  case object CreateAccount extends AccountCommand
  case class Deposit(amount: Double) extends AccountCommand
  case class Withdraw(amount: Double) extends AccountCommand
  case object CloseAccount extends AccountCommand

  /** Events persisted when a command is accepted. */
  sealed trait AccountEvent
  case object AccountCreated extends AccountEvent
  case class Deposited(amount: Double) extends AccountEvent
  case class Withdrawn(amount: Double) extends AccountEvent
  case object AccountClosed extends AccountEvent

  /** State of an account that exists; a not yet created account is `None`. */
  sealed trait Account
  case class OpenedAccount(balance: Double) extends Account
  case object ClosedAccount extends Account

  // No account yet: only CreateAccount is accepted.
  private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] =
    CommandHandler.command {
      case CreateAccount ⇒ Effect.persist(AccountCreated)
      case _             ⇒ Effect.unhandled
    }

  // Opened account. `commandHandler` below only routes here when the state is
  // Some(OpenedAccount), which is why the outer pattern covers just that shape.
  private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = {
    case (_, Some(acc: OpenedAccount), cmd) ⇒ cmd match {
      case Deposit(amount) ⇒ Effect.persist(Deposited(amount))
      case Withdraw(amount) ⇒
        if ((acc.balance - amount) < 0.0)
          Effect.unhandled // TODO replies are missing in this example
        else {
          Effect
            .persist(Withdrawn(amount))
            .andThen {
              // eventHandler keeps the account opened after Withdrawn
              case Some(OpenedAccount(balance)) ⇒
                // do some side-effect using balance
                println(balance)
            }
        }
      case CloseAccount if acc.balance == 0.0 ⇒
        Effect.persist(AccountClosed)
      case CloseAccount ⇒
        // closing is only allowed when the balance is zero
        Effect.unhandled
      case CreateAccount ⇒
        // The account already exists. Without this case the match was not
        // exhaustive and CreateAccount raised a scala.MatchError.
        Effect.unhandled
    }
  }

  // Closed account: every command is unhandled.
  private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] =
    CommandHandler.command(_ ⇒ Effect.unhandled)

  // Selects the handler that corresponds to the current state.
  private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] =
    CommandHandler.byState {
      case None                   ⇒ initialHandler
      case Some(OpenedAccount(_)) ⇒ openedAccountHandler
      case Some(ClosedAccount)    ⇒ closedHandler
    }

  // Applies an event to the current state. Combinations that the command
  // handlers never produce are reported with a RuntimeException.
  private val eventHandler: (Option[Account], AccountEvent) ⇒ Option[Account] = {
    case (None, AccountCreated) ⇒ Some(OpenedAccount(0.0))
    case (Some(acc @ OpenedAccount(_)), Deposited(amount)) ⇒
      Some(acc.copy(balance = acc.balance + amount))
    case (Some(acc @ OpenedAccount(_)), Withdrawn(amount)) ⇒
      Some(acc.copy(balance = acc.balance - amount))
    case (Some(OpenedAccount(_)), AccountClosed) ⇒
      Some(ClosedAccount)
    case (state, event) ⇒ throw new RuntimeException(s"unexpected event [$event] in state [$state]")
  }

  /**
   * Creates the persistent behavior of one account.
   *
   * @param accountNumber used as the persistence id
   */
  def behavior(accountNumber: String): Behavior[AccountCommand] =
    PersistentBehaviors.receive[AccountCommand, AccountEvent, Option[Account]](
      persistenceId = accountNumber,
      initialState = None,
      commandHandler = commandHandler,
      eventHandler = eventHandler
    )
}
/**
 * Bank account example where the state is a plain `Account` ADT: the not yet
 * created account is the explicit `EmptyAccount` value instead of `None`.
 */
object AccountExample2 {

  /** Commands understood by the account. */
  sealed trait AccountCommand
  case object CreateAccount extends AccountCommand
  case class Deposit(amount: Double) extends AccountCommand
  case class Withdraw(amount: Double) extends AccountCommand
  case object CloseAccount extends AccountCommand

  /** Events persisted when a command is accepted. */
  sealed trait AccountEvent
  case object AccountCreated extends AccountEvent
  case class Deposited(amount: Double) extends AccountEvent
  case class Withdrawn(amount: Double) extends AccountEvent
  case object AccountClosed extends AccountEvent

  /** State of the account; `EmptyAccount` is the initial state. */
  sealed trait Account
  case object EmptyAccount extends Account
  case class OpenedAccount(balance: Double) extends Account
  case object ClosedAccount extends Account

  // No account yet: only CreateAccount is accepted.
  private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Account] =
    CommandHandler.command {
      case CreateAccount ⇒ Effect.persist(AccountCreated)
      case _             ⇒ Effect.unhandled
    }

  // Opened account. `commandHandler` below only routes here when the state is
  // an OpenedAccount, which is why the outer pattern covers just that shape.
  private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Account] = {
    case (_, acc: OpenedAccount, cmd) ⇒ cmd match {
      case Deposit(amount) ⇒ Effect.persist(Deposited(amount))
      case Withdraw(amount) ⇒
        if ((acc.balance - amount) < 0.0)
          Effect.unhandled // TODO replies are missing in this example
        else {
          Effect
            .persist(Withdrawn(amount))
            .andThen {
              // eventHandler keeps the account opened after Withdrawn
              case OpenedAccount(balance) ⇒
                // do some side-effect using balance
                println(balance)
            }
        }
      case CloseAccount if acc.balance == 0.0 ⇒
        Effect.persist(AccountClosed)
      case CloseAccount ⇒
        // closing is only allowed when the balance is zero
        Effect.unhandled
      case CreateAccount ⇒
        // The account already exists. Without this case the match was not
        // exhaustive and CreateAccount raised a scala.MatchError.
        Effect.unhandled
    }
  }

  // Closed account: every command is unhandled.
  private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Account] =
    CommandHandler.command(_ ⇒ Effect.unhandled)

  // Selects the handler that corresponds to the current state.
  private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Account] =
    CommandHandler.byState {
      case EmptyAccount     ⇒ initialHandler
      case OpenedAccount(_) ⇒ openedAccountHandler
      case ClosedAccount    ⇒ closedHandler
    }

  // Applies an event to the current state. Combinations that the command
  // handlers never produce are reported with a RuntimeException.
  private val eventHandler: (Account, AccountEvent) ⇒ Account = {
    case (EmptyAccount, AccountCreated) ⇒ OpenedAccount(0.0)
    case (acc @ OpenedAccount(_), Deposited(amount)) ⇒
      acc.copy(balance = acc.balance + amount)
    case (acc @ OpenedAccount(_), Withdrawn(amount)) ⇒
      acc.copy(balance = acc.balance - amount)
    case (OpenedAccount(_), AccountClosed) ⇒
      ClosedAccount
    case (state, event) ⇒ throw new RuntimeException(s"unexpected event [$event] in state [$state]")
  }

  /**
   * Creates the persistent behavior of one account.
   *
   * @param accountNumber used as the persistence id
   */
  def behavior(accountNumber: String): Behavior[AccountCommand] =
    PersistentBehaviors.receive[AccountCommand, AccountEvent, Account](
      persistenceId = accountNumber,
      initialState = EmptyAccount,
      commandHandler = commandHandler,
      eventHandler = eventHandler
    )
}
package akka.docs.akka.persistence.typed
import akka.Done
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
import akka.persistence.typed.internal.SideEffect
object InDepthPersistentBehaviorSpec3 {
/**
 * Base class that lets an entity define its handlers against a plain `State`
 * while the handlers it exposes (`initialState`, `commandHandler`,
 * `eventHandler`) work on `Option[State]`, with `None` as the initial state.
 *
 * Subclasses provide one command/event handler pair for the initial (`None`)
 * state and one pair for the active (`Some`) state.
 *
 * Known limitation (see the FIXMEs below): the effect conversion only matches
 * `SideEffect`. Any other effect returned by a handler is not covered by the
 * match and fails with a `scala.MatchError`.
 */
abstract class OptionalInitialState[Command, Event, State] {
// Command handler used while there is no state yet; it receives no state argument.
protected def initialCommandHandler: (ActorContext[Command], Command) ⇒ Effect[Event, State]
// Command handler used once a state exists.
protected def activeCommandHandler: CommandHandler[Command, Event, State]
// Creates the first state from an event applied when there is no state yet.
protected def initialEventHandler: Event ⇒ State
// Applies an event to an existing state.
protected def activeEventHandler: (State, Event) ⇒ State
/** The initial state is always `None`. */
final val initialState: Option[State] = None
/**
 * Command handler over `Option[State]`: `Some` is routed to the converted
 * active handler, `None` to the converted initial handler.
 */
final def commandHandler: CommandHandler[Command, Event, Option[State]] = {
// capture the functions once, in case they are defined as defs
val initial = initialCommandHandler
val active = activeCommandHandler
CommandHandler.byState {
case Some(_) ⇒ convertEffect(active)
case None ⇒ convertInitialEffect(initial)
}
}
// Wraps the active handler: unwraps the Some state, calls the handler and
// converts the resulting effect. A None state here throws IllegalStateException.
private def convertEffect(handler: CommandHandler[Command, Event, State]): CommandHandler[Command, Event, Option[State]] = {
(ctx, optState, cmd) ⇒
optState match {
case Some(state) ⇒
val effect = handler(ctx, state, cmd)
convertEffect(effect)
case None ⇒
throw new IllegalStateException("Undefined state")
}
}
// Wraps the initial handler: the (optional) state argument is ignored and the
// resulting effect is converted.
private def convertInitialEffect(handler: (ActorContext[Command], Command) ⇒ Effect[Event, State]): CommandHandler[Command, Event, Option[State]] = {
(ctx, _, cmd) ⇒
val effect = handler(ctx, cmd)
convertEffect(effect)
}
// Converts an effect over State into an effect over Option[State].
// Only SideEffect is matched; the match has no other case, so any other
// effect raises scala.MatchError.
private def convertEffect(effect: Effect[Event, State]): Effect[Event, Option[State]] = {
// FIXME traverse all chained effects and build up new that's replacing the SideEffect
effect match {
case SideEffect(callback) ⇒ SideEffect[Event, Option[State]] {
// run the original callback only when a state is present
case Some(state) ⇒ callback(state)
// with no state the callback is skipped and a message is printed instead
case None ⇒ println(s"# ignoring side effect from initialCommandHandler when no state") // FIXME
}
}
}
/**
 * Event handler over `Option[State]`: `None` uses `initialEventHandler`,
 * `Some` uses `activeEventHandler`. The result is always a `Some`.
 */
final def eventHandler(state: Option[State], event: Event): Option[State] =
state match {
case None ⇒ Some(initialEventHandler(event))
case Some(s) ⇒ Some(activeEventHandler(s, event))
}
}
//#event
/** Events persisted by `BlogPostEntity`. */
sealed trait BlogEvent extends Serializable
/** A post was added; carries the full content of the new post. */
final case class PostAdded(
postId: String,
content: PostContent) extends BlogEvent
/** The body of the post was replaced by `newBody`. */
final case class BodyChanged(
postId: String,
newBody: String) extends BlogEvent
/** The post was published. */
final case class Published(postId: String) extends BlogEvent
//#event
//#state
object BlogState {
  /** The "no post yet" state. */
  val empty: Option[BlogState] = Option.empty[BlogState]
}

/**
 * State of a blog post.
 *
 * @param content   current content of the post
 * @param published whether the post has been published
 */
final case class BlogState(content: PostContent, published: Boolean) {

  /** Returns a state with the content replaced and `published` kept as is. */
  def withContent(newContent: PostContent): BlogState =
    BlogState(newContent, published)

  /** Id of the post, taken from its content. */
  def postId: String = content.postId
}
//#state
//#commands
/** Commands handled by `BlogPostEntity`. */
sealed trait BlogCommand extends Serializable
/** Adds the post; `replyTo` receives `AddPostDone` after the event is persisted. */
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
/** Reply to `AddPost`. */
final case class AddPostDone(postId: String)
/** Asks for the current content of the post, sent to `replyTo`. */
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand
/** Replaces the body of the post; `replyTo` receives `Done` after the event is persisted. */
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand
/** Publishes the post; `replyTo` receives `Done` after the event is persisted. */
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand
/** Handled with `Effect.stop` in both the initial and the active state. */
final case object PassivatePost extends BlogCommand
/** Content of a post. */
final case class PostContent(postId: String, title: String, body: String)
//#commands
/**
 * Blog post entity built on `OptionalInitialState`: the initial handlers cover
 * the time before a post has been added, the active handlers cover an existing
 * post.
 *
 * @param ctx context of the actor hosting the entity (not used by the handlers)
 */
class BlogPostEntity(ctx: ActorContext[BlogCommand]) extends OptionalInitialState[BlogCommand, BlogEvent, BlogState] {

  // No post yet: AddPost creates it, PassivatePost stops, the rest is unhandled.
  override protected def initialCommandHandler: (ActorContext[BlogCommand], BlogCommand) ⇒ Effect[BlogEvent, BlogState] = { (_, cmd) ⇒
    cmd match {
      case AddPost(content, replyTo) ⇒
        val evt = PostAdded(content.postId, content)
        Effect.persist(evt).andThen { _ ⇒
          // After persist is done additional side effects can be performed
          replyTo ! AddPostDone(content.postId)
        }
      case PassivatePost ⇒
        Effect.stop
      case _ ⇒
        Effect.unhandled
    }
  }

  // Existing post: every BlogCommand is covered explicitly.
  override protected def activeCommandHandler: CommandHandler[BlogCommand, BlogEvent, BlogState] = { (_, state, cmd) ⇒
    cmd match {
      case ChangeBody(newBody, replyTo) ⇒
        val evt = BodyChanged(state.postId, newBody)
        Effect.persist(evt).andThen { _ ⇒
          replyTo ! Done
        }
      case Publish(replyTo) ⇒
        Effect.persist(Published(state.postId)).andThen { _ ⇒
          println(s"Blog post ${state.postId} was published")
          replyTo ! Done
        }
      case GetPost(replyTo) ⇒
        // read-only: reply directly, nothing to persist
        replyTo ! state.content
        Effect.none
      case _: AddPost ⇒
        // the post already exists
        Effect.unhandled
      case PassivatePost ⇒
        Effect.stop
    }
  }

  // Only PostAdded can create the first state. Other events are rejected with a
  // descriptive exception instead of an opaque scala.MatchError.
  override protected def initialEventHandler: BlogEvent ⇒ BlogState = {
    case PostAdded(_, content) ⇒
      BlogState(content, published = false)
    case event @ (_: BodyChanged | _: Published) ⇒
      throw new IllegalStateException(s"unexpected event [$event] before the post was added")
  }

  // Applies an event to an existing post. PostAdded is rejected with a
  // descriptive exception instead of an opaque scala.MatchError.
  override protected def activeEventHandler: (BlogState, BlogEvent) ⇒ BlogState = { (state, event) ⇒
    event match {
      case BodyChanged(_, newBody) ⇒
        state.withContent(state.content.copy(body = newBody))
      case Published(_) ⇒
        state.copy(published = true)
      case _: PostAdded ⇒
        throw new IllegalStateException(s"unexpected event [$event] in state [$state]")
    }
  }
}
//#behavior
/**
 * Creates the persistent behavior of one blog post.
 *
 * @param entityId id of the post; the persistence id is this value prefixed with "Blog-"
 */
def behavior(entityId: String): Behavior[BlogCommand] =
  Behaviors.setup { context ⇒
    // one entity instance per actor; it supplies the initial state and both handlers
    val blogPost = new BlogPostEntity(context)
    PersistentBehaviors.receive[BlogCommand, BlogEvent, Option[BlogState]](
      persistenceId = s"Blog-$entityId",
      initialState = blogPost.initialState,
      commandHandler = blogPost.commandHandler,
      eventHandler = blogPost.eventHandler)
  }
//#behavior
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment