Created
December 8, 2022 15:43
-
-
Save moust/b996ce404fc80ed81c7b70b952adc86d to your computer and use it in GitHub Desktop.
Retries an operation until it succeeds or the given timeout is reached.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeoutException

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal

import cats.{Applicative, Show}
import cats.effect.{Concurrent, ContextShift, IO, Timer}
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
/** Small demo of retrying an effect until it succeeds or a timeout elapses. */
object Test {

  /**
    * Runs `fa`, re-running it immediately every time it fails, and races that
    * retry loop against a timer of length `timeout`.
    *
    * There is no delay or back-off between attempts: a failed attempt is
    * followed directly by the next one.
    *
    * @param fa      the effect to attempt; it is re-evaluated on every retry
    * @param timeout how long the retry loop may run before giving up
    * @tparam F      effect type providing `Concurrent` (for `race`) and `Timer` (for `sleep`)
    * @tparam A      result type of `fa`
    * @return the first successful result of `fa`, or a failed effect carrying a
    *         `java.util.concurrent.TimeoutException` if the timer finishes first
    */
  def wait[F[_]: Applicative: Concurrent: Timer, A](
      fa: F[A],
      timeout: FiniteDuration,
  ): F[A] = {
    // `fa` is the same value on every attempt, so the loop needs no argument.
    // Only non-fatal errors trigger a retry; fatal ones (as classified by
    // `scala.util.control.NonFatal`) are left to propagate instead of being
    // retried until the timeout.
    def retrying: F[A] = fa.recoverWith { case NonFatal(_) => retrying }

    Concurrent[F]
      .race(retrying, Timer[F].sleep(timeout))
      .flatMap {
        // Left: the retry loop produced a value before the timer finished.
        case Left(a) => Concurrent[F].pure(a)
        // Right: the timer finished first.
        case Right(_) =>
          Concurrent[F].raiseError[A](
            new TimeoutException(s"Operation did not succeed within $timeout")
          )
      }
  }

  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  /**
    * Maps an error to a printable string for the demo in `main`:
    * a `TimeoutException` becomes `"Timeout"`, anything else its message.
    */
  def recover: PartialFunction[Throwable, String] = {
    case _: TimeoutException => "Timeout"
    // `getMessage` can be null (e.g. an exception built without a message);
    // fall back to `toString` so a null is never returned.
    case err => Option(err.getMessage).getOrElse(err.toString)
  }

  /**
    * Adds `.debug` to any `IO[A]`.
    *
    * This is a plain implicit value class: an implicit class cannot also be a
    * case class, and the `Show` requirement lives on `debug` rather than on the
    * constructor so the wrapper keeps a single field.
    */
  implicit class DebutOps[A](val ioa: IO[A]) extends AnyVal {

    /** Prints the `Show` rendering of the result, then yields the result unchanged. */
    def debug(implicit show: Show[A]): IO[A] =
      ioa.flatTap(a => IO.delay(println(show.show(a))))
  }

  /** Runs four scenarios in sequence, printing the outcome of each. */
  def main(args: Array[String]): Unit = {
    // simple success
    wait(IO.pure("Success"), 1.seconds)
      .debug
      .unsafeRunSync()

    // simple success (from Future)
    wait(IO.fromFuture(IO(Future.successful("Success"))), 1.seconds)
      .debug
      .unsafeRunSync()

    // timeout: the effect needs 1s but only 500ms are allowed
    wait(timer.sleep(1.seconds) >> IO.pure("Success"), 500.millis)
      .recover(recover)
      .debug
      .unsafeRunSync()

    // many retries until timeout: each attempt prints "call", sleeps 200ms, then fails
    wait[IO, String](
      IO.delay(println("call")) >> timer.sleep(200.millis) >> IO.raiseError(new Exception("Failure")),
      1.seconds,
    )
      .recover(recover)
      .debug
      .unsafeRunSync()

    ()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment