Skip to content

Instantly share code, notes, and snippets.

@afsalthaj
Last active January 8, 2019 02:20
Show Gist options
  • Save afsalthaj/a30f2eb2754442b22d28fb283cc7e6e1 to your computer and use it in GitHub Desktop.
Save afsalthaj/a30f2eb2754442b22d28fb283cc7e6e1 to your computer and use it in GitHub Desktop.
import fs2.Stream // 0.10.6, a few things changed in 1.0.0
import cats.effect.IO
import scalaz.syntax.monad._
import scalaz.{\/, -\/, \/-}
import scalaz.Monad
// Resource acquisition with fs2.Stream.bracket and `rethrow` in fs2, together with `Catchable` in scalaz (or `MonadError` in cats), makes life easy.
// Form a stream in which one of the data fetches resulted in an error.
@ def formStream[F[_]]: Stream[F, Throwable \/ String] = {
    // Two-element pure stream lifted into F: one successful fetch followed by one failed fetch.
    // The failure is carried as a value on the left of \/; nothing is thrown here.
    val fetched: Throwable \/ String = \/-("1")
    val failed: Throwable \/ String = -\/(new RuntimeException("an exception during the streaming process"))
    Stream(fetched, failed).covary[F]
  }
defined function formStream
// A stream that uses bracket for resource handling and processes the stream above; here, closing the resource throws an exception.
@ def streamm[F[_] : scalaz.Monad ] = {
    // Acquire: print a marker; the "resource" handed to the other two steps is just Unit.
    val acquire = println("acquiring").pure[F]
    // Use: convert each \/ to Either and let rethrow turn a left value into a stream failure.
    val process = (_: Unit) => formStream[F].map(_.toEither).rethrow
    // Release: throws directly instead of returning an F[Unit] -- this is the failure being demonstrated.
    val release: Unit => F[Unit] = _ => throw new Exception("closing a resource may raise an exception")
    Stream.bracket(acquire)(process, release)
  }
defined function streamm
@ streamm[IO].compile.drain.attempt.unsafeRunSync
acquiring
res28: Either[Throwable, Unit] = Left(fs2.CompositeFailure: Multiple exceptions were thrown (2), first java.lang.RuntimeException: an exception during the streaming process)
// A stream that uses bracket for resource handling and processes the stream above (which contains an error); this time, closing the resource raises no exception.
@ def streamm[F[_] : scalaz.Monad ] = {
    // Acquire: print a marker; the "resource" handed to the other two steps is just Unit.
    val acquire = println("acquiring").pure[F]
    // Use: convert each \/ to Either and let rethrow turn a left value into a stream failure.
    val process = (_: Unit) => formStream[F].map(_.toEither).rethrow
    // Release: print a marker and finish normally inside F.
    val release = (_: Unit) => println("closed a resource").pure[F]
    Stream.bracket(acquire)(process, release)
  }
defined function streamm
@ streamm[IO].compile.drain.attempt.unsafeRunSync
acquiring
closed a resource
res30: Either[Throwable, Unit] = Left(java.lang.RuntimeException: an exception during the streaming process)
// A good pattern for closing a resource:
// a separate total function that closes the resource and returns any failure as a value instead of throwing.
@ def close[F[_] : Monad]: F[Throwable \/ Unit] = {
    // Total "close": the failure is returned as a value on the left of \/ rather than thrown.
    // It always fails here, so the caller's error-handling path is the one exercised.
    // Built with the imported -\/ constructor (the same way formStream uses \/-), so this
    // does not depend on scalaz.syntax.either._ being in scope for `.left`.
    Monad[F].pure[Throwable \/ Unit](-\/(new RuntimeException))
  }
defined function close
@ def formStream[F[_]]: Stream[F, Throwable \/ String] = {
    // Single-element pure stream lifted into F; the only fetch succeeds, so it carries no left (error) value.
    val fetched: Throwable \/ String = \/-("1")
    Stream(fetched).covary[F]
  }
defined function formStream
@ def streamm[F[_] : scalaz.Monad : scalaz.Catchable] = {
    // Acquire: print a marker; the "resource" handed to the other two steps is just Unit.
    val acquire = println("acquiring").pure[F]
    // Use: convert each \/ to Either and let rethrow turn a left value into a stream failure.
    val process = (_: Unit) => formStream[F].map(_.toEither).rethrow
    // Release: close[F] reports its outcome as a value; a left is re-raised inside F via Catchable,
    // a right completes normally. Catchable is fully qualified (as in the context bound) because
    // only scalaz.Monad is imported by name.
    val release = (_: Unit) =>
      close[F].flatMap(_.fold(
        e => scalaz.Catchable[F].fail[Unit](e),
        _ => Monad[F].pure(())
      ))
    Stream.bracket(acquire)(process, release)
  }
defined function streamm
@ streamm[IO].compile.drain.attempt.unsafeRunSync
acquiring
res40: Either[Throwable, Unit] = Left(java.lang.RuntimeException)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment