-
-
Save pchiusano/7894696 to your computer and use it in GitHub Desktop.
// FYI: some comments below refer to old version of this gist: https://gist.github.com/pchiusano/7894696/12201b92db57dff8ed6689fc55c15c3f1a136f86 | |
package scalaz.stream | |
import scalaz.\/ | |
import scalaz.concurrent.Task | |
object retries { | |
def dropWhileUnlessAtEnd[I](f: I => Boolean): Process1[I,I] = { | |
def go(prev: Option[I]): Process1[I,I] = | |
process1.awaitOption[I].flatMap { | |
// `prev` was the last element, emit it unconditionally | |
case None => Process.emitAll(prev.toList) | |
// `prev` wasn't the last element, emit it only if it tests false vs predicate | |
case some => Process.emitAll(prev.toList.dropWhile(f)) ++ go(some) | |
} | |
go(None) | |
} | |
def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] = { | |
// step will have either left(err) (if failed) or right(None) (if succeeded) as its last element | |
val step: Process[Task, Throwable \/ Option[A]] = p.terminated.attempt() | |
val retries: Process[Task, Throwable \/ Option[A]] = schedule.flatMap(_ => step) | |
// if we get a `None` on the right, _.isDefined will be false, and we've had a successful attempt, so stop | |
retries.takeWhile(_.fold(_ => true, _.isDefined)) | |
// all but last error is ignored | |
.pipe(dropWhileUnlessAtEnd(_.isLeft)) | |
.flatMap(_.fold(Process.fail, o => Process.emitAll(o.toList))) | |
} | |
} |
Hi Paul! Here's what I came up with for this:
def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] =
val step: Process[Task, Throwable \/ A] = p.attempt()
val retries = schedule.zip(step.repeat).map(_._2)
(retries.dropWhile(_.isLeft) ++ step).take(1).flatMap(_.fold(Process.fail, Process.emit))
My thinking was "if you want to repeat steps according to the schedule, why don't you just say so?" 😀 So that's retries
. Then we want to ignore failures. That's trivially .dropWhile(_.isLeft)
. But they could all fail and we'd have an empty stream, so we need ++ step
that won't be dropped. So (retries.dropWhile(_.isLeft) ++ step)
says exactly what we mean: step
according to the schedule
, ignoring failures, then step
again. Finally, .take(1)
takes what that stream emits: either the first successful step
from retries
, or the last step
, successful or not.
So the happy path is: the first step
in retries
succeeds; .dropWhile(_.isLeft)
drops nothing; .take(1)
takes that first, successful step
; Process.emit()
emits the A
. Or look at it this way: retries
is the fuse. .dropWhile(_.isLeft)
burns it down. ++ step
is the blasting cap. (Yes, I had a misspent youth with explosives....)
One possibly subtle implication: schedule
governs everything, including the first step
. So if you don't want a delay before the first step
, don't put one in the schedule
, i.e. time.awakeEvery(d).map(_ => ()).take(n)
is the wrong idiom. (Process.emit(()) ++ time.sleep(d)).repeat.take(n)
is the right idiom.
Thanks for provoking my thought on this!
Paul, it seems that in the successful case the process is repeated.
The None
case happens either when 1) retries are exhausted or 2) the signal is exhausted. Signal gets exhausted when you are successful. But in the None
case you repeat step
. So there is the first step
++ step
in None. If step
represents a web service call, you're always making two even in the case of successful first try.
This is demonstrated in this gist (https://gist.github.com/matthughes/273a5ddf813ca382b697). Although, it seems to depend on how you construct the Process as one kind of process repeats and another mysteriously doesn't.
Okay, I think this might do it:
def dropWhileUnlessAtEnd[I](f: I => Boolean): Process1[I,I] = {
def go(prev: Option[I]): Process1[I,I] =
process1.awaitOption[I].flatMap {
// `prev` was the last element, emit it unconditionally
case None => Process.emitAll(prev.toList)
// `prev` wasn't the last element, emit it only if it tests false vs predicate
case some => Process.emitAll(prev.toList.dropWhile(f)) ++ go(some)
}
go(None)
}
def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] = {
val step: Process[Task, Throwable \/ Option[A]] = p.terminated.attempt()
val retries: Process[Task, Throwable \/ Option[A]] = schedule.flatMap(_ => step)
retries.takeWhile(_.fold(_ => true, _.isDefined))
.pipe(dropWhileUnlessAtEnd(_.isLeft))
.flatMap(_.fold(Process.fail, o => Process.emitAll(o.toList)))
}
Update: comments below refer to older version of this gist