Last active
January 8, 2019 02:20
-
-
Save afsalthaj/a30f2eb2754442b22d28fb283cc7e6e1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fs2.Stream // 0.10.6, a few things changed in 1.0.0 | |
import cats.effect.IO | |
import scalaz.syntax.monad._ | |
import scalaz.{\/, -\/, \/-} | |
import scalaz.Monad | |
// Resource acquisition with fs2's `Stream.bracket` and `rethrow`, together with `Catchable` in scalaz (or `MonadError` in cats), makes life easy. | |
// Forming a stream in which one of the data fetches resulted in an error. | |
// Emits two elements: a right holding "1", then a left holding a RuntimeException.
// `covary[F]` lifts the resulting stream into the caller-chosen effect type F.
@ def formStream[F[_]]: Stream[F, Throwable \/ String] = Stream(\/-("1"), -\/(new RuntimeException("an exception during the streaming process"))).covary[F] | |
defined function formStream | |
// A stream that uses bracket for resource acquisition and then processes the above stream; here, closing the resource throws an exception. | |
// Acquire: prints "acquiring", lifted into F with `pure[F]`.
// Use: converts each `\/` element of formStream to an Either and calls `rethrow` on the stream.
// Release: the function body is a bare `throw` rather than an F value; the recorded run below
// reports an fs2.CompositeFailure carrying two exceptions, the first being the stream's own RuntimeException.
@ def streamm[F[_] : scalaz.Monad ] = Stream.bracket(println("acquiring").pure[F] )(_ => formStream[F].map(_.toEither).rethrow, _ => (throw new Exception("closing a resource may raise an exception"))) | |
defined function streamm | |
@ streamm[IO].compile.drain.attempt.unsafeRunSync | |
acquiring | |
res28: Either[Throwable, Unit] = Left(fs2.CompositeFailure: Multiple exceptions were thrown (2), first java.lang.RuntimeException: an exception during the streaming process) | |
// A stream that uses bracket for resource acquisition and then processes the above stream, which contains an error; here, closing the resource raises no exception. | |
// Same acquire and use steps as the previous definition, but the release step only prints
// "closed a resource" (lifted with `pure[F]`) and does not throw.
// The recorded run below shows the release message printed and the result being a Left with
// the stream's RuntimeException alone.
@ def streamm[F[_] : scalaz.Monad ] = Stream.bracket(println("acquiring").pure[F] )(_ => formStream[F].map(_.toEither).rethrow, _ => (println("closed a resource").pure[F])) | |
defined function streamm | |
@ streamm[IO].compile.drain.attempt.unsafeRunSync | |
acquiring | |
closed a resource | |
res30: Either[Throwable, Unit] = Left(java.lang.RuntimeException: an exception during the streaming process) | |
// A good pattern for closing a resource: | |
// a separate total function that closes your resource and returns any failure as a value (`Throwable \/ Unit`) instead of throwing. | |
// Demonstration "close" that never throws: it returns, inside F, a left holding a new
// RuntimeException (ascribed as Throwable), so the failure is carried as a value.
@ def close[F[_] : Monad]: F[Throwable \/ Unit] = (new RuntimeException: Throwable).left[Unit].pure[F] | |
defined function close | |
// Redefines formStream with a single right element "1" and no error element,
// so any failure in the following run cannot originate from the stream's data.
@ def formStream[F[_]]: Stream[F, Throwable \/ String] = Stream(\/-("1")).covary[F] | |
defined function formStream | |
// Requires both Monad and Catchable instances for F.
// Acquire and use are as in the earlier definitions; the release step runs `close[F]` and folds
// its result: a left is passed to `Catchable[F].fail`, a right becomes `Monad[F].pure(())`.
// The recorded run below yields Left(java.lang.RuntimeException), i.e. the exception returned by close.
@ def streamm[F[_] : scalaz.Monad : scalaz.Catchable] = Stream.bracket(println("acquiring").pure[F] )(_ => formStream[F].map(_.toEither).rethrow, _ => close[F].flatMap(_.fold(e => Catchable[F].fail(e), _ => Monad[F].pure(())))) | |
defined function streamm | |
@ streamm[IO].compile.drain.attempt.unsafeRunSync | |
acquiring | |
res40: Either[Throwable, Unit] = Left(java.lang.RuntimeException) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment