Last active
May 23, 2018 09:51
-
-
Save patriknw/84af016a991aef8576db47593da0cf96 to your computer and use it in GitHub Desktop.
Alternative implementation of PR 25051
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object AccountExample1 { | |
sealed trait AccountCommand | |
case object CreateAccount extends AccountCommand | |
case class Deposit(amount: Double) extends AccountCommand | |
case class Withdraw(amount: Double) extends AccountCommand | |
case object CloseAccount extends AccountCommand | |
sealed trait AccountEvent | |
case object AccountCreated extends AccountEvent | |
case class Deposited(amount: Double) extends AccountEvent | |
case class Withdrawn(amount: Double) extends AccountEvent | |
case object AccountClosed extends AccountEvent | |
sealed trait Account | |
case class OpenedAccount(balance: Double) extends Account | |
case object ClosedAccount extends Account | |
private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = | |
CommandHandler.command { | |
case CreateAccount ⇒ Effect.persist(AccountCreated) | |
case _ ⇒ Effect.unhandled | |
} | |
private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = { | |
case (ctx, Some(acc: OpenedAccount), cmd) ⇒ cmd match { | |
case Deposit(amount) ⇒ Effect.persist(Deposited(amount)) | |
case Withdraw(amount) ⇒ | |
if ((acc.balance - amount) < 0.0) | |
Effect.unhandled // TODO replies are missing in this example | |
else { | |
Effect | |
.persist(Withdrawn(amount)) | |
.andThen { | |
case Some(OpenedAccount(balance)) ⇒ | |
// do some side-effect using balance | |
println(balance) | |
} | |
} | |
case CloseAccount if acc.balance == 0.0 ⇒ | |
Effect.persist(AccountClosed) | |
case CloseAccount ⇒ | |
Effect.unhandled | |
} | |
} | |
private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = | |
CommandHandler.command(_ ⇒ Effect.unhandled) | |
private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = | |
CommandHandler.byState { | |
case None ⇒ initialHandler | |
case Some(OpenedAccount(_)) ⇒ openedAccountHandler | |
case Some(ClosedAccount) ⇒ closedHandler | |
} | |
private val eventHandler: (Option[Account], AccountEvent) ⇒ Option[Account] = { | |
case (None, AccountCreated) ⇒ Some(OpenedAccount(0.0)) | |
case (Some(acc @ OpenedAccount(_)), Deposited(amount)) ⇒ | |
Some(acc.copy(balance = acc.balance + amount)) | |
case (Some(acc @ OpenedAccount(_)), Withdrawn(amount)) ⇒ | |
Some(acc.copy(balance = acc.balance - amount)) | |
case (Some(OpenedAccount(_)), AccountClosed) ⇒ | |
Some(ClosedAccount) | |
case (state, event) ⇒ throw new RuntimeException(s"unexpected event [$event] in state [$state]") | |
} | |
def behavior(accountNumber: String): Behavior[AccountCommand] = | |
PersistentBehaviors.receive[AccountCommand, AccountEvent, Option[Account]]( | |
persistenceId = accountNumber, | |
initialState = None, | |
commandHandler = commandHandler, | |
eventHandler = eventHandler | |
) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object AccountExample2 { | |
sealed trait AccountCommand | |
case object CreateAccount extends AccountCommand | |
case class Deposit(amount: Double) extends AccountCommand | |
case class Withdraw(amount: Double) extends AccountCommand | |
case object CloseAccount extends AccountCommand | |
sealed trait AccountEvent | |
case object AccountCreated extends AccountEvent | |
case class Deposited(amount: Double) extends AccountEvent | |
case class Withdrawn(amount: Double) extends AccountEvent | |
case object AccountClosed extends AccountEvent | |
sealed trait Account | |
case object EmptyAccount extends Account | |
case class OpenedAccount(balance: Double) extends Account | |
case object ClosedAccount extends Account | |
private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Account] = | |
CommandHandler.command { | |
case CreateAccount ⇒ Effect.persist(AccountCreated) | |
case _ ⇒ Effect.unhandled | |
} | |
private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Account] = { | |
case (ctx, acc: OpenedAccount, cmd) ⇒ cmd match { | |
case Deposit(amount) ⇒ Effect.persist(Deposited(amount)) | |
case Withdraw(amount) ⇒ | |
if ((acc.balance - amount) < 0.0) | |
Effect.unhandled // TODO replies are missing in this example | |
else { | |
Effect | |
.persist(Withdrawn(amount)) | |
.andThen { | |
case OpenedAccount(balance) ⇒ | |
// do some side-effect using balance | |
println(balance) | |
} | |
} | |
case CloseAccount if acc.balance == 0.0 ⇒ | |
Effect.persist(AccountClosed) | |
case CloseAccount ⇒ | |
Effect.unhandled | |
} | |
} | |
private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Account] = | |
CommandHandler.command(_ ⇒ Effect.unhandled) | |
private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Account] = | |
CommandHandler.byState { | |
case EmptyAccount ⇒ initialHandler | |
case OpenedAccount(_) ⇒ openedAccountHandler | |
case ClosedAccount ⇒ closedHandler | |
} | |
private val eventHandler: (Account, AccountEvent) ⇒ Account = { | |
case (EmptyAccount, AccountCreated) ⇒ OpenedAccount(0.0) | |
case (acc @ OpenedAccount(_), Deposited(amount)) ⇒ | |
acc.copy(balance = acc.balance + amount) | |
case (acc @ OpenedAccount(_), Withdrawn(amount)) ⇒ | |
acc.copy(balance = acc.balance - amount) | |
case (OpenedAccount(_), AccountClosed) ⇒ | |
ClosedAccount | |
case (state, event) ⇒ throw new RuntimeException(s"unexpected event [$event] in state [$state]") | |
} | |
def behavior(accountNumber: String): Behavior[AccountCommand] = | |
PersistentBehaviors.receive[AccountCommand, AccountEvent, Account]( | |
persistenceId = accountNumber, | |
initialState = EmptyAccount, | |
commandHandler = commandHandler, | |
eventHandler = eventHandler | |
) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.docs.akka.persistence.typed | |
import akka.Done | |
import akka.actor.typed.scaladsl.ActorContext | |
import akka.actor.typed.ActorRef | |
import akka.actor.typed.Behavior | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.persistence.typed.scaladsl.Effect | |
import akka.persistence.typed.scaladsl.PersistentBehaviors | |
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler | |
import akka.persistence.typed.internal.SideEffect | |
object InDepthPersistentBehaviorSpec3 { | |
abstract class OptionalInitialState[Command, Event, State] { | |
protected def initialCommandHandler: (ActorContext[Command], Command) ⇒ Effect[Event, State] | |
protected def activeCommandHandler: CommandHandler[Command, Event, State] | |
protected def initialEventHandler: Event ⇒ State | |
protected def activeEventHandler: (State, Event) ⇒ State | |
final val initialState: Option[State] = None | |
final def commandHandler: CommandHandler[Command, Event, Option[State]] = { | |
// capture the functions once, in case they are defined as defs | |
val initial = initialCommandHandler | |
val active = activeCommandHandler | |
CommandHandler.byState { | |
case Some(_) ⇒ convertEffect(active) | |
case None ⇒ convertInitialEffect(initial) | |
} | |
} | |
private def convertEffect(handler: CommandHandler[Command, Event, State]): CommandHandler[Command, Event, Option[State]] = { | |
(ctx, optState, cmd) ⇒ | |
optState match { | |
case Some(state) ⇒ | |
val effect = handler(ctx, state, cmd) | |
convertEffect(effect) | |
case None ⇒ | |
throw new IllegalStateException("Undefined state") | |
} | |
} | |
private def convertInitialEffect(handler: (ActorContext[Command], Command) ⇒ Effect[Event, State]): CommandHandler[Command, Event, Option[State]] = { | |
(ctx, _, cmd) ⇒ | |
val effect = handler(ctx, cmd) | |
convertEffect(effect) | |
} | |
private def convertEffect(effect: Effect[Event, State]): Effect[Event, Option[State]] = { | |
// FIXME traverse all chained effects and build up new that's replacing the SideEffect | |
effect match { | |
case SideEffect(callback) ⇒ SideEffect[Event, Option[State]] { | |
case Some(state) ⇒ callback(state) | |
case None ⇒ println(s"# ignoring side effect from initialCommandHandler when no state") // FIXME | |
} | |
} | |
} | |
final def eventHandler(state: Option[State], event: Event): Option[State] = | |
state match { | |
case None ⇒ Some(initialEventHandler(event)) | |
case Some(s) ⇒ Some(activeEventHandler(s, event)) | |
} | |
} | |
//#event | |
sealed trait BlogEvent extends Serializable | |
final case class PostAdded( | |
postId: String, | |
content: PostContent) extends BlogEvent | |
final case class BodyChanged( | |
postId: String, | |
newBody: String) extends BlogEvent | |
final case class Published(postId: String) extends BlogEvent | |
//#event | |
//#state | |
object BlogState { | |
val empty: Option[BlogState] = None | |
} | |
final case class BlogState(content: PostContent, published: Boolean) { | |
def withContent(newContent: PostContent): BlogState = | |
copy(content = newContent) | |
def postId: String = content.postId | |
} | |
//#state | |
//#commands | |
sealed trait BlogCommand extends Serializable | |
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand | |
final case class AddPostDone(postId: String) | |
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand | |
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand | |
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand | |
final case object PassivatePost extends BlogCommand | |
final case class PostContent(postId: String, title: String, body: String) | |
//#commands | |
class BlogPostEntity(ctx: ActorContext[BlogCommand]) extends OptionalInitialState[BlogCommand, BlogEvent, BlogState] { | |
override protected def initialCommandHandler = { (ctx, cmd) ⇒ | |
cmd match { | |
case AddPost(content, replyTo) ⇒ | |
val evt = PostAdded(content.postId, content) | |
Effect.persist(evt).andThen { state2 ⇒ | |
// After persist is done additional side effects can be performed | |
replyTo ! AddPostDone(content.postId) | |
} | |
case PassivatePost ⇒ | |
Effect.stop | |
case _ ⇒ | |
Effect.unhandled | |
} | |
} | |
override protected def activeCommandHandler = { (ctx, state, cmd) ⇒ | |
cmd match { | |
case ChangeBody(newBody, replyTo) ⇒ | |
val evt = BodyChanged(state.postId, newBody) | |
Effect.persist(evt).andThen { _ ⇒ | |
replyTo ! Done | |
} | |
case Publish(replyTo) ⇒ | |
Effect.persist(Published(state.postId)).andThen { _ ⇒ | |
println(s"Blog post ${state.postId} was published") | |
replyTo ! Done | |
} | |
case GetPost(replyTo) ⇒ | |
replyTo ! state.content | |
Effect.none | |
case _: AddPost ⇒ | |
Effect.unhandled | |
case PassivatePost ⇒ | |
Effect.stop | |
} | |
} | |
override protected def initialEventHandler = { | |
case PostAdded(postId, content) ⇒ | |
BlogState(content, published = false) | |
} | |
override protected def activeEventHandler = { (state, event) ⇒ | |
event match { | |
case BodyChanged(_, newBody) ⇒ | |
state.withContent(state.content.copy(body = newBody)) | |
case Published(_) ⇒ | |
state.copy(published = true) | |
} | |
} | |
} | |
//#behavior | |
def behavior(entityId: String): Behavior[BlogCommand] = { | |
Behaviors.setup { ctx ⇒ | |
val entity = new BlogPostEntity(ctx) | |
PersistentBehaviors.receive[BlogCommand, BlogEvent, Option[BlogState]]( | |
persistenceId = "Blog-" + entityId, | |
initialState = entity.initialState, | |
commandHandler = entity.commandHandler, | |
eventHandler = entity.eventHandler) | |
} | |
} | |
//#behavior | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment