Created
January 11, 2023 19:31
-
-
Save samspills/b1a3434e1bac21ac9c62004df2f25306 to your computer and use it in GitHub Desktop.
toy example with fs2 streams + cats-effect ref
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //> using lib "co.fs2::fs2-core:3.4.0" | |
| //> using lib "org.typelevel::cats-effect:3.4.4" | |
| /** Toy example with streams + ref. The ref is initialized to some value, and there are two | |
| * concurrent streams that are passed the ref. The first stream prints the value of the ref every | |
| * second. The second stream updates the value of the ref every 3 seconds. The whole example runs | |
| * for 10 seconds. | |
| * | |
| * This example is pretty simple. The thing that really tripped me up while writing it was passing | |
| * the `IO[Ref[IO, Int]]` to the two streams, which led to them evaluating the ref fresh on each | |
| * evaluation. It needs to be the _actual_ `Ref[IO, Int]` that they work with in order for it to be | |
| * evaluated once and shared between them. | |
| */ | |
| import fs2.Stream | |
| import cats.effect.IO | |
| import cats.effect.IOApp | |
| import scala.concurrent.duration._ | |
| import cats.effect.kernel.Ref | |
| object Scheduled extends IOApp.Simple { | |
| val theRefThing: IO[Ref[IO, Int]] = Ref[IO].of(1) | |
| def scheduledUpdate(ref: Ref[IO, Int]): Stream[IO, Unit] = { | |
| val prog = ref | |
| .updateAndGet(int => int + 1) | |
| .flatMap(int => IO.println(s"updated the ref to $int")) | |
| Stream.repeatEval(prog).metered(3.seconds) | |
| } | |
| def printRefThing(ref: Ref[IO, Int]): Stream[IO, Unit] = { | |
| val prog = ref.get | |
| .flatMap(int => IO.println(s"the current ref value is ${int}")) | |
| Stream.repeatEval(prog).metered(1.second) | |
| } | |
| def run: IO[Unit] = theRefThing.flatMap(ref => | |
| printRefThing(ref) | |
| .concurrently(scheduledUpdate(ref)) | |
| .interruptAfter(10.seconds) | |
| .compile | |
| .drain | |
| ) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment