Skip to content

Instantly share code, notes, and snippets.

@samspills
Last active March 24, 2023 12:59
Show Gist options
  • Select an option

  • Save samspills/b75ad646d226715d44f770e6d130cdc6 to your computer and use it in GitHub Desktop.

Select an option

Save samspills/b75ad646d226715d44f770e6d130cdc6 to your computer and use it in GitHub Desktop.
how does metered work
//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "org.typelevel::cats-effect:3.4.4"
/** Toy example with a metered stream. What happens if a stream element takes longer than the metering interval?
*/
import fs2.Stream
import cats.effect.IO
import cats.effect.IOApp
import scala.concurrent.duration._
import cats.effect.kernel.Ref
/** Demo application: one stream keeps incrementing a shared counter while another keeps
  * printing it, each throttled with `metered` at its own rate.
  */
object Scheduled extends IOApp.Simple {

  /** Effect that allocates a `Ref` holding the initial value 1. */
  val theRefThing: IO[Ref[IO, Int]] = Ref[IO].of(1)

  /** Repeatedly increments `ref`, prints the new value, then sleeps for 4 seconds.
    *
    * The stream is metered at 2 seconds, so every element's effect runs longer than the
    * metering interval -- the situation this example is meant to observe.
    *
    * @param ref the shared counter to increment
    * @return an endless stream emitting `()` once per completed increment/print/sleep cycle
    */
  def scheduledUpdate(ref: Ref[IO, Int]): Stream[IO, Unit] = {
    val bumpAndReport: IO[Unit] =
      for {
        int <- ref.updateAndGet(_ + 1)
        _ <- IO.println(s"updated the ref to $int")
        _ <- IO.sleep(4.seconds) // longer than the 2-second metering below
      } yield ()

    Stream.eval(bumpAndReport).repeat.metered(2.seconds)
  }

  /** Repeatedly reads `ref` and prints its current value, metered at 1 second.
    *
    * @param ref the shared counter to read
    * @return an endless stream emitting `()` once per printed value
    */
  def printRefThing(ref: Ref[IO, Int]): Stream[IO, Unit] = {
    val report: IO[Unit] =
      ref.get.flatMap(int => IO.println(s"the current ref value is ${int}"))

    Stream.eval(report).repeat.metered(1.second)
  }

  /** Allocates the counter, runs the printing stream with the updating stream running
    * concurrently alongside it, and interrupts the whole thing after 13 seconds.
    */
  def run: IO[Unit] =
    theRefThing.flatMap { ref =>
      val printer = printRefThing(ref)
      val updater = scheduledUpdate(ref)

      printer
        .concurrently(updater)
        .interruptAfter(13.seconds)
        .compile
        .drain
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment