Skip to content

Instantly share code, notes, and snippets.

@ayeo
Created March 10, 2022 21:49
Show Gist options
  • Save ayeo/a3a19bf6ae5d3e436f8133203648cc8d to your computer and use it in GitHub Desktop.
Save ayeo/a3a19bf6ae5d3e436f8133203648cc8d to your computer and use it in GitHub Desktop.
package pl.ayeo
import cats.effect.{IO, IOApp, Ref}
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import cats.syntax.all.*
import fs2.{Pipe, Stream}
import scala.concurrent.duration.*
/** Demo application: a single message-handling "actor" is fed by its own inbox queue merged
  * with a periodic "tic" stream; any reply it produces is offered back into that same inbox.
  * Every message that passes through the actor is printed together with the current thread name.
  */
object ActorApp extends IOApp.Simple {

  /** Message handler that keeps a running count of the inputs it has handled.
    *
    * @param r counter that is incremented by one on every call to [[handle]]
    */
  final case class Actor(r: Ref[IO, Int]) {

    /** Handles a single message and optionally produces a reply.
      *
      * The counter is bumped on every call. The decision below uses the counter value from
      * *before* the increment, so the very first message (count 0) already satisfies the
      * "every fifth" condition.
      *
      * @param input the incoming message
      * @return `Some("Hello to myself")` when `input` is `"Hello"`; otherwise `Some("Hello")`
      *         when the pre-increment count is a multiple of 5; otherwise `None`
      */
    def handle(input: String): IO[Option[String]] =
      r.getAndUpdate(_ + 1).map { handledBefore =>
        input match {
          case "Hello"                    => Some("Hello to myself")
          case _ if handledBefore % 5 == 0 => Some("Hello")
          case _                          => None
        }
      }
  }

  /** Builds a pipe that passes every input through `actor` and re-emits the input unchanged.
    *
    * @param q     queue that receives the actor's reply, if it produced one
    * @param actor handler invoked for each element of the stream
    * @return a pipe whose output elements are exactly its input elements
    */
  def actorPipe(q: Queue[IO, String], actor: Actor): Pipe[IO, String, String] =
    // evalTap runs the effect and keeps the element; traverse_ offers the reply only when
    // the Option is defined and discards the (unit) result.
    _.evalTap(input => actor.handle(input).flatMap(_.traverse_(q.offer)))

  /** Description of a fresh actor whose counter starts at 0. */
  val ioActor: IO[Actor] = Ref[IO].of(0).map(Actor.apply)

  /** Description of a fresh unbounded queue used as the actor's inbox. */
  val inboxQueue: IO[Queue[IO, String]] = Queue.unbounded[IO, String]

  /** Description of a fresh unbounded queue; allocated by [[finalStream]] but not read by [[program]]. */
  val outboxQueue: IO[Queue[IO, String]] = Queue.unbounded[IO, String]

  /** Single-element stream that allocates the actor, its inbox and the outbox, in that order. */
  val finalStream: Stream[IO, (Actor, Queue[IO, String], Queue[IO, String])] =
    Stream.eval {
      for {
        actor  <- ioActor
        inbox  <- inboxQueue
        outbox <- outboxQueue
      } yield (actor, inbox, outbox)
    }

  /** Infinite stream that sleeps 500 milliseconds and then emits `"tic"`, repeatedly. */
  val ticStream: Stream[IO, String] =
    Stream.repeatEval(IO.sleep(500.millis) >> IO("tic"))

  /** The running system: inbox messages merged with tics, all routed through the actor.
    *
    * The inbox is passed to [[actorPipe]] as the reply queue, so the actor's replies come back
    * to the actor itself. The outbox produced by [[finalStream]] is not used here.
    */
  val program: Stream[IO, String] = finalStream.flatMap { case (actor, inbox, _) =>
    Stream
      .fromQueueUnterminated(inbox)
      .merge(ticStream)
      .through(actorPipe(inbox, actor))
  }

  /** Prints every processed message, prefixed with the current thread's name, and never
    * completes on its own because the underlying streams are unterminated.
    */
  override def run: IO[Unit] =
    program
      .evalMap(t => IO.println(s"${Thread.currentThread().getName()}:\t $t"))
      .compile
      .drain
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment