Last active
December 30, 2015 22:29
-
-
Save pchiusano/7894696 to your computer and use it in GitHub Desktop.
Combinator for retrying a `Process` multiple times, delaying between attempts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// FYI: some comments below refer to old version of this gist: https://gist.github.com/pchiusano/7894696/12201b92db57dff8ed6689fc55c15c3f1a136f86 | |
package scalaz.stream | |
import scalaz.\/ | |
import scalaz.concurrent.Task | |
object retries { | |
def dropWhileUnlessAtEnd[I](f: I => Boolean): Process1[I,I] = { | |
def go(prev: Option[I]): Process1[I,I] = | |
process1.awaitOption[I].flatMap { | |
// `prev` was the last element, emit it unconditionally | |
case None => Process.emitAll(prev.toList) | |
// `prev` wasn't the last element, emit it only if it tests false vs predicate | |
case some => Process.emitAll(prev.toList.dropWhile(f)) ++ go(some) | |
} | |
go(None) | |
} | |
def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] = { | |
// step will have either left(err) (if failed) or right(None) (if succeeded) as its last element | |
val step: Process[Task, Throwable \/ Option[A]] = p.terminated.attempt() | |
val retries: Process[Task, Throwable \/ Option[A]] = schedule.flatMap(_ => step) | |
// if we get a `None` on the right, _.isDefined will be false, and we've had a successful attempt, so stop | |
retries.takeWhile(_.fold(_ => true, _.isDefined)) | |
// all but last error is ignored | |
.pipe(dropWhileUnlessAtEnd(_.isLeft)) | |
.flatMap(_.fold(Process.fail, o => Process.emitAll(o.toList))) | |
} | |
} |
Okay, I think this might do it:
def dropWhileUnlessAtEnd[I](f: I => Boolean): Process1[I,I] = {
def go(prev: Option[I]): Process1[I,I] =
process1.awaitOption[I].flatMap {
// `prev` was the last element, emit it unconditionally
case None => Process.emitAll(prev.toList)
// `prev` wasn't the last element, emit it only if it tests false vs predicate
case some => Process.emitAll(prev.toList.dropWhile(f)) ++ go(some)
}
go(None)
}
def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] = {
val step: Process[Task, Throwable \/ Option[A]] = p.terminated.attempt()
val retries: Process[Task, Throwable \/ Option[A]] = schedule.flatMap(_ => step)
retries.takeWhile(_.fold(_ => true, _.isDefined))
.pipe(dropWhileUnlessAtEnd(_.isLeft))
.flatMap(_.fold(Process.fail, o => Process.emitAll(o.toList)))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Paul, it seems that in the successful case the process is repeated.
The
None
case happens either when 1) retries are exhausted or 2) the signal is exhausted. Signal gets exhausted when you are successful. But in theNone
case you repeatstep
. So there is the firststep
++step
in None. Ifstep
represents a web service call, you're always making two even in the case of successful first try.This is demonstrated in this gist (https://gist.github.com/matthughes/273a5ddf813ca382b697). Although, it seems to depend on how you construct the Process as one kind of process repeats and another mysteriously doesn't.