-
-
Save yanana/fdd798cf05545f3131b7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import scalaz.stream.{async,Process} | |
| import scalaz.concurrent.Task | |
| /** | |
| * Try running the process `p`, retrying in the event of failure. | |
| * Example: `retry(Process.awakeEvery(2 minutes))(p)` will wait | |
| * 2 minutes after each failure before trying again, indefinitely. | |
| * Using `retry(Process.awakeEvery(2 minutes).take(5))(p)` will do | |
| * the same, but only retry a total of five times before raising | |
| * the latest error. | |
| */ | |
| def retry[A](retries: Process[Task,Any])(p: Process[Task,A]): | |
| Process[Task,A] = { | |
| val alive = async.signal[Unit]; alive.value.set(()) | |
| val step: Process[Task,Throwable \/ A] = | |
| p.append(Process.eval_(alive.close)).attempt() | |
| step.stripW ++ link(alive.continuous)(retries).terminated.flatMap { | |
| // on our last reconnect attempt, rethrow error | |
| case None => step.flatMap(_.fold(Process.fail, Process.emit)) | |
| // on other attempts, ignore the exceptions | |
| case Some(_) => step.stripW | |
| } | |
| } | |
| /** Terminate `p` when the given signal `alive` terminates. */ | |
| def link[A](alive: Process[Task,Unit])(p: Process[Task,A]): Process[Task,A] = | |
| alive.zip(p).map(_._2) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment