Skip to content

Instantly share code, notes, and snippets.

@notxcain
Last active February 10, 2016 09:57
Show Gist options
  • Save notxcain/ca4dc5455c0e6e50da6e to your computer and use it in GitHub Desktop.
Save notxcain/ca4dc5455c0e6e50da6e to your computer and use it in GitHub Desktop.
Simple Akka Streams-based event sourcing
/** Acknowledgement that is either [[Ack.Accepted]] or [[Ack.Rejected]] carrying a value of type `R`.
  *
  * Covariant in `R`, so the payload-free `Accepted` (an `Ack[Nothing]`) can be used wherever
  * any `Ack[R]` is expected.
  */
sealed trait Ack[+R]

object Ack {

  /** Positive acknowledgement; carries no payload. */
  case object Accepted extends Ack[Nothing]

  /** Negative acknowledgement wrapping the reason for the rejection.
    *
    * @param rejection the value describing why the request was rejected
    */
  case class Rejected[+R](rejection: R) extends Ack[R]

  /** Returns [[Accepted]] typed as `Ack[R]`, which helps inference at call sites. */
  def accepted[R]: Ack[R] = Accepted

  /** Wraps `rejection` in a [[Rejected]], typed as the wider `Ack[R]`. */
  def rejected[R](rejection: R): Ack[R] = Rejected[R](rejection)
}
/** Closed family of messages used by this example: [[Command]] and [[Event]]. */
sealed trait Message

/** A command, i.e. the input handed to `State.processCommand`.
  *
  * @param id identifier carried by the command (its meaning is not defined in this snippet)
  */
case class Command(id: String) extends Message

/** An event, i.e. the value that gets persisted and applied to a `State`.
  *
  * @param id identifier carried by the event (its meaning is not defined in this snippet)
  */
case class Event(id: String) extends Message
/** Reason why a command was not accepted; closed so matches over it can be checked exhaustively. */
sealed trait Rejection

/** The only rejection defined in this example; it carries no further detail. */
case object Rejection extends Rejection
/** State that is evolved by events and that decides the outcome of commands. */
trait State {

  /** Returns the state that results from applying `event` to this state. */
  def applyEvent(event: Event): State

  /** Asynchronously decides what `command` leads to.
    *
    * @return a future holding either a `Rejection` (left) or the `Event` to persist (right)
    */
  def processCommand(command: Command): Future[Xor[Rejection, Event]]
}
object State {

  /** Seed state used as the starting point when folding persisted events.
    *
    * Left unimplemented in this example: `???` throws `NotImplementedError` when evaluated.
    */
  def zero: State = ???
}
/** Stream of events that the pipeline folds into the starting state before handling commands.
  *
  * Placeholder: `???` throws `NotImplementedError` when evaluated.
  */
def persistedEvents: Source[Event, Any] = ???
/** Stream of incoming commands, each paired with the promise through which the pipeline
  * reports that command's acknowledgement.
  *
  * Placeholder: `???` throws `NotImplementedError` when evaluated.
  */
def commands: Source[(Command, Promise[Ack[Rejection]]), Any] =
  ???
/** Asynchronously persists `e`; the pipeline applies the event produced by the returned future
  * to the current state.
  *
  * Placeholder: `???` throws `NotImplementedError` when evaluated.
  * NOTE(review): presumably this writes to an event journal - confirm against the real implementation.
  */
def persistEvent(e: Event): Future[Event] = ???
// Step 1: rebuild the current state by applying every persisted event on top of State.zero.
// NOTE(review): `fold` emits its result only once the upstream completes, so `persistedEvents`
// is expected to be a finite stream - confirm against the real source.
persistedEvents.fold(State.zero) { (state, event) =>
  state.applyEvent(event)
}.flatMapConcat { recovered =>
  // Step 2: thread the state through the commands one after another. The accumulator is a
  // Future[State], so each command is processed only after the previous one has finished.
  commands.fold(Future.successful(recovered)) { case (futureState, (command, ack)) =>
    val nextState = futureState.flatMap { state =>
      state.processCommand(command).flatMap {
        // Accepted: persist first, then apply the persisted event to obtain the new state.
        case Xor.Right(event) =>
          persistEvent(event).map(state.applyEvent).map(_ -> Ack.accepted[Rejection])
        // Rejected: the state is left untouched and the reason is handed back to the caller.
        case Xor.Left(rejection) =>
          Future.successful(state -> Ack.rejected(rejection))
      }.map { case (s, r) =>
        ack.success(r)
        s
      }
    }
    // If processing or persisting fails - or an earlier command already failed the state
    // future, which short-circuits the flatMap above - the success path never runs. Mirror the
    // failure into the promise so the submitter is not left waiting forever. `tryFailure` is a
    // no-op when the promise has already been completed.
    nextState.failed.foreach(cause => ack.tryFailure(cause))
    nextState
  }.mapAsync(1)(identity) // unwrap the final Future[State]; a failed future fails the stream
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment