@yanana
Forked from pchiusano/retries.scala
Last active August 29, 2015 14:23
import scalaz.\/
import scalaz.stream.{async,Process}
import scalaz.concurrent.Task

/**
 * Run the process `p`, retrying whenever it fails.
 *
 * Each value emitted by `retries` triggers one more attempt, so
 * `retry(Process.awakeEvery(2 minutes))(p)` waits 2 minutes after
 * each failure before trying again, indefinitely, while
 * `retry(Process.awakeEvery(2 minutes).take(5))(p)` does the same
 * but retries at most five times before raising the latest error.
 */
def retry[A](retries: Process[Task,Any])(p: Process[Task,A]):
    Process[Task,A] = {
  val alive = async.signal[Unit]; alive.value.set(())
  val step: Process[Task,Throwable \/ A] =
    p.append(Process.eval_(alive.close)).attempt()
  step.stripW ++ link(alive.continuous)(retries).terminated.flatMap {
    // on our last reconnect attempt, rethrow error
    case None => step.flatMap(_.fold(Process.fail, Process.emit))
    // on other attempts, ignore the exceptions
    case Some(_) => step.stripW
  }
}

/** Terminate `p` when the given signal `alive` terminates. */
def link[A](alive: Process[Task,Unit])(p: Process[Task,A]): Process[Task,A] =
  alive.zip(p).map(_._2)
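
// A library-free sketch of the retry policy described in the doc comment
// for `retry`, included only as an illustration. It is NOT how the
// scalaz-stream version above is implemented: `retrySketch` and its
// parameters are hypothetical names, a plain `Iterator` stands in for the
// `retries` process, and a thunk stands in for `p`. Each element pulled
// from `schedule` permits one more attempt; once the schedule is
// exhausted, the latest error is rethrown.
import scala.util.{Try, Success, Failure}

def retrySketch[A](schedule: Iterator[Any])(attempt: () => A): A =
  Try(attempt()) match {
    case Success(a) => a
    case Failure(e) =>
      // Pulling the next element is where a real schedule would wait.
      if (schedule.hasNext) { schedule.next(); retrySketch(schedule)(attempt) }
      else throw e // no retries left: raise the latest error
  }

// Usage: the thunk fails twice and succeeds on the third call, which is
// within the five retries this schedule allows.
var calls = 0
val result = retrySketch(Iterator.fill(5)(())) { () =>
  calls += 1
  if (calls < 3) sys.error("boom") else calls
}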