Last active
November 9, 2015 20:54
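// Assumes scalaz and scalaz-stream are on the classpath
import scalaz.{ \/, Nondeterminism }
import scalaz.concurrent.Task
import scalaz.stream._

trait RetryCombinators {
  // Taken from Paul's gist: https://gist.github.com/pchiusano/7894696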
  /**
   * Try running the process `p`, retrying in the event of failure.
   * Example: `retry(time.awakeEvery(2 minutes))(p)` will wait
   * 2 minutes after each failure before trying again, indefinitely.
   * Using `retry(time.awakeEvery(2 minutes).take(5))(p)` will do
   * the same, but only retry a total of five times before raising
   * the latest error.
   */
  def retry[A](retries: Process[Task, Any])(p: Process[Task, A]): Process[Task, A] = {
    val alive = async.signalOf[Unit](())
    val step: Process[Task, Throwable \/ A] =
      p.append(Process.eval_(alive.close)).attempt()
    step.stripW ++ link(alive.continuous)(retries).terminated.flatMap {
      // on our last reconnect attempt, rethrow error
      case None => step.flatMap(_.fold(Process.fail, Process.emit))
      // on other attempts, ignore the exceptions
      case Some(_) => step.stripW
    }
  }
  /** Terminate `p` when the given signal `alive` terminates. */
  def link[A](alive: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] =
    alive.zip(p).map(_._2)
}
object test extends RetryCombinators {
  import scala.concurrent.duration._
  // First let's look at Task
  // We have a task that always fails
  val alwaysFailsTask = Task[Int] { throw new Exception("Failed") }
  // More realistic is a Task that sometimes fails depending on some outside force (server being up, etc.)
  val sometimesFailsTask = Task[Int] { val randomNum = Math.random; if (randomNum < .9) 42 else throw new Exception("Better luck next time: " + randomNum) }
  // Now if we run this, it's just chance as to whether we get a successful result; `run` throws if we get unlucky
  def result: Int = sometimesFailsTask.run
  // Better than `run` is `attemptRun`, which returns the failure as a value instead of throwing:
  def betterResults: Throwable \/ Int = sometimesFailsTask.attemptRun
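  // A minimal sketch of consuming that disjunction without throwing. `describe` is a
  // hypothetical helper added for illustration; it relies only on `\/`'s `fold`.
  def describe(r: Throwable \/ Int): String =
    r.fold(err => s"failed: ${err.getMessage}", n => s"succeeded: $n")
  def describedResult: String = describe(betterResults)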
  // But what if we want to retry the task until it succeeds? We can use Task.retry
  // In this example, we retry once a second *forever* until we get a success
  val finallyCompleted = sometimesFailsTask.retry(Stream.continually(1.second))
  // We can limit that a bit by capping the number of retries at 5
  val finallyCompletedOrGaveUp = sometimesFailsTask.retry(Stream.continually(1.second).take(5))
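  // Instead of a fixed one-second delay we might back off between attempts. A minimal sketch,
  // assuming `Task.retry` accepts any `Seq` of durations as it does above; `backoff` is a
  // hypothetical helper, not part of scalaz.
  def backoff(base: FiniteDuration, attempts: Int): Seq[FiniteDuration] =
    Seq.iterate(base, attempts)(_ * 2L) // base, 2 * base, 4 * base, ...
  // With a base of one second and five attempts, the delays are 1s, 2s, 4s, 8s and 16s
  val completedWithBackoff = sometimesFailsTask.retry(backoff(1.second, 5))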
  // Ok now let's see how this maps into Process
  // Let's say we have a Process.range(1, 100) (the upper bound is exclusive, so it emits 1 to 99);
  // we'll add a debug sink, so we can see values being emitted
  val oneToHundred = Process.range(1, 100).toSource.observe(io.stdOutLines.contramap { x: Any => x.toString })
  // And we have a channel that sometimes fails; when it succeeds it emits the task's result (42) rather than its input
  val sometimesFailsChannel = channel.lift { x: Int => sometimesFailsTask }
  // If we run this, the process will pull values through the channel until it gets to its first failure and blows up
  // Go ahead, try it for yourself
  def probablyWillThrowChannel = (oneToHundred through sometimesFailsChannel).runLog.run
  // What we want is some way to retry the channel until we get a success; let's try using `finallyCompleted`
  val finallyCompletesChannel = channel.lift { x: Int => finallyCompleted }
  // And we'll run that
  def finallyCompletingChannelRun = (oneToHundred through finallyCompletesChannel).runLog.run
  // So that worked, kind of. It repeated each step until it was successful. But it has the (at least sometimes) unwanted effect of not letting other
  // traffic through. Each item is pulled through the channel and its task is repeated until it's successful. You might want some way for other items
  // to get through while you're waiting for the failing items to retry
  //
  // Let's try using the `retry` combinator defined above; this works with Process, not Task.
  // Here's one way we could define it (note we need an implicit scheduler in scope):
  implicit val scheduler = scalaz.stream.DefaultScheduler
  val retryProcessAttemptOne = retry(time.awakeEvery(1.second))(oneToHundred through sometimesFailsChannel)
  /**
   * scala> test.retryProcessAttemptOne.runLog.timed(10.seconds).run
   * 1
   * 2
   * 3
   * 4
   * 5
   * 6
   * 1
   * 2
   * 3
   * 4
   * 5
   * 6
   * 7
   * 8
   * 9
   */
  // Hmm, that's not what I wanted. That retries the *entire* process, throwing away any successful transformations.
  // Let's try just retrying the channel
  val retryProcessAttemptTwo = oneToHundred through retry(time.awakeEvery(1.second))(sometimesFailsChannel)
  /**
   * scala> test.retryProcessAttemptTwo.runLog.timed(10.seconds).run
   * 1
   * 2
   * 3
   * 4
   * 5
   * java.lang.Exception: Better luck next time: 0.9738160947217549
   * at ...
   * scala>
   */
  // That did not do what I wanted; as soon as the channel's task returned a throwable, it blew up and stopped execution.
  // The channel process itself only emits a function and never fails; the failure happens when `through` runs the
  // resulting Task, which is outside the part wrapped by `retry`.
  // Let's go back to Task for a minute. Let's explicitly introduce some parallelism. Let's say we have 100 tasks
  // we want to process, some of which could fail. We want to retry failing tasks but also make progress on other ones
  // while we wait
  val hundredFaultyTasks: Task[List[Int]] = Task.gatherUnordered(1.to(100).toList.map { x => println(s"running $x"); finallyCompleted.map { _ => x } })
  /**
   * scala> test.hundredFaultyTasks.timed(10.seconds).run
   * res0: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 14, 20, 23, 24, 25, 26, 29, 30, 32, 33, 34, 35, 37, 38, 41, 42, 43, 49, 50, 51, 53, 55, 56, 57, 58, 59, 60, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 85, 86, 87, 89, 90, 91, 92, 93, 94, 95, 97, 99, 100, 88, 39, 40, 44, 46, 47, 22, 31, 15, 10, 17, 18, 19, 21, 36, 48, 61, 62, 64, 65, 77, 96, 16, 13, 28, 45, 52, 54, 66, 63, 27, 84, 98)
   */
  // Ok so that's good. At least when I have 100 tasks, I was able to get some parallelism and retries. Note the above is unordered,
  // but we can get it ordered as well
  val hundredFaultyTasksOrdered: Task[List[Int]] = Nondeterminism[Task].gather(1.to(100).toList.map { x => println(s"running $x"); finallyCompleted.map { _ => x } })
  // Maybe that's the trick with Process? We need to explicitly tell it to let other values through while waiting.
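  // One possible way to do that, as an untested sketch: wrap each element's retrying task in its own
  // single-element Process and merge those nondeterministically. This assumes `scalaz.stream.merge.mergeN`
  // with a `maxOpen` argument is available in the scalaz-stream version in use; `concurrentRetries` is a
  // hypothetical name, and the cap of 10 concurrently open inner processes is arbitrary.
  // Like `gatherUnordered`, this gives up ordering: elements come out as their tasks complete.
  def concurrentRetries: Process[Task, Int] =
    scalaz.stream.merge.mergeN(10)(
      Process.range(1, 100).toSource.map { x =>
        // each inner process retries its own task without blocking the others
        Process.eval(finallyCompleted.map { _ => x }): Process[Task, Int]
      })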
}