Last active
December 7, 2018 05:40
-
-
Save afsalthaj/4a7f07a48b41eb3e7942507c80b5bc77 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import scalaz.concurrent.Task | |
| import scalaz.effect.{IO, LiftIO} | |
/**
 * `LiftIO` instance that embeds a scalaz `IO` action into a `Task`.
 *
 * The `IO` is not executed when it is lifted: `unsafePerformIO()` is wrapped
 * in `Task.delay`, so it is invoked as part of the resulting `Task`.
 */
implicit val taskIO: LiftIO[Task] = new LiftIO[Task] {
  def liftIO[A](ioa: IO[A]): Task[A] = Task.delay {
    ioa.unsafePerformIO()
  }
}
| import au.com.iag.dataeng.retries.instances.TaskInstances | |
| import scalaz.{-\/, Maybe, \/-} | |
| import scalaz.concurrent.Task | |
| import scalaz.effect.IO.newIORef | |
| import scalaz.effect.LiftIO | |
| import scala.concurrent.duration.Duration | |
/**
 * Retry combinators for scalaz `Task`.
 *
 * Nothing is written to stdout by these methods; pass `logRetries` to observe
 * failed attempts.
 */
trait RetrySupport extends TaskInstances {

  /**
   * Runs `body` and, while it yields a left (`-\/`), runs it again until it
   * yields a right (`\/-`) or the attempt budget is exhausted.
   *
   * @param body       the task to run; a `-\/` result counts as a failed attempt
   * @param delay      passed to `Task#after` for every re-run of `body`
   * @param maxTry     upper bound on the total number of times `body` is run,
   *                   the first run included; `body` is always run at least once
   * @param logRetries optional hook, run after every failed attempt (the last
   *                   one included) with the number of failures seen so far,
   *                   starting at 1
   * @return the first `\/-` produced by `body`, or the `-\/` of the last
   *         attempt once `maxTry` attempts have failed
   */
  def retryThrowableUntil[A](
    body: Task[Throwable \/ A],
    delay: Duration,
    maxTry: Int,
    logRetries: Maybe[Int => Task[Unit]] = Maybe.empty
  ): Task[Throwable \/ A] = {
    // The failure count is threaded through the recursion, so every run of the
    // returned Task starts counting from zero and no mutable cell is needed.
    def attempt(previousFailures: Int): Task[Throwable \/ A] =
      body.flatMap[Throwable \/ A] {
        case \/-(value) =>
          Task.now[Throwable \/ A](\/-(value))
        case -\/(error) =>
          val failures = previousFailures + 1
          logRetries.cata(_(failures), Task.now(())).flatMap[Throwable \/ A] { _ =>
            if (failures < maxTry) attempt(failures).after(delay)
            else Task.now[Throwable \/ A](-\/(error)) // budget exhausted: surface the last error
          }
      }

    attempt(0)
  }

  /**
   * Retries a plain, possibly throwing expression.
   *
   * `body` is evaluated inside `Task.delay(...).attempt`, so an exception it
   * throws becomes a `-\/` and is handled by [[retryThrowableUntil]].
   *
   * @param body       by-name expression, re-evaluated on every attempt
   * @param delay      see [[retryThrowableUntil]]
   * @param maxTry     see [[retryThrowableUntil]]
   * @param logRetries see [[retryThrowableUntil]]
   * @return the value of the first successful evaluation as `\/-`, or the last
   *         caught `Throwable` as `-\/`
   */
  def retryUntil[A](
    body: => A,
    delay: Duration,
    maxTry: Int,
    logRetries: Maybe[Int => Task[Unit]] = Maybe.empty
  ): Task[Throwable \/ A] =
    retryThrowableUntil(Task.delay(body).attempt, delay, maxTry, logRetries)
}
| // A primitive example that uses our retryUntil. Copy paste the whole thing and run! | |
| import java.util.concurrent.atomic.AtomicInteger | |
// `1.seconds` comes from the duration DSL, which the single-class import of
// `Duration` does not provide.
import scala.concurrent.duration._

// `retryUntil` is a member of RetrySupport, so it is called through an instance.
object RetryExample extends RetrySupport

// Flag flipped by the background thread; the retried expression polls it.
val s = new AtomicInteger()

// Sets the flag to 1 after four seconds.
val thread =
  new Thread {
    override def run(): Unit = {
      Thread.sleep(4000)
      println("Setting the variable")
      s.set(1)
    }
  }
thread.start()

// Evaluates the expression up to 8 times, passing a one-second delay for each
// re-run; it throws (a failed attempt) until the flag has been set.
RetryExample.retryUntil(if (s.get() == 1) true else throw new Exception, 1.seconds, 8).unsafePerformSync
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment