Skip to content

Instantly share code, notes, and snippets.

@gvolpe
Last active July 24, 2018 05:37
Show Gist options
  • Save gvolpe/d7783459c99321db3611a8962134e0c7 to your computer and use it in GitHub Desktop.
Save gvolpe/d7783459c99321db3611a8962134e0c7 to your computer and use it in GitHub Desktop.
import cats.MonadError
import cats.effect.IO
import cats.effect.concurrent.Deferred
import cats.instances.list._
import cats.instances.string._
import cats.kernel.Monoid
import cats.syntax.all._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
// Sample tasks: each one sleeps for a slightly different duration before finishing,
// so the two failures complete first, then "success", then "slower success".
val io1: IO[String] =
  IO.sleep(1.second).flatMap(_ => IO.raiseError[String](new Exception("error 1")))
val io2: IO[String] =
  IO.sleep(1.1.seconds).flatMap(_ => IO.raiseError[String](new Exception("error 2")))
val io3: IO[String] =
  IO.sleep(1.2.seconds).flatMap(_ => IO.pure("success"))
val io4: IO[String] =
  IO.sleep(1.4.seconds).flatMap(_ => IO.pure("slower success"))

// Alternative input with no successful task: the program below then times out
// while waiting for a value that never arrives.
//val tasks = List(io1, io2)
val tasks: List[IO[String]] = List(io1, io2, io3, io4)
/** Runs `ioa` and, if it succeeds, publishes its value through `d`.
  *
  * - On success the value is printed, `d` is completed with it (a failure to complete,
  *   e.g. because another task already did, is ignored via `attempt`), and the returned
  *   IO then fails on purpose. That failure is what lets a surrounding `parTraverse`
  *   stop waiting for the remaining tasks.
  * - On failure the error message is printed and the empty value of `A`'s Monoid is
  *   returned, so a failing task never aborts the traversal.
  *
  * @param d   the Deferred that receives the first successful value
  * @param ioa the task to run
  * @return an IO that yields `Monoid[A].empty` when `ioa` fails and fails when `ioa` succeeds
  */
def firstSuccessful[A: Monoid](d: Deferred[IO, A])(ioa: IO[A]): IO[A] =
  ioa.attempt.flatMap {
    case Left(err) =>
      // Errors are logged and replaced by the neutral element
      IO(println(s"Ignoring error: ${err.getMessage}")).map(_ => Monoid[A].empty)
    case Right(value) =>
      IO(println(s"Processing: $value"))
        .flatMap(_ => d.complete(value).attempt)
        // Deliberate failure: this exception makes the short-circuit happen
        .flatMap(_ => IO.raiseError[A](new Exception()))
  }
/** Races all `tasks` and prints the first successful result.
  *
  * The parallel traversal is expected to fail (a successful task raises on purpose to
  * short-circuit), so its outcome is discarded with `attempt`; the winning value is then
  * read from the Deferred. If no task succeeds, `d.get` never completes and the 2-second
  * timeout fails the program instead.
  */
val program: IO[Unit] =
  for {
    d  <- Deferred[IO, String]
    rs <- tasks.parTraverse(firstSuccessful(d)).attempt *> d.get.timeout(2.seconds)
    _  <- IO(println(rs))
  } yield () // yield Unit explicitly instead of discarding the String `rs`

program.unsafeRunSync()
/****** Abstracting over the effect type **********/
/** Effect-polymorphic variant of the first-successful helper.
  *
  * Runs `fa`; when it succeeds, `d` is completed with the value (errors from completing
  * are ignored) and the resulting effect then fails on purpose. When `fa` fails, the
  * error is dropped and the empty value of `A`'s Monoid is returned.
  *
  * @param d  the Deferred that receives the successful value
  * @param fa the effect to run
  * @param F  MonadError instance used to attempt, sequence and raise in `F`
  * @return an effect that yields `Monoid[A].empty` when `fa` fails and fails when `fa` succeeds
  */
def firstSuccessfulAbstract[F[_], A: Monoid](d: Deferred[F, A])(fa: F[A])(
    implicit F: MonadError[F, Throwable]
): F[A] =
  F.flatMap(F.attempt(fa)) {
    case Left(_) =>
      // Ignore the errors
      F.pure(Monoid[A].empty)
    case Right(value) =>
      // Publish the value, then raise: this exception makes the short-circuit happen
      F.flatMap(F.attempt(d.complete(value)))(_ => F.raiseError[A](new Exception()))
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment