Skip to content

Instantly share code, notes, and snippets.

@afsalthaj
Last active December 16, 2018 09:57
Show Gist options
  • Select an option

  • Save afsalthaj/0be023a60985ca22f29aee6f6cc6be52 to your computer and use it in GitHub Desktop.

Select an option

Save afsalthaj/0be023a60985ca22f29aee6f6cc6be52 to your computer and use it in GitHub Desktop.
/**
* Retrying using Fs2, where if the result is `right` of disjunction stream stops, else retries for ever
* with optional `FinitDuration` of delay between each retry, with logging as `Left \/ Right => IO[Unit]`.
*/
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments", "org.wartremover.warts.Recursion"))
trait Fs2StreamRetrySupport {
def delayed[F[_], A](b: => F[A], delay: Option[FiniteDuration])(implicit F: Effect[F]): Stream[F, A] = {
delay.fold(
Stream.eval[F, A](b)
)(d => Stream.eval[F, A] {
F.liftIO {
cats.effect.IO.sleep(d)
}.flatMap(_ => b)
})
}
// A retryForEver is best seen as fs2 Stream, and hence it is in cats.IO
def retryForEverUntilRight[E, A](
body: => IO[E \/ A],
delay: Option[FiniteDuration],
log: (E \/ A) => IO[Unit] = (_: E \/ A) => IO[Unit](())
): fs2.Stream[IO, E \/ A] = {
def go (b: IO[E \/ A]): fs2.Stream[IO, E \/ A] = {
fs2.Stream.eval(b).flatMap({
case \/-(r) => fs2.Stream.eval[IO, E \/ A](log(r.right[E]).map(_ => r.right[E]))
case -\/(e) => delayed[IO, E \/ A](body, delay).flatMap( _ => Stream.eval[IO, E \/ A](
log(e.left[A]).map(_.left[A].leftMap(_ => e))
)).flatMap(_ => go(body))
})
}
go(body)
}
}
// Test
import cats.effect.IO
import org.scalacheck.{Gen, Prop}
import org.specs2.specification.core.SpecStructure
import org.specs2.{ScalaCheck, Specification}
import scalaz.Equal
import scalaz.std.anyVal._
import scalaz.syntax.either._
import scala.concurrent.duration._
// scalastyle:off magic.number
@SuppressWarnings(Array("org.wartremover.warts.Throw", "org.wartremover.warts.StringPlusAny"))
class Fs2StreamRetrySupportSpec extends Specification with ScalaCheck with Fs2StreamRetrySupport {
def is: SpecStructure =
s2"""
Fs2StreamRetrySupport will retry forever until it evaluates to right side, otherwise continues forever $testSuccessCase
"""
object Failure extends RuntimeException
private def testSuccessCase =
Prop.forAllNoShrink(Gen.chooseNum[Int](10, 30))(n => {
val atomicInteger = new AtomicInteger(0)
val thread =
new Thread {
setDaemon(true)
override def run(): Unit = {
Thread.sleep(n.toLong)
atomicInteger.set(1)
}
}
thread.start()
retryForEverUntilRight[Boolean, Boolean](
// Specs `===` conflicts with `===` of Equal
if (Equal[Int].equal(atomicInteger.get(), 1))
IO(true.right[Boolean])
else
IO(false.left[Boolean]),
Some(4.millis)
).compile.drain.unsafeRunSync() === (())
}).set(minTestsOk = 2)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment