Forked from guidoschmidt17/example_zpure_eventsourcing_1.scala
Created
February 24, 2023 03:17
-
-
Save guizmaii/9df958c2010b4dc593b85c3c11bdeaa2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cqrs | |
package eventstore | |
import zio.* | |
import zio.prelude.* | |
import zio.prelude.fx.* | |
/** Thin vocabulary of `ZPure` constructors shared by the domain-specific syntax classes.
  *
  * Every helper is a one-line wrapper; the type aliases fix the input and output state of a `ZPure` to the same type
  * `S`.
  */
object BaseSyntax:

  /** A `ZPure` whose log type is `Nothing` and whose state type `S` is the same before and after. */
  type Program[S, R, E, A] = ZPure[Nothing, S, S, R, E, A]

  /** Like [[Program]], but with a log of type `W`. */
  type LoggedProgram[W, S, R, E, A] = ZPure[W, S, S, R, E, A]

  /** Succeeds with `a`. */
  def pure[S, R, A](a: A): Program[S, R, Nothing, A] = ZPure.succeed(a)

  /** Succeeds with the unit value. */
  def unit[S, R]: Program[S, R, Nothing, Unit] = ZPure.unit

  /** Fails with `t`. */
  def raiseError[S, R, E](t: => E): Program[S, R, E, Nothing] = ZPure.fail(t)

  /** Succeeds with unit when `cond` holds, otherwise fails with `t`. */
  def assertThat[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    if !cond then raiseError(t) else unit

  /** Negated form of [[assertThat]]: fails with `t` when `cond` holds. */
  def assertThatNot[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    assertThat(!cond, t)

  /** Succeeds with the content of `a`, or fails with `t` when `a` is empty. */
  def extractOption[S, R, E, A](a: Option[A], t: => E): Program[S, R, E, A] =
    a.fold[Program[S, R, E, A]](raiseError[S, R, E](t))(pure[S, R, A])

  /** Reads the current state. */
  def get[S, R]: Program[S, R, Nothing, S] = ZPure.get[S]

  /** Replaces the current state with `s`. */
  def set[S, R](s: S): Program[S, R, Nothing, Unit] = EState.set(s)

  /** Same as `set`, but typed with an error channel `E` chosen by the caller. */
  def setWithError[S, R, E](s: S): Program[S, R, E, Unit] = EState.set(s)

  /** Replaces the current state with `f` applied to it. */
  def update[S, R](f: S => S): Program[S, R, Nothing, Unit] = EState.update(f)

  /** Reads the current state and projects it through `f`. */
  def inspect[S, R, A](f: S => A): Program[S, R, Nothing, A] = get[S, R].map(f)

  /** Fetches the service `R` from the environment and projects it through `f`. */
  def inquire[S, R: Tag, A](f: R => A): Program[S, R, Nothing, A] = ZPure.service[S, R].map(f)

  /** Appends `w` to the log. */
  def log[W, S, R, E](w: W): LoggedProgram[W, S, R, E, Unit] = ZPure.log(w)

  /** Logs every element of `previouslog`, in order (overload of `set` taking a log instead of a state). */
  def set[W, S, R](previouslog: Chunk[W]): LoggedProgram[W, S, R, Nothing, Unit] =
    ZPure.forEach(previouslog)(entry => log(entry)).unit

  /** Logs `previouslog` and then sets the state to `previousstate`. */
  def restore[W, S, R](previouslog: Chunk[W], previousstate: S): LoggedProgram[W, S, R, Nothing, Unit] =
    set[W, S, R](previouslog) *> set[S, R](previousstate)
package cqrs | |
package domain | |
package mastermanagement | |
import zio.* | |
import eventstore.* | |
import Error.* | |
import Error.ValidationError.* | |
import Event.* | |
import Fact.* | |
import Transport.* | |
/** Command and event syntax for the "mastermanagement" aggregate.
  *
  * Commands (`createMaster`, `newVersion`, `addPropertyIteration`, ...) are `LoggedProgram`s: each one validates
  * against the current `Instance`, applies the resulting event through a [[Transition]] and logs a `Value` holding the
  * updated master and the event. Running a program with `runFacts` turns the log into raw values via `transport`.
  *
  * @param transport
  *   used to convert logged values into raw values
  */
final class Syntax private (transport: Transport):

  /** A program over the aggregate `Instance` that produces no log. */
  type Program[A] = BaseSyntax.Program[Instance, Any, Error, A]

  /** A program over the aggregate `Instance` that logs `Value`s. */
  type LoggedProgram[A] = BaseSyntax.LoggedProgram[Value, Instance, Any, Error, A]

  // BaseSyntax helpers specialised to this aggregate's state type.
  val get          = BaseSyntax.get[Instance, Any]
  val set          = BaseSyntax.set[Instance, Any]
  val update       = BaseSyntax.update[Instance, Any]
  def setWithError = BaseSyntax.setWithError[Instance, Any, Error]
  def inspect[A]   = BaseSyntax.inspect[Instance, Any, A]

  import BaseSyntax.*

  /** Applies `event` to the current state with sequence number `master.latest + 1`, then logs the updated master
    * together with the event.
    */
  def lift(event: Event)(using transition: Transition): LoggedProgram[Unit] =
    for
      previous <- inspect(_.master.latest)
      _        <- transition(event, previous + 1)
      m        <- inspect(_.master)
      _        <- log(Value(m, event))
    yield ()

  /** Starting from `instance`, lifts a `SnapshotTaken` event when the accumulated weight of `raws` exceeds
    * `DefaultSnapshotThreshold`.
    *
    * The weight sums `eventData.length + MinSnapshotEventSize` over `raws`, skipping raws whose category is `Created`
    * and whose `aggregateLatest` is greater than 1.
    *
    * @return
    *   the values logged by the snapshot step (empty when no snapshot was taken) and the resulting instance; fails with
    *   the cause reported by `runAll` otherwise
    */
  private def snapshot(raws: RawValues, instance: Instance): ZIO[Any, Any, (Values, Instance)] =
    val withsnapshot = for
      _ <- set(instance)
      weight = raws.foldLeft(0)((s, r) =>
        if r.eventCategory == Category.Created && r.aggregateLatest > 1 then s
        else s + r.eventData.length + MinSnapshotEventSize
      )
      // NOTE(review): console output left in the program; prints each time this step runs — consider a logger.
      _ = println(s"weight $weight $DefaultSnapshotThreshold")
      _ <-
        if weight > DefaultSnapshotThreshold then
          for
            m            <- inspect(_.master)
            currentindex <- inspect(_.index)
            _            <- lift(SnapshotTaken(m.snapshot(currentindex)))
          yield ()
        else unit
    yield ()
    withsnapshot.runAll(instance) match
      case (taken, Right((updated, _))) => ZIO.succeed((taken, updated))
      case (_, Left(cause))             => ZIO.fail(cause)

  extension (program: LoggedProgram[Unit])

    /** Runs the program from `Instance.empty`.
      *
      * @return
      *   the logged values with the final instance, or an empty log with the failure cause
      */
    def runFactsEither: (Values, Either[zio.Cause[Error], Instance]) =
      program.runAll(Instance.empty) match
        case (facts, Right((result, _))) => (facts, Right(result))
        case (_, Left(cause))            => (Chunk.empty, Left(cause.toCause))

    /** Runs the program, converts every logged value to a raw value tagged with `tags`, and appends the raw form of a
      * snapshot value when `snapshot` produced exactly one.
      */
    def runFacts(tags: Tags): ZIO[Any, Any, (RawValues, Instance)] =
      runFactsEither match
        case (facts, Right(instance)) =>
          for
            raws             <- facts.mapZIO(transport.toRawValue(_, tags))
            (taken, current) <- snapshot(raws, instance)
            withsnapshot <-
              if taken.size == 1 then transport.toRawValue(taken(0), tags).map(raws :+ _)
              else ZIO.succeed(raws)
          yield (withsnapshot, current)
        case (_, Left(cause)) => ZIO.fail(cause)

  final given instanceTransition: Transition = new Transition

  /** State transition function: applies one event to the current `Instance`. */
  final class Transition:

    /** Applies `event` to the state.
      *
      * Every event except `SnapshotTaken` must carry the sequence number `master.latest + 1`; otherwise the program
      * fails with `EventNotInSequence`. The nesting depth of `Nested` selects the level the inner event applies to
      * (three levels: iteration, two: revision, one: version, none: master). Unknown events fail with `UnhandledEvent`.
      *
      * @param event
      *   the event to apply
      * @param latest
      *   the sequence number carried by the event
      */
    def apply(event: Event, latest: Int): Program[Unit] =
      import Entity.EntityHolder
      for
        previous <- inspect(_.master.latest)
        _ <- event match
          case SnapshotTaken(_) => unit
          case _ =>
            assertThat(
              latest - previous == 1,
              EventNotInSequence(s"expected ${previous + 1}, but found ${latest}, event ${event}")
            )
        _ <- event match
          case Nested(Nested(Nested(nested))) =>
            update(i =>
              val iter = nested match
                case Deleted                   => i.iteration
                case Frozen(index: Int)        => i.revision.entities(index - 1).freeze
                case AddedProperty(property)   => i.iteration.add(property)
                case RemovedProperty(property) => i.iteration.remove(property)
                case NewEntity(ie: EntityHolder) =>
                  i.iteration.copy(index = ie.index, workable = ie.workable)
              val (r, iternew) = nested match
                case NewEntity(_) => (i.revision.add(iter), iter)
                case Deleted =>
                  val r = i.revision.remove(iter)
                  // after a delete the last remaining entity becomes the current one
                  (r, r.entities.last.unfreeze)
                case _ => (i.revision.update(iter), iter)
              val v = i.version.update(r)
              val m = i.master.update(v)
              i.copy(m, v, r, iternew)
            )
          case Nested(Nested(nested)) =>
            update(i =>
              val r = nested match
                case Deleted                   => i.revision
                case Frozen(index: Int)        => i.version.entities(index - 1).freeze
                case AddedProperty(property)   => i.revision.add(property)
                case RemovedProperty(property) => i.revision.remove(property)
                case NewEntity(re: EntityHolder) =>
                  val ie   = re.entitystack.last
                  val iter = i.iteration.copy(index = ie.index, workable = ie.workable)
                  i.revision.copy(index = re.index, workable = re.workable, entities = Vector(iter))
              val (v, rnew) = nested match
                case NewEntity(_) => (i.version.add(r), r)
                case Deleted =>
                  val v = i.version.remove(r)
                  (v, v.entities.last.unfreeze)
                case _ => (i.version.update(r), r)
              val m = i.master.update(v)
              nested match
                case NewEntity(_) =>
                  val iter = r.entities.last
                  i.copy(m, v, r, iter)
                case _ => i.copy(m, v, rnew)
            )
          case Nested(nested) =>
            update(i =>
              val v = nested match
                case Deleted                   => i.version
                case Frozen(index: Int)        => i.master.entities(index - 1).freeze
                case AddedProperty(property)   => i.version.add(property)
                case RemovedProperty(property) => i.version.remove(property)
                case NewEntity(ve: EntityHolder) =>
                  val ie   = ve.entitystack.head
                  val re   = ve.entitystack.last
                  val iter = i.iteration.copy(index = ie.index, workable = ie.workable)
                  val r =
                    i.revision.copy(index = re.index, workable = re.workable, entities = Vector(iter))
                  i.version.copy(index = ve.index, workable = ve.workable, entities = Vector(r))
              val (m, vnew) = nested match
                case NewEntity(_) => (i.master.add(v), v)
                case Deleted =>
                  val m = i.master.remove(v)
                  (m, m.entities.last.unfreeze)
                case _ => (i.master.update(v), v)
              nested match
                case NewEntity(_) =>
                  val r    = v.entities.last
                  val iter = r.entities.last
                  i.copy(m, v, r, iter)
                case _ => i.copy(m, vnew)
            )
          case Created(m: Master)       => replaceWith(m)
          case SnapshotTaken(m: Master) => replaceWith(m)
          case Unfrozen((iv, ir, iiter)) =>
            update(i =>
              val m    = i.master.unfreeze
              val v    = m.entities(iv - 1).unfreeze
              val r    = v.entities(ir - 1).unfreeze
              val iter = r.entities(iiter - 1).unfreeze
              i.copy(m, v, r, iter)
            )
          case Deleted                   => update(i => i.copy(i.master.delete, Version.empty, Revision.empty, Iteration.empty))
          case AddedProperty(property)   => update(i => i.copy(i.master.add(property)))
          case RemovedProperty(property) => update(i => i.copy(i.master.remove(property)))
          case _                         => raiseError(UnhandledEvent(event.toString))
      yield ()

    /** Replaces the whole state with master `m`, selecting the version, revision and iteration addressed by
      * `m.currentindex` (1-based). Shared by the `Created` and `SnapshotTaken` events.
      */
    private def replaceWith(m: Master) =
      update(i =>
        val (iv, ir, iiter) = m.currentindex
        val v               = m.entities(iv - 1)
        val r               = v.entities(ir - 1)
        val iter            = r.entities(iiter - 1)
        i.copy(m, v, r, iter)
      )

  // Delete commands: deleteMaster emits Deleted on the master itself; the others pass the parent level to `delete`.
  def deleteMaster    = change(_.master, Deleted)
  def deleteVersion   = delete(_.master)
  def deleteRevision  = delete(_.version)
  def deleteIteration = delete(_.revision)

  // Property commands, one pair per level of the aggregate.
  def addPropertyMaster(property: Property)    = change(_.master, AddedProperty(property))
  def addPropertyVersion(property: Property)   = change(_.version, AddedProperty(property))
  def addPropertyRevision(property: Property)  = change(_.revision, AddedProperty(property))
  def addPropertyIteration(property: Property) = change(_.iteration, AddedProperty(property))

  def removePropertyMaster(property: String)    = change(_.master, RemovedProperty(property))
  def removePropertyVersion(property: String)   = change(_.version, RemovedProperty(property))
  def removePropertyRevision(property: String)  = change(_.revision, RemovedProperty(property))
  def removePropertyIteration(property: String) = change(_.iteration, RemovedProperty(property))

  /** Lifts a `Created` event for a new master `uuid` holding one version, one revision and one iteration. */
  def createMaster(uuid: Uuid) =
    val iter = Iteration(Properties.empty, true, 1)
    val r    = Revision(Properties.empty, true, 1, Vector(iter))
    val v    = Version(Properties.empty, true, 1, Vector(r))
    val m    = Master(uuid, 1, Properties.empty, false, true, Vector(v), (1, 1, 1))
    lift(Created(m))

  /** Freezes the current iteration, revision and version, then adds a new version at `master.nextIndex`. */
  def newVersion =
    for
      i <- get
      iter = Entity(index = 1)
      r    = Entity(index = 1)
      v    = Entity(index = i.master.nextIndex, Vector(iter, r))
      _ <- change(_.iteration, Frozen(i.iteration))
      _ <- change(_.revision, Frozen(i.revision))
      _ <- change(_.version, Frozen(i.version))
      _ <- lift(Nested(NewEntity(v)))
    yield ()

  /** Freezes the current iteration and revision, then adds a new revision at `version.nextIndex`. */
  def newRevision =
    for
      i <- get
      iter = Entity(index = 1)
      r    = Entity(index = i.version.nextIndex, Vector(iter))
      _ <- change(_.iteration, Frozen(i.iteration))
      _ <- change(_.revision, Frozen(i.revision))
      _ <- lift(Nested(Nested(NewEntity(r))))
    yield ()

  /** Freezes the current iteration, then adds a new iteration at `revision.nextIndex`. */
  def newIteration =
    for
      i <- get
      iter = Entity(index = i.revision.nextIndex)
      _ <- change(_.iteration, Frozen(i.iteration))
      _ <- lift(Nested(Nested(Nested(NewEntity(iter)))))
    yield ()

  /** Validates the 1-based `(version, revision, iteration)` index against the current master and lifts `Unfrozen` for
    * it; fails with `InvalidInput` when any component is out of range.
    */
  def selectIndex(index: (Int, Int, Int)) =
    def check[A <: Entity[A]](i: Int, entities: Vector[A], m: Master): Program[A] =
      for
        _ <- assertThat(
          0 < i && i <= entities.size,
          InvalidInput(s"index out of range $index, $i not in [1, ${entities.size}], aggregateroot: $m")
        )
      yield entities(i - 1)
    val (iv, ir, ii) = index
    for
      m <- inspect(_.master)
      v <- check(iv, m.entities, m)
      r <- check(ir, v.entities, m)
      _ <- check(ii, r.entities, m) // range check only; the selected iteration itself is not needed here
      _ <- lift(Unfrozen(index))
    yield ()

  /** Selects the last version, its last revision and that revision's last iteration. */
  def selectLatest =
    for
      m <- inspect(_.master)
      v    = m.entities.last
      r    = v.entities.last
      iter = r.entities.last
      _ <- selectIndex((v.index, r.index, iter.index))
    yield ()

  /** Lifts a `Deleted` event for a child of the aggregate picked by `f`.
    *
    * Fails with `NotWorkable` when the aggregate is not workable and with `InvalidState` when it holds only one entity.
    */
  private def delete(f: Instance => Aggregate[?, ?]) =
    for
      a <- inspect(f)
      _ <- assertThat(a.workable, ValidationError.NotWorkable(s"Cannot delete entity in $a"))
      i <- get
      _ <- assertThat(a.entities.size > 1, ValidationError.InvalidState(s"Cannot delete last entity ${i.index} for $a"))
      _ <- lift(a match
        case _: Revision => Nested(Nested(Nested(Deleted)))
        case _: Version  => Nested(Nested(Deleted))
        case _: Master   => Nested(Deleted)
      )
    yield ()

  /** Lifts `event` for the level picked by `f`, wrapping it in one `Nested` per level below the master.
    *
    * Fails with `NotWorkable` when the selected level is not workable.
    */
  private def change(f: Instance => Workable[?], event: Event) =
    for
      w <- inspect(f)
      _ <- assertThat(w.workable, ValidationError.NotWorkable(s"$w event $event"))
      _ <- lift(w match
        case _: Iteration => Nested(Nested(Nested(event)))
        case _: Revision  => Nested(Nested(event))
        case _: Version   => Nested(event)
        case _: Master    => event
      )
    yield ()

end Syntax
/** Companion exposing `Syntax` as a layer that depends on a `Transport` service. */
object Syntax:

  /** Obtains the `Transport` from the environment and wraps it in a `Syntax`. */
  private def makeLayer = ZIO.service[Transport].map(transport => Syntax(transport))

  /** Layer that constructs a `Syntax` from the `Transport` in the environment. */
  val layer = ZLayer.fromZIO(makeLayer)
// example usage of domain "mastermanagement" | |
/** Example payload stored as an `AnyElem` property in the usage scenario. */
final case class Person(lastname: String, firstname: String, age: Int)

/** Codec for [[Person]], derived from the case class. */
given Codec[Person] = deriveCodec[Person]
/** Example payload with twelve `Double` components, stored as an `AnyElem` property in the usage scenario.
  *
  * NOTE(review): the meaning of the components is not stated here; `Matrix.identity` sets m1, m5 and m9 to 1, which
  * suggests a 3x3 block followed by three more components — confirm.
  */
final case class Matrix(
    m1: Double,
    m2: Double,
    m3: Double,
    m4: Double,
    m5: Double,
    m6: Double,
    m7: Double,
    m8: Double,
    m9: Double,
    m10: Double,
    m11: Double,
    m12: Double
)

/** Codec for [[Matrix]], derived from the case class. */
given Codec[Matrix] = deriveCodec[Matrix]

object Matrix:

  /** Matrix with m1, m5 and m9 equal to 1 and every other component 0. */
  final val identity: Matrix =
    Matrix(
      1, 0, 0,
      0, 1, 0,
      0, 0, 1,
      0, 0, 0
    )
import Properties.* | |
import Properties.given | |
/** Example scenario: creates the master `uuid`, then runs a fixed sequence of property, entity and selection commands
  * with `repeatN(1999)`.
  */
def test(uuid: Uuid): LoggedProgram[Unit] =
  val largebytes = util.Random.alphanumeric.take(200).toList.toString.nn

  // One pass of the scenario; the steps run strictly in the order listed.
  val scenario =
    for
      // properties on every level of the aggregate
      _ <- addPropertyMaster("name" -> StringElem("this is my name."))
      _ <- addPropertyMaster("description" -> StringElem("this is my description."))
      _ <- addPropertyVersion("creator" -> StringElem("Bob"))
      _ <- addPropertyRevision("size" -> IntElem(4711))
      // one property per element kind on the iteration
      _ <- addPropertyIteration("pi" -> DoubleElem(3.14159))
      _ <- addPropertyIteration("somebytes" -> ByteArrayElem("blablabla".getBytes.nn))
      _ <- addPropertyIteration("largebytes" -> StringElem(largebytes))
      _ <- addPropertyIteration("happy" -> BooleanElem(true))
      _ <- addPropertyIteration("noidea" -> NullElem)
      _ <- addPropertyIteration("dontcare" -> UndefinedElem)
      _ <- addPropertyIteration("file" -> AnyElem(File(Uuid.randomUuid, 100, 10, "test.txt")))
      _ <- addPropertyIteration("person" -> AnyElem(Person("Smith", "Joe", 37)))
      _ <- addPropertyIteration("matrix" -> AnyElem(Matrix.identity))
      // grow the aggregate
      _ <- newIteration
      _ <- newVersion
      _ <- newRevision
      _ <- newIteration
      _ <- newVersion
      _ <- newIteration
      // mutate and delete on the newest entities
      _ <- removePropertyIteration("pi")
      _ <- addPropertyIteration("e" -> DoubleElem(2.71828))
      _ <- removePropertyMaster("name")
      _ <- deleteIteration
      // jump to an earlier index and continue from there
      _ <- selectIndex((2, 1, 1))
      _ <- removePropertyIteration("name")
      _ <- removePropertyIteration("name")
      _ <- newIteration
      _ <- addPropertyIteration("name" -> StringElem("this is my new name."))
      _ <- newVersion
      _ <- selectIndex((1, 1, 2))
      _ <- selectLatest
    yield ()

  createMaster(uuid).flatMap(_ => scenario.repeatN(1999))
// using Transition to recreate the AggregateRoot | |
import syntax.* | |
/** Reads the facts stored for aggregate root `uuid` and folds them into a state-rebuilding program via `aggregate`.
  * The read runs inside `ZIO.scoped`.
  */
def readById(uuid: Uuid) =
  ZIO.scoped {
    for
      facts   <- eventstore.readFactsByAggregateRootId(uuid)
      program <- aggregate(facts)
    yield program
  }
/** Folds a stream of raw values into one program that replays their events.
  *
  * The fold starts from a program that sets the state to `Instance.empty`. Each raw value is decoded with
  * `transport.fromRawValue`, and its event is chained on with `transition`, using `identified.latest` as the sequence
  * number.
  *
  * @param facts
  *   the raw values to replay, in stream order
  * @return
  *   the composed program; it is built here but not run
  */
def aggregate(
    facts: RawValueStream
)(using transition: Transition): ZIO[Any, StreamingError, ZPure[Nothing, Instance, Instance, Any, Error, Unit]] =
  facts
    .mapZIO(raw => transport.fromRawValue(raw))
    .runFold(setWithError(Instance.empty)) { (replayed, value) =>
      replayed.flatMap(_ => transition(value.event, value.identified.latest))
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment