Last active
November 8, 2023 09:50
-
-
Save arturaz/2381b85c40674f4cd15a2992e97a43fa to your computer and use it in GitHub Desktop.
Simple Actor implementation using cats effect and FS2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.utils | |
import cats.effect.Concurrent | |
import cats.effect.kernel.{DeferredSink, DeferredSource} | |
import fs2.concurrent.Channel | |
/** | |
* An actor that processes messages one at a time. | |
*/ | |
trait Actor[F[_], Message] { | |
/** | |
* Sends the message to the actor. The [[F]] semantically blocks until the message is put into the actors mailbox. | |
* | |
* If the actor is stopped, the message is not processed. | |
* */ | |
def send(message: Message): F[Either[Channel.Closed, Actor.MessageEnqueued[F]]] | |
/** Returns true if the actor is stopped. */ | |
def stopped: F[Boolean] | |
/** Stops the actor after all messages are consumed. See [[Channel.close]]. */ | |
def stop: F[Either[Channel.Closed, Unit]] | |
} | |
trait InspectableActor[F[_], Message, State] extends Actor[F, Message] { | |
/** Returns the current state of the actor. */ | |
def state: F[State] | |
} | |
object Actor { | |
/** | |
* Indicates that the message has been enqueued in the actors mailbox. | |
* | |
* @param messageProcessed Gets completed once the message has been processed. | |
* */ | |
case class MessageEnqueued[F[_]](messageProcessed: DeferredSource[F, Unit]) extends AnyVal | |
private case class ChannelMessage[F[_], Message](message: Message, processingCompleted: DeferredSink[F, Unit]) | |
/** | |
* Creates and starts an [[Actor]]. | |
* | |
* @param onMessage Transitions to the next state based on the message and the current state. The [[F]] is awaited for | |
* before the next message is processed. | |
* @param onException Invoked when [[onMessage]] fails with an exception. The [[F]] is awaited for before the next | |
* message is processed. | |
*/ | |
def create[F[_] : Concurrent, Message, State]( | |
initialState: State | |
)( | |
onMessage: (Message, State) => F[State], | |
onException: (Message, State, Throwable) => F[Unit] | |
): F[InspectableActor[F, Message, State]] = { | |
/** The actor loop. */ | |
def consumeStream(ref: Ref[F, State], stream: fs2.Stream[F, ChannelMessage[F, Message]]): F[Unit] = { | |
stream | |
.evalTap { case ChannelMessage(message, processingCompleted) => | |
for { | |
state <- ref.get | |
_ <- onMessage(message, state).attempt.flatMap { | |
case Left(e) => onException(message, state, e) | |
case Right(newState) => ref.set(newState) *> processingCompleted.complete(()).void | |
} | |
} yield () | |
} | |
.compile | |
.drain | |
} | |
for { | |
inbox <- Channel.unbounded[F, ChannelMessage[F, Message]] | |
ref <- Ref[F].of(initialState) | |
_ <- consumeStream(ref, inbox.stream).start | |
} yield new InspectableActor[F, Message, State] { | |
override def send(message: Message): F[Either[Channel.Closed, MessageEnqueued[F]]] = { | |
for { | |
deferred <- Deferred[F, Unit] | |
either <- inbox.send(ChannelMessage(message, deferred)) | |
} yield either.as(MessageEnqueued(deferred)) | |
} | |
override def state: F[State] = | |
ref.get | |
override def stopped: F[Boolean] = | |
inbox.isClosed | |
override def stop: F[Either[Channel.Closed, Unit]] = | |
inbox.close | |
} | |
} | |
/** | |
* As [[create]] but the actor does not have a state. | |
*/ | |
def createStateless[F[_] : Concurrent, Message]( | |
onMessage: Message => F[Unit], | |
onException: (Message, Throwable) => F[Unit] | |
): F[Actor[F, Message]] = create[F, Message, Unit](())( | |
onMessage = (msg, _) => onMessage(msg), | |
onException = (msg, _, e) => onException(msg, e) | |
).map(actor => actor: Actor[F, Message]) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment