-
-
Save bblfish/5e80af8b48aea369b53a64f62ec1d215 to your computer and use it in GitHub Desktop.
Typed Actors using Cats Effect, FS2 and Deferred Magic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.syntax.all.* | |
import cats.syntax.all.* | |
import cats.effect.* | |
import fs2.Stream | |
import cats.effect.std.Queue | |
import scala.concurrent.duration.* | |
import lib.FSM | |
import lib.actor.{Actor, AskMsg} | |
type ActorBehavior[F[_], S, -I[_], O] = ((S, I[O]) => F[(S, O | Unit)]) | |
type ActorBehaviorK[F[_], S, -I[_]] = [O] => ((S, I[O]) => F[(S, O | Unit)]) | |
/** Api for higher-kinded actors | |
* | |
* @note | |
* Currently this isn't very pleasant to use. | |
* ETA expansion and Type inference for polymorphic function type needs to be improved | |
* - https://github.com/lampepfl/dotty/issues/15555 | |
* - https://github.com/lampepfl/dotty/issues/15554 | |
*/ | |
trait ActorK[F[_], -I[_]]: | |
/** Send one-way message without waiting for acknowledgement | |
* @param msg the message send | |
*/ | |
def send[O](msg: I[O]): F[Unit] | |
/** Send the message and wait for acknowledgement with output | |
* @param msg the message to send | |
*/ | |
def ask[O](msg: I[O]): F[O] | |
def closed: F[Boolean] | |
infix def ![O](msg: I[O]): F[Unit] = send(msg) | |
infix def ?[O](msg: I[O]): F[O] = ask(msg) | |
object ActorK: | |
/** Create a new actor that can self-reference | |
* @param initialState the initial state | |
* @param createFSM fsm constructor with self actor ref | |
* @param finalize the finalizer effect with the last state | |
* @return Actor object | |
*/ | |
def makeFullK[F[_]: Temporal, S, I[_]]( | |
initialState: S, | |
createFSM: [O] => (self: ActorK[F, I]) => ActorBehavior[F, S, I, O], | |
finalize: (state: S) => F[Unit] | |
): Resource[F, ActorK[F, I]] = | |
def toActorK(actor: Actor[F, I[Any], Any]): ActorK[F, I] = | |
new ActorK[F, I]: | |
def send[O](msg: I[O]): F[Unit] = | |
actor.send(msg.asInstanceOf[I[Any]]) | |
def ask[O](msg: I[O]): F[O] = | |
actor.ask(msg.asInstanceOf[I[Any]]).asInstanceOf[F[O]] | |
def closed: F[Boolean] = | |
actor.closed | |
Actor | |
.makeFull[F, S, I[Any], Any]( | |
initialState, | |
self => | |
val k = toActorK(self) | |
FSM[F, S, I[Any], Any](createFSM(k)) | |
, | |
finalize | |
) | |
.map(toActorK) | |
/** Create a new actor that can self-reference | |
* @param initialState the initial state | |
* @param fsm fsm constructor with self actor ref | |
* @param finalize the finalizer effect with the last state | |
* @return Actor object | |
*/ | |
def makeK[F[_]: Temporal, S, I[_]]( | |
initialState: S, | |
fsm: ActorBehaviorK[F, S, I], | |
finalize: (state: S) => F[Unit] | |
): Resource[F, ActorK[F, I]] = | |
makeFullK(initialState, [O] => (_: ActorK[F, I]) => fsm[O], finalize) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package lib | |
import cats.syntax.all.* | |
import cats.effect.std.Queue | |
import cats.effect.{Concurrent, Deferred, Ref, Resource} | |
import cats.effect.syntax.all.* | |
import fs2.Stream | |
import FSM | |
import scala.annotation.targetName | |
trait AskMsg[F[_], R]: | |
def replyTo: Deferred[F, R] | |
trait Actor[F[_], I, O]: | |
/** Send the message without waiting for acknowledgement | |
* @param msg the message send | |
*/ | |
def sendNoWait(msg: I): F[Unit] | |
/** Send the message and wait for acknowledgement with output | |
* @param msg the message to send | |
*/ | |
def send(msg: I): F[O] | |
/** Send the message and wait for typed response | |
* @param msg the request message | |
*/ | |
def ask[Response, M <: I & AskMsg[F, Response]](msg: Deferred[F, Response] => M): F[Response] | |
@targetName("sendNoWait_infix") | |
infix def !(input: I): F[Unit] = sendNoWait(input) | |
case class ActorDeadException(msg: String) extends Exception(msg) | |
object Actor: | |
/** Create a new actor that can self-reference | |
* @param initialState the initial state | |
* @param createFsm fsm constructor with self actor ref | |
* @param finalize the finalizer effect with the last state | |
* @return Actor object | |
*/ | |
def makeFull[F[_]: Concurrent, S, I, O]( | |
initialState: S, | |
createFsm: (self: Actor[F, I, O]) => FSM[F, S, I, O], | |
finalize: (state: S) => F[Unit] | |
): Resource[F, Actor[F, I, O]] = | |
for | |
actorRef <- Deferred[F, Actor[F, I, O]].toResource | |
mailbox <- Queue.unbounded[F, (I, Deferred[F, O])].toResource | |
isDeadRef <- Ref.of[F, Boolean](false).toResource | |
_ <- Stream | |
.eval((Ref.of[F, S](initialState), actorRef.get).tupled) | |
.flatMap((ref, actorRef) => | |
val fsm = createFsm(actorRef) | |
Stream | |
.fromQueueUnterminated(mailbox) | |
.evalScan(initialState) { case (state, (input, replyTo)) => | |
fsm | |
.run(state, input) | |
.flatMap((newState, output) => ref.set(newState) *> replyTo.complete(output).as(newState)) | |
} | |
.onFinalize((isDeadRef.set(true) *> ref.get.flatMap(finalize)).uncancelable) | |
) | |
.compile | |
.drain | |
.background | |
throwIfDead = isDeadRef.get.flatMap(Concurrent[F].raiseWhen(_)(ActorDeadException("Actor is dead"))) | |
actor = new Actor[F, I, O]: | |
def sendNoWait(input: I): F[Unit] = | |
throwIfDead *> Deferred[F, O].flatMap(mailbox.offer(input, _)).void | |
def send(msg: I): F[O] = | |
throwIfDead *> Deferred[F, O].flatMap(promise => mailbox.offer(msg, promise) *> promise.get) | |
def ask[Response, M <: I & AskMsg[F, Response]](msg: Deferred[F, Response] => M): F[Response] = | |
throwIfDead *> Deferred[F, Response].flatMap(promise => send(msg(promise)) *> promise.get) | |
_ <- actorRef.complete(actor).toResource | |
yield actor | |
/** Create a new actor with finalizer | |
* @param initialState Initial state of the actor | |
* @param fsm the finite state machine | |
* @param finalize the cleanup effect with the last known state | |
* @return Actor object | |
*/ | |
def makeWithFinalize[F[_]: Concurrent, S, I, O]( | |
initialState: S, | |
fsm: FSM[F, S, I, O], | |
finalize: S => F[Unit] | |
): Resource[F, Actor[F, I, O]] = | |
makeFull(initialState, (_: Actor[F, I, O]) => fsm, finalize) | |
def make[F[_]: Concurrent, S, I, O]( | |
initialState: S, | |
fsm: FSM[F, S, I, O] | |
): Resource[F, Actor[F, I, O]] = | |
makeWithFinalize(initialState, fsm, _ => Concurrent[F].unit) | |
def makeSimple[F[_]: Concurrent, S, I, O]( | |
initialState: S, | |
fsm: FSM[F, S, I, O] | |
): Resource[F, I => F[O]] = | |
make(initialState, fsm).map(_.send) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package lib | |
import scala.util.chaining.* | |
import cats.syntax.all.* | |
import cats.{Applicative, Functor, Id} | |
/** F[_] - Effect S - State I - Input O - Output | |
*/ | |
case class FSM[F[_], S, I, O](run: (S, I) => F[(S, O)]): | |
def runS(using F: Functor[F]): (S, I) => F[S] = | |
(s, i) => run(s, i).map(_._1) | |
object FSM: | |
def id[S, I, O](run: (S, I) => Id[(S, O)]): FSM[Id, S, I, O] = FSM(run) | |
def pure[F[_]: Applicative, S, I, O](run: (S, I) => (S, O)): FSM[F, S, I, O] = | |
FSM { case (s, in) => Applicative[F].pure(run(s, in)) } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.syntax.all.* | |
import cats.syntax.all.* | |
import cats.effect.* | |
import fs2.Stream | |
import cats.effect.std.Queue | |
import scala.concurrent.duration.* | |
import lib.FSM | |
import lib.actor.{Actor, AskMsg} | |
case class Ping[F[_]](from: String, replyTo: Deferred[F, String]) extends AskMsg[F, String] | |
val pongBehavior = FSM[IO, Unit, Ping[IO], Unit] { case (_, Ping(who, replyTo)) => | |
// IO.sleep(2.seconds) *> | |
replyTo | |
.complete(s"Hello ${who} from pong") | |
.void | |
.tupleLeft(()) | |
} | |
type CounterMessages[F[_]] = Int | CounterInput[F] | |
enum CounterInput[F[_]]: | |
case Inc() | |
case Dec() | |
case Get(replyTo: Deferred[F, Int]) extends CounterInput[F] with AskMsg[F, Int] | |
val counterBehavior = FSM[IO, Int, CounterMessages[IO], Unit] { | |
case (s, CounterInput.Inc()) => | |
IO(s + 1, ()) | |
case (s, CounterInput.Dec()) => | |
IO(s + 1, ()) | |
case (s, CounterInput.Get(replyTo)) => | |
replyTo.complete(s) *> IO(s, ()) | |
case (_, value: Int) => | |
IO(value, ()) | |
} | |
object StreamAskReplyDemo extends IOApp.Simple: | |
val SetPattern = "(set) ([0-9]*)".r | |
val run = | |
( | |
Actor.make((), pongBehavior), | |
Actor.make(0, counterBehavior) | |
).tupled | |
.use((pongActor, counterActor) => | |
IO.println("Valid commands are [inc, dec, get, pong, set, quit]: ") *> | |
(for | |
input <- IO.readLine | |
_ <- input match | |
case "inc" => counterActor ! CounterInput.Inc() | |
case "dec" => counterActor ! CounterInput.Dec() | |
case SetPattern(_, param) => counterActor ! param.toInt | |
case "get" => | |
counterActor | |
.ask(CounterInput.Get(_)) | |
.flatMap(response => IO.println(s"Counter: ${response}")) | |
case "pong" => | |
pongActor | |
.ask(Ping("Sytherax", _)) | |
.timeout(1.second) | |
.flatMap(c => IO.println(s"Ping Response: ${c}")) | |
case "quit" => IO.unit | |
case _ => IO.println("Invalid input") | |
yield input).iterateUntil(_ == "quit").void | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lib.actor.ActorK | |
import lib.actor.* | |
object TypedActorsKDemo extends IOApp.Simple: | |
case object Ping | |
type M[A] = Ping.type | |
def pongBehaviorK[O]: ActorBehavior[IO, Unit, M, O] = | |
case (_, Ping) => | |
IO.sleep(10.seconds) *> IO.println("Pong") *> IO((), ()) | |
enum CounterInput[+Response]: | |
case Inc | |
case Dec | |
case Get extends CounterInput[Int] | |
type Message[T] = Int | CounterInput[T] | |
def counterBehaviorK[O]: ActorBehavior[IO, Int, Message, O] = | |
case (state, CounterInput.Inc) => | |
IO(state + 1, ()) | |
case (state, CounterInput.Dec) => | |
IO(state - 1, ()) | |
case (state, CounterInput.Get) => | |
IO(state, state) | |
case (state, value: Int) => | |
IO(value, ()) | |
val SetPattern = "(set) ([0-9]*)".r | |
val run = | |
( | |
ActorK.makeK[IO, Int, Message](0, [O] => counterBehaviorK(_: Int, _: Message[O]), _ => IO.unit), | |
ActorK.makeK[IO, Unit, M]((), [O] => pongBehaviorK(_: Unit, _: M[O]), _ => IO.unit) | |
).tupled | |
.use((counterActor, pongActor) => | |
IO.println("Valid commands are [inc, dec, get, set, ping, quit]: ") *> | |
(for | |
input <- IO.readLine | |
_ <- input match | |
case "inc" => counterActor ! CounterInput.Inc | |
case "dec" => counterActor ? CounterInput.Dec | |
case SetPattern(_, param) => counterActor ! param.toInt | |
case "get" => | |
counterActor | |
.?(CounterInput.Get) | |
.flatMap(response => IO.println(s"Get Response: ${response}")) | |
case "ping" => pongActor ? Ping | |
case "quit" => IO.unit | |
case _ => IO.println("Invalid input") | |
yield input).iterateUntil(_ == "quit").void | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment