Last active
February 15, 2022 18:16
-
-
Save tPl0ch/5c6c9a046c1e5572f7601a63a0259eef to your computer and use it in GitHub Desktop.
Message-Driven Finite-State-Transducer Domain-Driven-Design Aggregate
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.instances.either._ | |
import Transducer.run | |
import UserRegistration._ | |
/** Demo entry point.
  *
  * Feeds a fixed tape of commands through the user-registration transducer and prints one result per command.
  */
object Main extends App {

  // The tape deliberately contains commands that are not valid for the state they arrive in
  // (the first and the third), so the printed results include both errors and events.
  private val tape = List(GDPRDeletion, StartRegistration, StartRegistration, ConfirmAccount, GDPRDeletion)

  for (result <- run(userRegistration)(tape)) println(result)

  // OUTPUT
  // Left(NonEmptyList(Transducer$UnsupportedStateTransition: There is no action defined for state Right(PotentialCustomer) with input GDPRDeletion, UserRegistration$InvalidStateForCommand: Cannot process command GDPRDeletion for invalid state))
  // Right(ConfirmationSent)
  // Left(NonEmptyList(Transducer$UnsupportedStateTransition: There is no action defined for state Right(WaitingForConfirmation) with input StartRegistration, UserRegistration$InvalidStateForCommand: Cannot process command StartRegistration for invalid state))
  // Right(AccountConfirmed)
  // Right(AccountDeleted)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.ApplicativeError | |
import cats.data.NonEmptyList | |
import cats.implicits._ | |
import scala.annotation.tailrec | |
/** A proof-of-concept implementation of a message-driven finite-state transducer.
  *
  * Error handling is abstracted over any `F[_]` that has a cats
  * `ApplicativeError[F, NonEmptyList[Throwable]]` instance, so clients pick the concrete error type.
  *
  * @see https://en.wikipedia.org/wiki/Finite-state_machine#Transducers
  */
object Transducer {

  /** The (state, input) label
    *
    * @tparam F The error-handling abstraction type-class
    * @tparam I A finite set of valid input messages
    * @tparam S A finite set of states
    */
  type Label[F[_], I, S] = (F[S], I)

  /** The output function
    *
    * This function uses the more advanced Mealy model since it relies on both state and input to determine its output.
    *
    * @tparam F The error-handling abstraction type-class
    * @tparam I A finite set of valid input messages
    * @tparam S A finite set of states
    * @tparam O A finite set of valid output messages
    */
  type OutputF[F[_], I, S, O] = PartialFunction[Label[F, I, S], F[O]]

  /** The state-transition function
    *
    * Returns a new state for a defined label of (state, input).
    *
    * @tparam F The error-handling abstraction type-class
    * @tparam I A finite set of valid input messages
    * @tparam S A finite set of states
    */
  type ActionF[F[_], I, S] = PartialFunction[Label[F, I, S], F[S]]

  /** The transducer tuple
    *
    * The tuple is of form (initial state, state-transition function, output function).
    *
    * The alphabets are derived from the type parameters.
    *
    * @tparam F The error-handling abstraction type-class
    * @tparam I A finite set of valid input messages
    * @tparam S A finite set of states
    * @tparam O A finite set of valid output messages
    */
  type Transducer[F[_], I, S, O] = (F[S], ActionF[F, I, S], OutputF[F, I, S, O])

  /** The Error abstraction to deal with action and output errors
    *
    * @tparam F The error-handling abstraction type-class
    */
  type ErrorF[F[_]] = ApplicativeError[F, NonEmptyList[Throwable]]

  /** There are two errors that are generic to all transducers.
    *
    * 1.) There is no action defined for a given input at a given state
    * 2.) There is no output defined for a given new state
    */
  sealed trait TransducerError

  /** Indicates an unsupported state transition
    *
    * @param label The label of (state, input)
    * @tparam F The error-handling abstraction type-class
    * @tparam I A finite set of valid input messages
    * @tparam S A finite set of states
    */
  final case class UnsupportedStateTransition[F[_], I, S](label: Label[F, I, S])
      extends Throwable(s"There is no action defined for state ${label._1} with input ${label._2}")
      with TransducerError

  /** Indicates that an output is not defined for a given state
    *
    * @param label The label of (state, input)
    * @tparam F The error-handling abstraction type-class
    * @tparam I A finite set of valid input messages
    * @tparam S A finite set of states
    */
  final case class UndefinedOutput[F[_], I, S](label: Label[F, I, S])
      extends Throwable(s"There is no output defined for state ${label._1} with input ${label._2}")
      with TransducerError

  /** Applies the state-transition function to a label.
    *
    * @param actions The state-transition function
    * @param label   The (state, input) label to transition from
    * @return The new state, or a raised [[UnsupportedStateTransition]] when `actions` is not defined at `label`
    */
  private[this] def state[F[_]: ErrorF, I, S](actions: ActionF[F, I, S])(label: Label[F, I, S]): F[S] =
    // applyOrElse performs the definedness check and the application in a single pass
    actions.applyOrElse(
      label,
      (undefined: Label[F, I, S]) => NonEmptyList.of(UnsupportedStateTransition(undefined)).raiseError[F, S]
    )

  /** Applies the output function to a label.
    *
    * @param outputs The output function
    * @param label   The (state, input) label to produce an output for
    * @return The output, or a raised [[UndefinedOutput]] when `outputs` is not defined at `label`
    */
  private[this] def output[F[_]: ErrorF, I, S, O](outputs: OutputF[F, I, S, O])(label: Label[F, I, S]): F[O] =
    outputs.applyOrElse(
      label,
      (undefined: Label[F, I, S]) => NonEmptyList.of(UndefinedOutput(undefined)).raiseError[F, O]
    )

  /** Returns the sequence of states or errors for a given input tape
    *
    * The result has exactly one entry per input, in input order. A failed transition is recorded in the
    * result, but the machine does not advance: the next input is applied to the state that was current
    * before the failure.
    *
    * @param transducer The transducer triple to use
    * @param inputs A sequence of inputs from the valid input alphabet
    * @tparam F The error-handling abstraction type-class
    * @tparam I A finite set of valid input messages
    * @tparam S A finite set of states
    * @tparam O A finite set of valid output messages
    * @return Sequence of F[S]
    */
  private def runStates[F[_]: ErrorF, I, S, O](transducer: Transducer[F, I, S, O])(inputs: Seq[I]): Seq[F[S]] = {
    val (initial, actions, _) = transducer

    // `remaining` is a List so that the `::` pattern is exhaustive; matching `::` directly against an
    // arbitrary Seq (e.g. a Vector) never succeeds and would silently yield an empty result.
    @tailrec
    def loop(cur: F[S], remaining: List[I], acc: List[F[S]]): List[F[S]] = remaining match {
      case Nil => acc.reverse // states were prepended, restore input order
      case input :: rest =>
        val next = state(actions)(cur -> input)
        // On a failed transition keep the current state for the following input.
        loop(next.recoverWith { case _ => cur }, rest, next :: acc)
    }

    loop(initial, inputs.toList, Nil)
  }

  /** Returns the sequence of outputs or errors for a given input tape
    *
    * Each input is paired with the state (or error) that resulted from processing it, and the output
    * function is applied to that pair.
    *
    * @param transducer The transducer triple to use
    * @param inputs A sequence of inputs from the valid input alphabet
    * @tparam F The error-handling abstraction type-class
    * @tparam I A finite set of valid input messages
    * @tparam S A finite set of states
    * @tparam O A finite set of valid output messages
    * @return Sequence of F[O]
    */
  def run[F[_]: ErrorF, I, S, O](transducer: Transducer[F, I, S, O])(inputs: Seq[I]): Seq[F[O]] = {
    val (_, _, outputs) = transducer
    runStates(transducer)(inputs)
      .zip(inputs)
      .map { case (s, i) => output(outputs)(s -> i) }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.data.NonEmptyList | |
import cats.syntax.either._ | |
import Transducer._ | |
/** This is a simple example of an Aggregate implementation using a transducer */
object UserRegistration {

  /** Concrete error-handling type of this aggregate: `Left` carries the accumulated errors.
    *
    * Being defined locally, this alias takes precedence inside this object over the `ErrorF`
    * brought in by the `Transducer._` wildcard import.
    */
  type ErrorF[A] = Either[NonEmptyList[Throwable], A]

  /** State-transition function specialised to this aggregate's commands and states. */
  type RegistrationAction = ActionF[ErrorF, Command, State]

  /** Output function specialised to this aggregate's commands, states and events. */
  type RegistrationOutput = OutputF[ErrorF, Command, State, Event]

  /** The states of the registration workflow. */
  sealed trait State
  case object PotentialCustomer extends State
  case object WaitingForConfirmation extends State
  case object Registered extends State
  case object Deleted extends State

  /** The input alphabet: commands the aggregate can receive. */
  sealed trait Command
  case object StartRegistration extends Command
  case object ConfirmAccount extends Command
  case object GDPRDeletion extends Command

  /** The output alphabet: events the aggregate can emit. */
  sealed trait Event
  case object ConfirmationSent extends Event
  case object AccountConfirmed extends Event
  case object AccountDeleted extends Event

  /** Marker for errors that are specific to this aggregate. */
  sealed trait UserRegistrationError

  /** Raised as output when a command arrived in a state that has no transition for it.
    *
    * @param cmd The command that could not be processed
    */
  final case class InvalidStateForCommand(cmd: Command)
      extends Throwable(s"Cannot process command $cmd for invalid state")
      with UserRegistrationError

  /** PotentialCustomer --StartRegistration--> WaitingForConfirmation */
  def startRegistrationAction: RegistrationAction = {
    case (Right(PotentialCustomer), StartRegistration) => WaitingForConfirmation.asRight
  }

  /** WaitingForConfirmation --ConfirmAccount--> Registered */
  def confirmAccountAction: RegistrationAction = {
    case (Right(WaitingForConfirmation), ConfirmAccount) => Registered.asRight
  }

  /** Registered or WaitingForConfirmation --GDPRDeletion--> Deleted */
  def gdprDeleteAction: RegistrationAction = {
    case (Right(Registered), GDPRDeletion)             => Deleted.asRight
    case (Right(WaitingForConfirmation), GDPRDeletion) => Deleted.asRight
  }

  /** Emits [[ConfirmationSent]] once a registration has been started. */
  def confirmationSentOutput: RegistrationOutput = {
    case (Right(WaitingForConfirmation), StartRegistration) => ConfirmationSent.asRight
  }

  /** Emits [[AccountConfirmed]] once the account has been confirmed. */
  def accountConfirmedOutput: RegistrationOutput = {
    case (Right(Registered), ConfirmAccount) => AccountConfirmed.asRight
  }

  /** Emits [[AccountDeleted]] once the account has been deleted. */
  def accountDeletedOutput: RegistrationOutput = {
    case (Right(Deleted), GDPRDeletion) => AccountDeleted.asRight
  }

  /** Turns a failed state into a failed output, appending [[InvalidStateForCommand]] to the existing errors. */
  def invalidCommandOutput: RegistrationOutput = {
    case (Left(errorList), c: Command) => errorList.append(InvalidStateForCommand(c)).asLeft
  }

  /** The transducer triple: (initial state, combined transition function, combined output function).
    *
    * The partial functions are chained with `orElse`, so the first one defined for a label is used.
    */
  lazy val userRegistration: Transducer[ErrorF, Command, State, Event] = (
    PotentialCustomer.asRight,
    startRegistrationAction
      .orElse(confirmAccountAction)
      .orElse(gdprDeleteAction),
    confirmationSentOutput
      .orElse(accountConfirmedOutput)
      .orElse(accountDeletedOutput)
      .orElse(invalidCommandOutput)
  )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment