Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Last active December 30, 2015 22:29
Show Gist options
  • Save pchiusano/7894696 to your computer and use it in GitHub Desktop.
Save pchiusano/7894696 to your computer and use it in GitHub Desktop.
Combinator for retrying a `Process` multiple times, delaying between attempts
// FYI: some comments below refer to an old version of this gist: https://gist.github.com/pchiusano/7894696/12201b92db57dff8ed6689fc55c15c3f1a136f86
package scalaz.stream
import scalaz.\/
import scalaz.concurrent.Task
object retries {

  /**
   * Filters out every element satisfying `f`, except that the final element of
   * the input is always emitted, whether or not it satisfies `f`.
   *
   * Each element is held back until the following `awaitOption` reports either
   * a next element or the end of input, so output lags input by one element.
   *
   * @param f predicate selecting the elements to drop when they are not last
   * @tparam I element type
   */
  def dropWhileUnlessAtEnd[I](f: I => Boolean): Process1[I,I] = {
    def go(prev: Option[I]): Process1[I,I] =
      process1.awaitOption[I].flatMap {
        // `prev` was the last element, emit it unconditionally
        case None => Process.emitAll(prev.toList)
        // `prev` wasn't the last element, emit it only if it tests false vs predicate
        case some => Process.emitAll(prev.filterNot(f).toList) ++ go(some)
      }
    go(None)
  }

  /**
   * Emits elements while `f` holds, then also emits the first element for which
   * `f` is false (if any) and halts.
   */
  private def takeThrough[I](f: I => Boolean): Process1[I,I] =
    process1.awaitOption[I].flatMap {
      case Some(i) if f(i) => Process.emit(i) ++ takeThrough(f)
      // either end of input (emits nothing) or the first element failing `f` (emitted, then halt)
      case last => Process.emitAll(last.toList)
    }

  /**
   * Runs `p` once for each element produced by `schedule`, stopping after the
   * first attempt that completes without error.
   *
   * Values emitted by an attempt are passed downstream even if that attempt
   * later fails, so a retried `p` may produce the same values more than once.
   * Errors from attempts that are followed by another attempt are discarded;
   * if `schedule` runs out while the latest attempt failed, the resulting
   * process fails with that attempt's error.
   *
   * @param schedule yields one element per permitted attempt; presumably a timed
   *                 process so that attempts are spaced out -- nothing here enforces it
   * @param p        the process to attempt
   */
  def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] = {
    // step will have either left(err) (if failed) or right(None) (if succeeded) as its last element
    val step: Process[Task, Throwable \/ Option[A]] = p.terminated.attempt()
    val retries: Process[Task, Throwable \/ Option[A]] = schedule.flatMap(_ => step)
    // A `None` on the right marks a successful attempt, so stop there -- but keep the
    // marker itself. If it were dropped (as plain `takeWhile` would do), a success that
    // emitted no values right after a failure would leave that failure's `Left` as the
    // final element, and the error would be raised despite the successful retry.
    retries.pipe(takeThrough[Throwable \/ Option[A]](_.fold(_ => true, _.isDefined)))
      // all but last error is ignored
      .pipe(dropWhileUnlessAtEnd(_.isLeft))
      // the success marker `Right(None)` contributes no output
      .flatMap(_.fold(Process.fail, o => Process.emitAll(o.toList)))
  }
}
@pchiusano
Copy link
Author

Okay, I think this might do it:

  /**
   * Removes every element matching `f`, except that the very last input
   * element is always passed through regardless of `f`.
   */
  def dropWhileUnlessAtEnd[I](f: I => Boolean): Process1[I,I] = {
    // `held` is the most recently read element, not yet emitted or discarded
    def loop(held: Option[I]): Process1[I,I] =
      process1.awaitOption[I].flatMap {
        // more input follows: `held` is not the last element, keep it only if `f` rejects it
        case next @ Some(_) => Process.emitAll(held.filterNot(f).toList) ++ loop(next)
        // input is exhausted: `held` was the last element, emit it as-is
        case None => Process.emitAll(held.toList)
      }
    loop(None)
  }

  /**
   * Runs `p` once per element of `schedule`, stopping after the first attempt
   * that completes without error. Errors from attempts that are followed by
   * another attempt are discarded; if `schedule` is exhausted while the latest
   * attempt failed, the result fails with that attempt's error. Values emitted
   * by an attempt before it fails are still passed downstream.
   */
  def retry[A](schedule: Process[Task, Unit])(p: Process[Task, A]): Process[Task, A] = {
    // Emits elements while `f` holds, plus the first element failing `f`, then halts.
    def takeThrough[I](f: I => Boolean): Process1[I,I] =
      process1.awaitOption[I].flatMap {
        case Some(i) if f(i) => Process.emit(i) ++ takeThrough(f)
        case last => Process.emitAll(last.toList)
      }
    // each step ends with Left(err) on failure or Right(None) on success
    val step: Process[Task, Throwable \/ Option[A]] = p.terminated.attempt()
    val retries: Process[Task, Throwable \/ Option[A]] = schedule.flatMap(_ => step)
    // Stop at the success marker Right(None) but keep it in the stream: dropping it
    // (as `takeWhile` would) lets an earlier Left become the final element whenever the
    // successful attempt emitted nothing, and that stale error would then be raised.
    retries.pipe(takeThrough[Throwable \/ Option[A]](_.fold(_ => true, _.isDefined)))
           .pipe(dropWhileUnlessAtEnd(_.isLeft))
           .flatMap(_.fold(Process.fail, o => Process.emitAll(o.toList)))
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment