Last active
March 24, 2023 12:59
-
-
Save samspills/b75ad646d226715d44f770e6d130cdc6 to your computer and use it in GitHub Desktop.
how does metered work
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //> using lib "co.fs2::fs2-core:3.4.0" | |
| //> using lib "org.typelevel::cats-effect:3.4.4" | |
| /** Toy example with a metered stream. What happens if evaluating a stream element takes longer than the metering interval? | |
| */ | |
| import fs2.Stream | |
| import cats.effect.IO | |
| import cats.effect.IOApp | |
| import scala.concurrent.duration._ | |
| import cats.effect.kernel.Ref | |
/** Small experiment around `Stream#metered`.
  *
  * Two streams share one `Ref[IO, Int]`: one increments it, the other prints
  * it. The incrementing effect sleeps for longer than its metering interval,
  * which is the situation this program is meant to observe.
  */
object Scheduled extends IOApp.Simple {

  /** Effect that allocates a new counter starting at 1 each time it is run. */
  val theRefThing: IO[Ref[IO, Int]] = Ref[IO].of(1)

  /** Endless stream that increments `ref`, reports the new value, then sleeps.
    *
    * A single evaluation lasts at least 4 seconds because of the sleep, while
    * the stream is metered at 2 seconds.
    *
    * NOTE(review): the fs2 docs describe the metered rate as an upper bound, so
    * presumably the slower evaluation sets the pace -- run the program to confirm.
    *
    * @param ref shared counter to increment
    */
  def scheduledUpdate(ref: Ref[IO, Int]): Stream[IO, Unit] = {
    val workTime: FiniteDuration = 4.seconds
    val meterInterval: FiniteDuration = 2.seconds

    val bumpThenWait: IO[Unit] =
      for {
        int <- ref.updateAndGet(_ + 1)
        _ <- IO.println(s"updated the ref to $int")
        _ <- IO.sleep(workTime)
      } yield ()

    Stream.repeatEval(bumpThenWait).metered(meterInterval)
  }

  /** Endless stream that reads `ref` and prints its value, metered at 1 second.
    *
    * @param ref shared counter to read
    */
  def printRefThing(ref: Ref[IO, Int]): Stream[IO, Unit] = {
    val report: IO[Unit] =
      for {
        int <- ref.get
        _ <- IO.println(s"the current ref value is ${int}")
      } yield ()

    Stream.repeatEval(report).metered(1.second)
  }

  /** Runs the printing stream with the updating stream in the background,
    * interrupting the combined stream after 13 seconds.
    */
  def run: IO[Unit] =
    theRefThing.flatMap { ref =>
      val reader: Stream[IO, Unit] = printRefThing(ref)
      val writer: Stream[IO, Unit] = scheduledUpdate(ref)

      reader
        .concurrently(writer)
        .interruptAfter(13.seconds)
        .compile
        .drain
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment