Created
March 9, 2022 09:00
-
-
Save ayeo/17513d3edd29b4dc439f713e1eed8847 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pl.ayeo | |
import cats.effect.IO | |
import cats.effect.Ref | |
import cats.effect.std.Queue | |
import cats.effect.unsafe.implicits.global | |
import cats.syntax.all.* | |
import fs2.{Pipe, Stream} | |
import scala.concurrent.duration.* | |
object QueueApp extends App { | |
def i1 = Stream.repeatEval(IO.readLine.map(_.toIntOption)) | |
def i2 = Stream.repeatEval(IO.sleep(1.second) >> IO(Some(1))) | |
def actor2(adder: IO[Adder]): Pipe[IO, Option[Int], IO[Option[Int]]] = | |
stream => stream.map(input => | |
adder.map { a => | |
input.map { integer => | |
a.run(integer) | |
}.sequence | |
}.flatten | |
) | |
def logger: Pipe[IO, IO[Option[Int]], Unit] = _.map(println) | |
def a2(i: Option[Int]): Option[Int] = i.map(_ + 2) | |
case class Adder(c: Ref[IO, Int]) { | |
def run(i: Int): IO[Int] = c.modify(d => (i + d, i)) | |
} | |
val ioAdder: IO[Adder] = Ref[IO].of(0).map(Adder(_)) | |
i1.merge(i2).through(actor2(ioAdder)).through(logger).compile.drain.unsafeRunSync() | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment