Created
December 11, 2015 22:21
-
-
Save matthughes/273a5ddf813ca382b697 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A retry combinator for scalaz-stream processes, together with two small
 * demo entry points that exercise it from the REPL.
 */
object PaulRetry {
  import java.util.concurrent.ScheduledExecutorService

  import scala.concurrent.duration._

  import scalaz.\/
  import scalaz.concurrent.Task
  import scalaz.stream.{ async, time, Process }

  /** Scheduler made available implicitly to the `time.awakeEvery` calls below. */
  implicit val scheduler: ScheduledExecutorService = scalaz.stream.DefaultScheduler

  /**
   * Try running the process `p`, retrying in the event of failure.
   * Example: `retry(time.awakeEvery(2 minutes))(p)` will wait
   * 2 minutes after each failure before trying again, indefinitely.
   * Using `retry(time.awakeEvery(2 minutes).take(5))(p)` will do
   * the same, but only for as long as `retries` keeps emitting; once it
   * is exhausted, one last attempt is made and its error, if any, is raised.
   *
   * Once a run of `p` completes without error, no further run is started.
   * (The previous version always performed the "last attempt" when the
   * retry stream ended, including when it ended *because* `p` had just
   * succeeded, so a successful `p` was evaluated twice.)
   *
   * @param retries schedule of retries; each emitted value (ignored) permits one more attempt
   * @param p       the process to run and, on failure, re-run from the start
   * @tparam A      output type of `p`
   * @return a process emitting the output of every attempt of `p`; output emitted
   *         by an attempt before it failed is not withdrawn
   */
  def retry[A](retries: Process[Task, Any])(p: Process[Task, A]): Process[Task, A] = {
    // `false` until some run of `p` has completed without error.
    val done = async.signalOf[Boolean](false)

    // One attempt: errors are caught and surfaced as a left value instead of
    // failing the stream; reaching the end of `p` records success in `done`.
    val step: Process[Task, Throwable \/ A] =
      p.append(Process.eval_(done.set(true))).attempt()

    // Keeps emitting while no attempt has succeeded; halts as soon as one has,
    // which (via `link`) cuts the retry schedule short.
    val stillFailing: Process[Task, Unit] =
      done.continuous.takeWhile(finished => !finished).map(_ => ())

    // Explicitly typed so both branches of the final check unify without inference help.
    val stopped: Process[Task, A] = Process.halt
    val rethrowingAttempt: Process[Task, A] =
      step.flatMap(_.fold(Process.fail, Process.emit))

    step.stripW ++ link(stillFailing)(retries).terminated.flatMap {
      // the retry stream has ended: either `p` already succeeded (nothing left
      // to do) or the retries are exhausted (last attempt, rethrow its error)
      case None =>
        Process.eval(done.get).flatMap(finished => if (finished) stopped else rethrowingAttempt)
      // on other attempts, ignore the exceptions
      case Some(_) => step.stripW
    }
  }

  /** Terminate `p` when the given signal `alive` terminates. */
  def link[A](alive: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] =
    alive.zip(p).map(_._2)

  /**
   * Demo: retries an effectful `Process.eval` that prints and succeeds on its first run.
   *
   * With the original `retry` this printed "Running 1" twice (transcript below),
   * because the successful process was run again as the "last attempt".
   * With the success check in `retry` it is expected to print once.
   */
  def runProducesDoubleOutput(): Unit = {
    retry(time.awakeEvery(1.second).take(5))(Process.eval(Task {
      println("Running 1")
      1
    })).run.run
  }
  // Transcript recorded with the original `retry`:
  // scala> com.ccadllc.ipdc.terminaltopology.PaulRetry.runProducesDoubleOutput
  // Running 1
  // Running 1

  /**
   * Demo: the same as above, but with the side effect inside `map` over a pure `Process(1)`.
   *
   * This printed only once even with the original `retry` (transcript below).
   * NOTE(review): presumably because mapping over an already-emitted pure value
   * is evaluated when the process is built rather than on each run - unconfirmed.
   */
  def runProducesSingleOutput(): Unit = {
    retry(time.awakeEvery(1.second).take(5))(Process(1).map { x =>
      println(s"Running $x")
      x
    }).run.run
  }
  // scala> com.ccadllc.ipdc.terminaltopology.PaulRetry.runProducesSingleOutput
  // Running 1
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment