@samspills
Created January 11, 2023 19:31
toy example with fs2 streams + cats-effect ref
//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "org.typelevel::cats-effect:3.4.4"
/** Toy example with streams + ref. The ref is initialized to some value, and there are two
  * concurrent streams that are passed the ref. The first stream prints the value of the ref every
  * second. The second stream updates the value of the ref every 3 seconds. The whole example runs
  * for 10 seconds.
  *
  * This example is pretty simple. The thing that really tripped me up while writing it was passing
  * the `IO[Ref[IO, Int]]` itself to the two streams. `Ref[IO].of(1)` only describes how to
  * allocate a ref, so every evaluation of that `IO` created a brand-new ref and nothing was shared
  * between the streams. They need to work with the _actual_ `Ref[IO, Int]`, allocated once (here
  * via `flatMap` in `run`) and passed to both of them.
  */
import fs2.Stream
import cats.effect.IO
import cats.effect.IOApp
import scala.concurrent.duration._
import cats.effect.kernel.Ref
object Scheduled extends IOApp.Simple {
  val theRefThing: IO[Ref[IO, Int]] = Ref[IO].of(1)

  def scheduledUpdate(ref: Ref[IO, Int]): Stream[IO, Unit] = {
    val prog = ref
      .updateAndGet(int => int + 1)
      .flatMap(int => IO.println(s"updated the ref to $int"))
    Stream.repeatEval(prog).metered(3.seconds)
  }

  def printRefThing(ref: Ref[IO, Int]): Stream[IO, Unit] = {
    val prog = ref.get
      .flatMap(int => IO.println(s"the current ref value is ${int}"))
    Stream.repeatEval(prog).metered(1.second)
  }

  def run: IO[Unit] = theRefThing.flatMap(ref =>
    printRefThing(ref)
      .concurrently(scheduledUpdate(ref))
      .interruptAfter(10.seconds)
      .compile
      .drain
  )
}
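
The pitfall described in the comment at the top can be reproduced without any streams or timers. The following is a minimal sketch, not part of the original example: the names `RefSharing`, `makeRef`, `unshared` and `shared` are made up for illustration. It assumes the same cats-effect version as above.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"
import cats.effect.IO
import cats.effect.IOApp
import cats.effect.kernel.Ref

// Hypothetical illustration; none of these names come from the original example.
object RefSharing extends IOApp.Simple {
  // Only a description of how to allocate a ref. Nothing is allocated yet.
  val makeRef: IO[Ref[IO, Int]] = Ref[IO].of(1)

  // Pitfall: makeRef is evaluated twice, so the update and the read
  // each work with their own freshly allocated ref.
  val unshared: IO[Int] =
    makeRef.flatMap(ref => ref.update(_ + 1)) *> makeRef.flatMap(ref => ref.get)

  // Fix: evaluate makeRef once and use the same Ref[IO, Int] for both steps.
  val shared: IO[Int] =
    makeRef.flatMap(ref => ref.update(_ + 1) *> ref.get)

  def run: IO[Unit] =
    for {
      a <- unshared
      b <- shared
      _ <- IO.println(s"unshared read: $a") // 1, the update went to a different ref
      _ <- IO.println(s"shared read: $b")   // 2, the update is visible
    } yield ()
}
```

This is the same reason `run` in the example above calls `theRefThing.flatMap` once and hands the resulting `ref` to both `printRefThing` and `scheduledUpdate`.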