Skip to content

Instantly share code, notes, and snippets.

@ShahOdin
Created February 5, 2020 20:44
Show Gist options
  • Save ShahOdin/e48e147bb8d46032188fbcfb76d72bf1 to your computer and use it in GitHub Desktop.
Save ShahOdin/e48e147bb8d46032188fbcfb76d72bf1 to your computer and use it in GitHub Desktop.
fs2 stream error handling
import cats.effect.{ExitCode, IO, IOApp, Resource}
import scala.util.Random
/** Demo of restarting an fs2 stream after a failure.
  *
  * `internal` acquires a resource and then repeatedly evaluates `foo`, which fails at
  * random. `stream` wraps `internal` so that any error is answered by running `stream`
  * again after a 5-second delay. `run` collects the first 20 emitted values and prints them.
  */
object Demo extends IOApp {
  import scala.concurrent.duration._

  // Counter shared by every (re)start of the stream. It is never reset, so numbering
  // continues where it left off after a restart. It is only mutated inside `IO` in `foo`.
  var i = 0

  /** Increments the counter, then either reports the value as handled or fails.
    *
    * The failure branch is taken when `Random.nextInt(100)` is a multiple of 23; it prints
    * "Boom at <n>" and fails the `IO` with an `Exception("boom")`. Otherwise it prints
    * "<n> was handled." and the `IO` yields the new counter value.
    */
  def foo: IO[Int] =
    IO {
      i += 1
      i
    }.flatTap { j =>
      // Draw the random number inside IO so every evaluation of `foo` rolls again.
      IO(Random.nextInt(100) % 23 == 0).flatMap { shouldFail =>
        if (shouldFail)
          // Raise the failure as an IO value instead of throwing from inside a thunk.
          IO(println(s"Boom at $j")).flatMap(_ => IO.raiseError[Unit](new Exception("boom")))
        else
          IO(println(s"$j was handled."))
      }
    }

  /** A resource whose acquisition and release only print a message, followed by an endless
    * repetition of `foo`.
    */
  val internal: fs2.Stream[IO, Int] =
    fs2.Stream
      .resource(
        Resource.make(IO(println("resource being allocated!")))(_ =>
          IO(println("resource being released"))
        )
      )
      .flatMap(_ => fs2.Stream.repeatEval(foo))

  /** `internal` with error recovery: on any error, continue with `stream` itself, delayed by
    * 5 seconds.
    *
    * Declared `lazy` because the handler refers to `stream` from within its own definition;
    * this keeps the self-reference independent of the object's field initialization order.
    */
  lazy val stream: fs2.Stream[IO, Int] =
    internal.handleErrorWith(_ => stream.delayBy(5.seconds))

  /** Runs `stream` until 20 values have been emitted, prints them as a `Vector`, and exits
    * with `ExitCode.Success`.
    */
  override def run(args: List[String]): IO[ExitCode] =
    stream
      .take(20)
      .compile
      .toVector
      // Printing is an effect, so suspend it in IO rather than hiding it in `map`.
      .flatMap(collected => IO(println(collected)))
      .as(ExitCode.Success)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment