Skip to content

Instantly share code, notes, and snippets.

@tPl0ch
Last active February 15, 2022 18:16
Show Gist options
  • Save tPl0ch/5c6c9a046c1e5572f7601a63a0259eef to your computer and use it in GitHub Desktop.
Save tPl0ch/5c6c9a046c1e5572f7601a63a0259eef to your computer and use it in GitHub Desktop.
Message-Driven Finite-State-Transducer Domain-Driven-Design Aggregate
import cats.instances.either._
import Transducer.run
import UserRegistration._
object Main extends App {
private val commands = List(GDPRDeletion, StartRegistration, StartRegistration, ConfirmAccount, GDPRDeletion)
run(userRegistration)(commands).foreach(println)
// OUTPUT
// Left(NonEmptyList(Transducer$UnsupportedStateTransition: There is no action defined for state Right(PotentialCustomer) with input GDPRDeletion, UserRegistration$InvalidStateForCommand: Cannot process command GDPRDeletion for invalid state))
// Right(ConfirmationSent)
// Left(NonEmptyList(Transducer$UnsupportedStateTransition: There is no action defined for state Right(WaitingForConfirmation) with input StartRegistration, UserRegistration$InvalidStateForCommand: Cannot process command StartRegistration for invalid state))
// Right(AccountConfirmed)
// Right(AccountDeleted)
}
import cats.ApplicativeError
import cats.data.NonEmptyList
import cats.implicits._
import scala.annotation.tailrec
/** This is a POC of a message-driven finite-state-transducer implementation
*
* It uses cats to enable generic error-handling applicatives for clients.
*
* @see https://en.wikipedia.org/wiki/Finite-state_machine#Transducers
*/
object Transducer {
/** The (state, input) label
*
* @tparam F The error-handling abstraction type-class
* @tparam I A finite set of valid input messages
* @tparam S A finite set of states
*/
type Label[F[_], I, S] = (F[S], I)
/** The output function
*
* This function uses the more advance Mealy model since it relies on both state and input to determine its output.
*
* @tparam F The error-handling abstraction type-class
* @tparam I A finite set of valid input messages
* @tparam S A finite set of states
* @tparam O A finite set of valid output messages
*/
type OutputF[F[_], I, S, O] = PartialFunction[Label[F, I, S], F[O]]
/** The state-transition function
*
* Returns a new state for a defined label of (state, input).
*
* @tparam F The error-handling abstraction type-class
* @tparam I A finite set of valid input messages
* @tparam S A finite set of states
*/
type ActionF[F[_], I, S] = PartialFunction[Label[F, I, S], F[S]]
/** The transducer tuple
*
* The tuple is of form (initial state, state-transition function, output function).
*
* The alphabets are derived from the type parameters.
*
* @tparam F The error-handling abstraction type-class
* @tparam I A finite set of valid input messages
* @tparam S A finite set of states
* @tparam O A finite set of valid output messages
*/
type Transducer[F[_], I, S, O] = (F[S], ActionF[F, I, S], OutputF[F, I, S, O])
/** The Error abstraction to deal with action and output errors
*
* @tparam F The error-handling abstraction type-class
*/
type ErrorF[F[_]] = ApplicativeError[F, NonEmptyList[Throwable]]
/** There are two errors that are generic to all transducers.
*
* 1.) There is no action defined for a given input at a given state
* 2.) There is no output defined for a given new state
*/
sealed trait TransducerError
/** Indicates an unsupported state transition
*
* @param label The label of (state, input)
* @tparam I A finite set of valid input messages
* @tparam S A finite set of states
*/
final case class UnsupportedStateTransition[F[_], I, S](label: Label[F, I, S])
extends Throwable(s"There is no action defined for state ${label._1} with input ${label._2}")
with TransducerError
/** Indicates that an output is not defined for a given state
*
* @param label The label of (state, input)
* @tparam I A finite set of valid input messages
* @tparam S A finite set of states
*/
final case class UndefinedOutput[F[_], I, S](label: Label[F, I, S])
extends Throwable(s"There is no output defined for state ${label._1} with input ${label._2}")
with TransducerError
private[this] def state[F[_]: ErrorF, I, S](actions: ActionF[F, I, S])(label: Label[F, I, S]): F[S] =
if (actions.isDefinedAt(label)) actions(label)
else NonEmptyList.of(UnsupportedStateTransition(label)).raiseError[F, S]
private[this] def output[F[_]: ErrorF, I, S, O](outputs: OutputF[F, I, S, O])(label: Label[F, I, S]): F[O] =
if (outputs.isDefinedAt(label)) outputs(label)
else NonEmptyList.of(UndefinedOutput(label)).raiseError[F, O]
/** Returns the sequence of states or errors for a given input tape
*
* @param transducer The transducer triple to use
* @param inputs A sequence of inputs from the valid input alphabet
* @tparam F The error-handling abstraction type-class
* @tparam I A finite set of valid input messages
* @tparam S A finite set of states
* @tparam O A finite set of valid output messages
* @return Sequence of F[S]
*/
private def runStates[F[_]: ErrorF, I, S, O](transducer: Transducer[F, I, S, O])(inputs: Seq[I]): Seq[F[S]] = {
val (s, actions, _) = transducer
@tailrec
def loop(cur: F[S], in: Seq[I], acc: Seq[F[S]]): Seq[F[S]] = in match {
case x :: xs =>
val newState = state(actions)(cur -> x)
loop(newState.recoverWith { case _ => cur }, xs, acc :+ newState)
case _ => acc
}
loop(s, inputs, Nil)
}
/** Returns the sequence of outputs or errors for a given input tape
*
* @param transducer The transducer triple to use
* @param inputs A sequence of inputs from the valid input alphabet
* @tparam F The error-handling abstraction type-class
* @tparam I A finite set of valid input messages
* @tparam S A finite set of states
* @tparam O A finite set of valid output messages
* @return Sequence of F[O]
*/
def run[F[_]: ErrorF, I, S, O](transducer: Transducer[F, I, S, O])(inputs: Seq[I]): Seq[F[O]] = {
val (_, _, outputs) = transducer
runStates(transducer)(inputs)
.zip(inputs)
.map { case (s, i) => output(outputs)(s -> i) }
}
}
import cats.data.NonEmptyList
import cats.syntax.either._
import Transducer._
/** This is a simple example of an Aggregate implementation using a transducer */
object UserRegistration {
type ErrorF[A] = Either[NonEmptyList[Throwable], A]
type RegistrationAction = ActionF[ErrorF, Command, State]
type RegistrationOutput = OutputF[ErrorF, Command, State, Event]
sealed trait State
final case object PotentialCustomer extends State
final case object WaitingForConfirmation extends State
final case object Registered extends State
final case object Deleted extends State
sealed trait Command
final case object StartRegistration extends Command
final case object ConfirmAccount extends Command
final case object GDPRDeletion extends Command
sealed trait Event
final case object ConfirmationSent extends Event
final case object AccountConfirmed extends Event
final case object AccountDeleted extends Event
sealed trait UserRegistrationError
final case class InvalidStateForCommand(cmd: Command)
extends Throwable(s"Cannot process command $cmd for invalid state")
def startRegistrationAction: RegistrationAction = {
case (Right(PotentialCustomer), StartRegistration) => WaitingForConfirmation.asRight
}
def confirmAccountAction: RegistrationAction = {
case (Right(WaitingForConfirmation), ConfirmAccount) => Registered.asRight
}
def gdprDeleteAction: RegistrationAction = {
case (Right(Registered), GDPRDeletion) => Deleted.asRight
case (Right(WaitingForConfirmation), GDPRDeletion) => Deleted.asRight
}
def confirmationSentOutput: RegistrationOutput = {
case (Right(WaitingForConfirmation), StartRegistration) => ConfirmationSent.asRight
}
def accountConfirmedOutput: RegistrationOutput = {
case (Right(Registered), ConfirmAccount) => AccountConfirmed.asRight
}
def accountDeletedOutput: RegistrationOutput = {
case (Right(Deleted), GDPRDeletion) => AccountDeleted.asRight
}
def invalidCommandOutput: RegistrationOutput = {
case (Left(errorList), c: Command) => errorList.append(InvalidStateForCommand(c)).asLeft
}
lazy val userRegistration: Transducer[ErrorF, Command, State, Event] = (
PotentialCustomer.asRight,
startRegistrationAction
orElse confirmAccountAction
orElse gdprDeleteAction,
confirmationSentOutput
orElse accountConfirmedOutput
orElse accountDeletedOutput
orElse invalidCommandOutput,
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment