Skip to content

Instantly share code, notes, and snippets.

@bblfish
Forked from Swoorup/ActorK.scala
Created September 5, 2022 07:34
Show Gist options
  • Save bblfish/5e80af8b48aea369b53a64f62ec1d215 to your computer and use it in GitHub Desktop.
Save bblfish/5e80af8b48aea369b53a64f62ec1d215 to your computer and use it in GitHub Desktop.
Typed Actors using Cats Effect, FS2 and Deferred Magic
import cats.effect.syntax.all.*
import cats.syntax.all.*
import cats.effect.*
import fs2.Stream
import cats.effect.std.Queue
import scala.concurrent.duration.*
import lib.FSM
import lib.actor.{Actor, AskMsg}
type ActorBehavior[F[_], S, -I[_], O] = ((S, I[O]) => F[(S, O | Unit)])
type ActorBehaviorK[F[_], S, -I[_]] = [O] => ((S, I[O]) => F[(S, O | Unit)])
/** Api for higher-kinded actors
*
* @note
* Currently this isn't very pleasant to use.
* ETA expansion and Type inference for polymorphic function type needs to be improved
* - https://github.com/lampepfl/dotty/issues/15555
* - https://github.com/lampepfl/dotty/issues/15554
*/
trait ActorK[F[_], -I[_]]:
/** Send one-way message without waiting for acknowledgement
* @param msg the message send
*/
def send[O](msg: I[O]): F[Unit]
/** Send the message and wait for acknowledgement with output
* @param msg the message to send
*/
def ask[O](msg: I[O]): F[O]
def closed: F[Boolean]
infix def ![O](msg: I[O]): F[Unit] = send(msg)
infix def ?[O](msg: I[O]): F[O] = ask(msg)
object ActorK:
/** Create a new actor that can self-reference
* @param initialState the initial state
* @param createFSM fsm constructor with self actor ref
* @param finalize the finalizer effect with the last state
* @return Actor object
*/
def makeFullK[F[_]: Temporal, S, I[_]](
initialState: S,
createFSM: [O] => (self: ActorK[F, I]) => ActorBehavior[F, S, I, O],
finalize: (state: S) => F[Unit]
): Resource[F, ActorK[F, I]] =
def toActorK(actor: Actor[F, I[Any], Any]): ActorK[F, I] =
new ActorK[F, I]:
def send[O](msg: I[O]): F[Unit] =
actor.send(msg.asInstanceOf[I[Any]])
def ask[O](msg: I[O]): F[O] =
actor.ask(msg.asInstanceOf[I[Any]]).asInstanceOf[F[O]]
def closed: F[Boolean] =
actor.closed
Actor
.makeFull[F, S, I[Any], Any](
initialState,
self =>
val k = toActorK(self)
FSM[F, S, I[Any], Any](createFSM(k))
,
finalize
)
.map(toActorK)
/** Create a new actor that can self-reference
* @param initialState the initial state
* @param fsm fsm constructor with self actor ref
* @param finalize the finalizer effect with the last state
* @return Actor object
*/
def makeK[F[_]: Temporal, S, I[_]](
initialState: S,
fsm: ActorBehaviorK[F, S, I],
finalize: (state: S) => F[Unit]
): Resource[F, ActorK[F, I]] =
makeFullK(initialState, [O] => (_: ActorK[F, I]) => fsm[O], finalize)
package lib
import cats.syntax.all.*
import cats.effect.std.Queue
import cats.effect.{Concurrent, Deferred, Ref, Resource}
import cats.effect.syntax.all.*
import fs2.Stream
import FSM
import scala.annotation.targetName
trait AskMsg[F[_], R]:
def replyTo: Deferred[F, R]
trait Actor[F[_], I, O]:
/** Send the message without waiting for acknowledgement
* @param msg the message send
*/
def sendNoWait(msg: I): F[Unit]
/** Send the message and wait for acknowledgement with output
* @param msg the message to send
*/
def send(msg: I): F[O]
/** Send the message and wait for typed response
* @param msg the request message
*/
def ask[Response, M <: I & AskMsg[F, Response]](msg: Deferred[F, Response] => M): F[Response]
@targetName("sendNoWait_infix")
infix def !(input: I): F[Unit] = sendNoWait(input)
case class ActorDeadException(msg: String) extends Exception(msg)
object Actor:
/** Create a new actor that can self-reference
* @param initialState the initial state
* @param createFsm fsm constructor with self actor ref
* @param finalize the finalizer effect with the last state
* @return Actor object
*/
def makeFull[F[_]: Concurrent, S, I, O](
initialState: S,
createFsm: (self: Actor[F, I, O]) => FSM[F, S, I, O],
finalize: (state: S) => F[Unit]
): Resource[F, Actor[F, I, O]] =
for
actorRef <- Deferred[F, Actor[F, I, O]].toResource
mailbox <- Queue.unbounded[F, (I, Deferred[F, O])].toResource
isDeadRef <- Ref.of[F, Boolean](false).toResource
_ <- Stream
.eval((Ref.of[F, S](initialState), actorRef.get).tupled)
.flatMap((ref, actorRef) =>
val fsm = createFsm(actorRef)
Stream
.fromQueueUnterminated(mailbox)
.evalScan(initialState) { case (state, (input, replyTo)) =>
fsm
.run(state, input)
.flatMap((newState, output) => ref.set(newState) *> replyTo.complete(output).as(newState))
}
.onFinalize((isDeadRef.set(true) *> ref.get.flatMap(finalize)).uncancelable)
)
.compile
.drain
.background
throwIfDead = isDeadRef.get.flatMap(Concurrent[F].raiseWhen(_)(ActorDeadException("Actor is dead")))
actor = new Actor[F, I, O]:
def sendNoWait(input: I): F[Unit] =
throwIfDead *> Deferred[F, O].flatMap(mailbox.offer(input, _)).void
def send(msg: I): F[O] =
throwIfDead *> Deferred[F, O].flatMap(promise => mailbox.offer(msg, promise) *> promise.get)
def ask[Response, M <: I & AskMsg[F, Response]](msg: Deferred[F, Response] => M): F[Response] =
throwIfDead *> Deferred[F, Response].flatMap(promise => send(msg(promise)) *> promise.get)
_ <- actorRef.complete(actor).toResource
yield actor
/** Create a new actor with finalizer
* @param initialState Initial state of the actor
* @param fsm the finite state machine
* @param finalize the cleanup effect with the last known state
* @return Actor object
*/
def makeWithFinalize[F[_]: Concurrent, S, I, O](
initialState: S,
fsm: FSM[F, S, I, O],
finalize: S => F[Unit]
): Resource[F, Actor[F, I, O]] =
makeFull(initialState, (_: Actor[F, I, O]) => fsm, finalize)
def make[F[_]: Concurrent, S, I, O](
initialState: S,
fsm: FSM[F, S, I, O]
): Resource[F, Actor[F, I, O]] =
makeWithFinalize(initialState, fsm, _ => Concurrent[F].unit)
def makeSimple[F[_]: Concurrent, S, I, O](
initialState: S,
fsm: FSM[F, S, I, O]
): Resource[F, I => F[O]] =
make(initialState, fsm).map(_.send)
package lib
import scala.util.chaining.*
import cats.syntax.all.*
import cats.{Applicative, Functor, Id}
/** F[_] - Effect S - State I - Input O - Output
*/
case class FSM[F[_], S, I, O](run: (S, I) => F[(S, O)]):
def runS(using F: Functor[F]): (S, I) => F[S] =
(s, i) => run(s, i).map(_._1)
object FSM:
def id[S, I, O](run: (S, I) => Id[(S, O)]): FSM[Id, S, I, O] = FSM(run)
def pure[F[_]: Applicative, S, I, O](run: (S, I) => (S, O)): FSM[F, S, I, O] =
FSM { case (s, in) => Applicative[F].pure(run(s, in)) }
import cats.effect.syntax.all.*
import cats.syntax.all.*
import cats.effect.*
import fs2.Stream
import cats.effect.std.Queue
import scala.concurrent.duration.*
import lib.FSM
import lib.actor.{Actor, AskMsg}
case class Ping[F[_]](from: String, replyTo: Deferred[F, String]) extends AskMsg[F, String]
val pongBehavior = FSM[IO, Unit, Ping[IO], Unit] { case (_, Ping(who, replyTo)) =>
// IO.sleep(2.seconds) *>
replyTo
.complete(s"Hello ${who} from pong")
.void
.tupleLeft(())
}
type CounterMessages[F[_]] = Int | CounterInput[F]
enum CounterInput[F[_]]:
case Inc()
case Dec()
case Get(replyTo: Deferred[F, Int]) extends CounterInput[F] with AskMsg[F, Int]
val counterBehavior = FSM[IO, Int, CounterMessages[IO], Unit] {
case (s, CounterInput.Inc()) =>
IO(s + 1, ())
case (s, CounterInput.Dec()) =>
IO(s + 1, ())
case (s, CounterInput.Get(replyTo)) =>
replyTo.complete(s) *> IO(s, ())
case (_, value: Int) =>
IO(value, ())
}
object StreamAskReplyDemo extends IOApp.Simple:
val SetPattern = "(set) ([0-9]*)".r
val run =
(
Actor.make((), pongBehavior),
Actor.make(0, counterBehavior)
).tupled
.use((pongActor, counterActor) =>
IO.println("Valid commands are [inc, dec, get, pong, set, quit]: ") *>
(for
input <- IO.readLine
_ <- input match
case "inc" => counterActor ! CounterInput.Inc()
case "dec" => counterActor ! CounterInput.Dec()
case SetPattern(_, param) => counterActor ! param.toInt
case "get" =>
counterActor
.ask(CounterInput.Get(_))
.flatMap(response => IO.println(s"Counter: ${response}"))
case "pong" =>
pongActor
.ask(Ping("Sytherax", _))
.timeout(1.second)
.flatMap(c => IO.println(s"Ping Response: ${c}"))
case "quit" => IO.unit
case _ => IO.println("Invalid input")
yield input).iterateUntil(_ == "quit").void
)
import lib.actor.ActorK
import lib.actor.*
object TypedActorsKDemo extends IOApp.Simple:
case object Ping
type M[A] = Ping.type
def pongBehaviorK[O]: ActorBehavior[IO, Unit, M, O] =
case (_, Ping) =>
IO.sleep(10.seconds) *> IO.println("Pong") *> IO((), ())
enum CounterInput[+Response]:
case Inc
case Dec
case Get extends CounterInput[Int]
type Message[T] = Int | CounterInput[T]
def counterBehaviorK[O]: ActorBehavior[IO, Int, Message, O] =
case (state, CounterInput.Inc) =>
IO(state + 1, ())
case (state, CounterInput.Dec) =>
IO(state - 1, ())
case (state, CounterInput.Get) =>
IO(state, state)
case (state, value: Int) =>
IO(value, ())
val SetPattern = "(set) ([0-9]*)".r
val run =
(
ActorK.makeK[IO, Int, Message](0, [O] => counterBehaviorK(_: Int, _: Message[O]), _ => IO.unit),
ActorK.makeK[IO, Unit, M]((), [O] => pongBehaviorK(_: Unit, _: M[O]), _ => IO.unit)
).tupled
.use((counterActor, pongActor) =>
IO.println("Valid commands are [inc, dec, get, set, ping, quit]: ") *>
(for
input <- IO.readLine
_ <- input match
case "inc" => counterActor ! CounterInput.Inc
case "dec" => counterActor ? CounterInput.Dec
case SetPattern(_, param) => counterActor ! param.toInt
case "get" =>
counterActor
.?(CounterInput.Get)
.flatMap(response => IO.println(s"Get Response: ${response}"))
case "ping" => pongActor ? Ping
case "quit" => IO.unit
case _ => IO.println("Invalid input")
yield input).iterateUntil(_ == "quit").void
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment