Last active
March 21, 2023 16:29
-
-
Save guidoschmidt17/b3fa1ca797d37c2af5a40a85c0bdbeba to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zio.* | |
import zio.prelude.* | |
import zio.prelude.fx.* | |
object BaseSyntax: | |
type Program[S, R, E, A] = ZPure[Nothing, S, S, R, E, A] | |
type LoggedProgram[W, S, R, E, A] = ZPure[W, S, S, R, E, A] | |
def pure[S, R, A](a: A): Program[S, R, Nothing, A] = | |
ZPure.succeed(a) | |
def unit[S, R]: Program[S, R, Nothing, Unit] = | |
ZPure.unit | |
def raiseError[S, R, E](t: => E): Program[S, R, E, Nothing] = | |
ZPure.fail(t) | |
def assertThat[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] = | |
if cond then unit else raiseError(t) | |
def assertThatNot[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] = | |
assertThat(!cond, t) | |
def extractOption[S, R, E, A](a: Option[A], t: => E): Program[S, R, E, A] = | |
a match | |
case Some(value) => pure(value) | |
case None => raiseError(t) | |
def get[S, R]: Program[S, R, Nothing, S] = | |
ZPure.get[S] | |
def set[S, R](s: S): Program[S, R, Nothing, Unit] = | |
EState.set(s) | |
def setWithError[S, R, E](s: S): Program[S, R, E, Unit] = | |
EState.set(s) | |
def update[S, R](f: S => S): Program[S, R, Nothing, Unit] = | |
EState.update(f.apply) | |
def inspect[S, R, A](f: S => A): Program[S, R, Nothing, A] = | |
get.map(f(_)) | |
def inquire[S, R: Tag, A](f: R => A): Program[S, R, Nothing, A] = | |
ZPure.service[S, R].map(f(_)) | |
def log[W, S, R, E](w: W): LoggedProgram[W, S, R, E, Unit] = | |
ZPure.log(w) | |
def set[W, S, R](previouslog: Chunk[W]): LoggedProgram[W, S, R, Nothing, Unit] = | |
ZPure.forEach(previouslog)(log(_)).unit | |
def restore[W, S, R](previouslog: Chunk[W], previousstate: S): LoggedProgram[W, S, R, Nothing, Unit] = | |
set(previouslog) *> set(previousstate) | |
package cqrs | |
package domain | |
package mastermanagement | |
import zio.* | |
import eventstore.* | |
import Error.* | |
import Error.ValidationError.* | |
import Event.* | |
import Fact.* | |
import Transport.* | |
final class Syntax private (transport: Transport): | |
type Program[A] = BaseSyntax.Program[Instance, Any, Error, A] | |
type LoggedProgram[A] = BaseSyntax.LoggedProgram[Value, Instance, Any, Error, A] | |
val get = BaseSyntax.get[Instance, Any] | |
val set = BaseSyntax.set[Instance, Any] | |
val update = BaseSyntax.update[Instance, Any] | |
def setWithError = BaseSyntax.setWithError[Instance, Any, Error] | |
def inspect[A] = BaseSyntax.inspect[Instance, Any, A] | |
import BaseSyntax.* | |
def lift(event: Event)(using transition: Transition): LoggedProgram[Unit] = | |
for | |
previous <- inspect(_.master.latest) | |
_ <- transition(event, previous + 1) | |
m <- inspect(_.master) | |
_ <- log(Value(m, event)) | |
yield () | |
private def snapshot(raws: RawValues, instance: Instance): ZIO[Any, Any, (Values, Instance)] = | |
val withsnapshot = for | |
_ <- set(instance) | |
weight = raws.foldLeft(0)((s, r) => | |
if r.eventCategory == Category.Created && r.aggregateLatest > 1 then s else s + r.eventData.length + MinSnapshotEventSize | |
) | |
_ = println(s"weight $weight $DefaultSnapshotThreshold") | |
_ <- | |
if weight > DefaultSnapshotThreshold then | |
for | |
m <- inspect(_.master) | |
currentindex <- inspect(_.index) | |
_ <- lift(SnapshotTaken(m.snapshot(currentindex))) | |
yield () | |
else unit | |
yield () | |
withsnapshot.runAll(instance) match | |
case (snapshot, Right(instance, _)) => ZIO.succeed(snapshot, instance) | |
case (_, Left(cause)) => ZIO.fail(cause) | |
extension (program: LoggedProgram[Unit]) | |
def runFactsEither: (Values, Either[zio.Cause[Error], Instance]) = | |
program.runAll(Instance.empty) match | |
case (facts, Right(instance, _)) => (facts, Right(instance)) | |
case (_, Left(cause)) => (Chunk.empty, Left(cause.toCause)) | |
def runFacts(tags: Tags): ZIO[Any, Any, (RawValues, Instance)] = | |
runFactsEither match | |
case (facts, Right(instance)) => | |
for | |
raws <- facts.mapZIO(transport.toRawValue(_, tags)) | |
(s, instance) <- snapshot(raws, instance) | |
withsnapshot <- if s.size == 1 then transport.toRawValue(s(0), tags).flatMap(s => ZIO.succeed(raws :+ s)) else ZIO.succeed(raws) | |
yield (withsnapshot, instance) | |
case (_, Left(cause)) => ZIO.fail(cause) | |
final given instanceTransition: Transition = new Transition | |
final class Transition: | |
def apply(event: Event, latest: Int): Program[Unit] = | |
import Entity.EntityHolder | |
for | |
previous <- inspect(_.master.latest) | |
_ <- event match | |
case SnapshotTaken(_) => unit | |
case Created(_) | _ => | |
assertThat(latest - previous == 1, EventNotInSequence(s"expected ${previous + 1}, but found ${latest}}, event ${event}")) | |
_ <- event match | |
case Nested(Nested(Nested(nested))) => | |
... | |
yield () | |
... | |
end Syntax | |
object Syntax: | |
val layer = ZLayer.fromZIO(makeLayer) | |
private def makeLayer = for | |
transport <- ZIO.service[Transport] | |
result = Syntax(transport) | |
yield result | |
// example usage of domain "mastermanagement" | |
case class Person(lastname: String, firstname: String, age: Int) | |
given Codec[Person] = deriveCodec[Person] | |
case class Matrix( | |
m1: Double, | |
m2: Double, | |
m3: Double, | |
m4: Double, | |
m5: Double, | |
m6: Double, | |
m7: Double, | |
m8: Double, | |
m9: Double, | |
m10: Double, | |
m11: Double, | |
m12: Double | |
) | |
given Codec[Matrix] = deriveCodec[Matrix] | |
object Matrix: | |
final val identity = Matrix(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0) | |
import Properties.* | |
import Properties.given | |
def test(uuid: Uuid): LoggedProgram[Unit] = | |
val largebytes = util.Random.alphanumeric.take(200).toList.toString.nn | |
val modify = | |
for | |
_ <- addPropertyMaster("name" -> StringElem("this is my name.")) | |
_ <- addPropertyMaster("description" -> StringElem("this is my description.")) | |
_ <- addPropertyVersion("creator" -> StringElem("Bob")) | |
_ <- addPropertyRevision("size" -> IntElem(4711)) | |
_ <- addPropertyIteration("pi" -> DoubleElem(3.14159)) | |
_ <- addPropertyIteration("somebytes" -> ByteArrayElem("blablabla".getBytes.nn)) | |
_ <- addPropertyIteration("largebytes" -> StringElem(largebytes)) | |
_ <- addPropertyIteration("happy" -> BooleanElem(true)) | |
_ <- addPropertyIteration("noidea" -> NullElem) | |
_ <- addPropertyIteration("dontcare" -> UndefinedElem) | |
_ <- addPropertyIteration("file" -> AnyElem(File(Uuid.randomUuid, 100, 10, "test.txt"))) | |
_ <- addPropertyIteration("person" -> AnyElem(Person("Smith", "Joe", 37))) | |
_ <- addPropertyIteration("matrix" -> AnyElem(Matrix.identity)) | |
_ <- newIteration | |
_ <- newVersion | |
_ <- newRevision | |
_ <- newIteration | |
_ <- newVersion | |
_ <- newIteration | |
_ <- removePropertyIteration("pi") | |
_ <- addPropertyIteration("e" -> DoubleElem(2.71828)) | |
_ <- removePropertyMaster("name") | |
_ <- deleteIteration | |
_ <- selectIndex((2, 1, 1)) | |
_ <- removePropertyIteration("name") | |
_ <- removePropertyIteration("name") | |
_ <- newIteration | |
_ <- addPropertyIteration("name" -> StringElem("this is my new name.")) | |
_ <- newVersion | |
_ <- selectIndex((1, 1, 2)) | |
_ <- selectLatest | |
yield () | |
for | |
_ <- createMaster(uuid) | |
_ <- modify.repeatN(1999) | |
yield () | |
// using Transition to recreate the AggregateRoot | |
import syntax.* | |
def readById(uuid: Uuid) = | |
ZIO.scoped(eventstore.readFactsByAggregateRootId(uuid).flatMap(aggregate(_))) | |
def aggregate( | |
facts: RawValueStream | |
)(using transition: Transition): ZIO[Any, StreamingError, ZPure[Nothing, Instance, Instance, Any, Error, Unit]] = | |
facts | |
.mapZIO(transport.fromRawValue(_)) | |
.runFold(setWithError(Instance.empty))((s, value) => s.flatMap(_ => transition(value.event, value.identified.latest))) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment