Last active
December 16, 2018 09:57
-
-
Save afsalthaj/0be023a60985ca22f29aee6f6cc6be52 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Retry support built on fs2.
 *
 * The body is evaluated repeatedly until it yields the right side of the
 * disjunction; while it yields the left side it is retried forever, with an
 * optional `FiniteDuration` pause before each retry. Every observed outcome
 * is passed to a logging function of shape `(E \/ A) => IO[Unit]`.
 */
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments", "org.wartremover.warts.Recursion"))
trait Fs2StreamRetrySupport {

  /**
   * Builds a single-element stream that evaluates `b`, optionally after a pause.
   *
   * @param b     the effect to evaluate; taken by name, so it is only
   *              constructed when the stream is built (no delay) or after the
   *              sleep has completed (with delay)
   * @param delay `None` to evaluate `b` immediately, `Some(d)` to sleep for
   *              `d` first
   * @return a stream emitting the result of `b`
   */
  def delayed[F[_], A](b: => F[A], delay: Option[FiniteDuration])(implicit F: Effect[F]): Stream[F, A] =
    delay match {
      case None =>
        Stream.eval[F, A](b)
      case Some(d) =>
        // Sleep in IO, lift it into F, then run the (by-name) effect.
        Stream.eval[F, A](F.flatMap(F.liftIO(cats.effect.IO.sleep(d)))(_ => b))
    }

  // A retryForEver is best seen as fs2 Stream, and hence it is in cats.IO
  /**
   * Evaluates `body` until it produces a right value.
   *
   * Each outcome is logged as soon as it is observed. After a left outcome the
   * stream waits for `delay` (if any) and evaluates `body` exactly once more;
   * the result of that single evaluation decides whether to stop or retry
   * again. Left outcomes are never emitted, so the stream emits exactly one
   * element: the first right value.
   *
   * @param body  the effect to retry; taken by name and re-evaluated on every
   *              attempt
   * @param delay optional pause inserted before each retry
   * @param log   invoked with every outcome (left or right); defaults to a no-op
   * @return a stream emitting the first right outcome of `body`
   */
  def retryForEverUntilRight[E, A](
    body: => IO[E \/ A],
    delay: Option[FiniteDuration],
    log: (E \/ A) => IO[Unit] = (_: E \/ A) => IO[Unit](())
  ): fs2.Stream[IO, E \/ A] = {
    // Decides what to do with the outcome of one attempt.
    def handle(outcome: E \/ A): fs2.Stream[IO, E \/ A] =
      outcome match {
        case \/-(r) =>
          fs2.Stream.eval[IO, E \/ A](log(r.right[E]).map(_ => r.right[E]))
        case -\/(e) =>
          // Log the failure, then re-run `body` once (after the optional
          // delay) and feed THAT result back in, instead of discarding it
          // and evaluating `body` a second time.
          fs2.Stream
            .eval[IO, Unit](log(e.left[A]))
            .flatMap(_ => delayed[IO, E \/ A](body, delay))
            .flatMap(next => handle(next))
      }

    fs2.Stream.eval[IO, E \/ A](body).flatMap(first => handle(first))
  }
}
| // Test | |
| import cats.effect.IO | |
| import org.scalacheck.{Gen, Prop} | |
| import org.specs2.specification.core.SpecStructure | |
| import org.specs2.{ScalaCheck, Specification} | |
| import scalaz.Equal | |
| import scalaz.std.anyVal._ | |
| import scalaz.syntax.either._ | |
| import scala.concurrent.duration._ | |
| // scalastyle:off magic.number | |
/**
 * Property-based specification for [[Fs2StreamRetrySupport]].
 *
 * A background thread flips a shared flag after a random number of
 * milliseconds; the retried body returns left until the flag is set and right
 * afterwards, so the retry stream must terminate once the flag flips.
 */
@SuppressWarnings(Array("org.wartremover.warts.Throw", "org.wartremover.warts.StringPlusAny"))
class Fs2StreamRetrySupportSpec extends Specification with ScalaCheck with Fs2StreamRetrySupport {
  // Imported locally: the spec's visible import list does not bring it into scope.
  import java.util.concurrent.atomic.AtomicInteger

  def is: SpecStructure =
    s2"""
    Fs2StreamRetrySupport will retry forever until it evaluates to right side, otherwise continues forever $testSuccessCase
    """

  object Failure extends RuntimeException

  private def testSuccessCase =
    Prop.forAllNoShrink(Gen.chooseNum[Int](10, 30))(n => {
      // 0 = keep failing, 1 = succeed; written by the helper thread below.
      val atomicInteger = new AtomicInteger(0)

      // Daemon thread that flips the flag after `n` milliseconds.
      val thread =
        new Thread {
          setDaemon(true)

          override def run(): Unit = {
            Thread.sleep(n.toLong)
            atomicInteger.set(1)
          }
        }

      thread.start()

      retryForEverUntilRight[Boolean, Boolean](
        // Specs `===` conflicts with `===` of Equal
        if (Equal[Int].equal(atomicInteger.get(), 1))
          IO(true.right[Boolean])
        else
          IO(false.left[Boolean]),
        Some(4.millis)
      ).compile.drain.unsafeRunSync() === (())
    }).set(minTestsOk = 2)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment