Skip to content

Instantly share code, notes, and snippets.

@mpilquist
Created November 18, 2016 02:24
Show Gist options
  • Save mpilquist/21430db54b22c17557a63a15d7b4be06 to your computer and use it in GitHub Desktop.
Save mpilquist/21430db54b22c17557a63a15d7b4be06 to your computer and use it in GitHub Desktop.
import java.time.Instant
import scala.concurrent.duration._
import fs2._
object Retries {
/**
* Returns a stream that evaluates the specified task once and then for each unit that appears in the `changes` stream. If any task evaluation
* fails with an exception, the task is retried according to the specified `retryDelay` schedule until it succeeds or a value from the `changes`
* stream appears.
*
* @param task task to evaluate
* @param changes stream that signals the task should be evaluated again (or if currently retrying, the retry delay should be reset)
* @param retryDelay function which determines how long to wait before retrying the task after the specified evaluation attempt number
*/
def retryTaskOnFailureAndChange[A](
task: Task[A],
changes: Stream[Task, Unit],
retryDelay: Int => FiniteDuration
)(implicit strategy: Strategy, scheduler: Scheduler): Stream[Task, Either[Throwable, A]] = {
val firstAndChanges: Stream[Task, Task[A]] = (Stream.emit(()) ++ changes).map(_ => task)
retryTasksOnFailure(firstAndChanges, retryDelay)
}
/**
* Returns a stream that evaluates the specified stream of tasks. Each task is evaluated at least once. If evaluation of a task fails, and
* the next task is not yet available from the `tasks` stream, then the failed task is retried according to the retry schedule specified by
* the `retryDelay` function.
*
* @param tasks tasks to evaluate
* @param retryDelay function which determines how long to wait before retrying the task after the specified evaluation attempt number
*/
def retryTasksOnFailure[A](
tasks: Stream[Task, Task[A]],
retryDelay: Int => FiniteDuration
)(implicit strategy: Strategy, scheduler: Scheduler): Stream[Task, Either[Throwable, A]] = {
Stream.eval(async.synchronousQueue[Task, Unit]).flatMap { clockTicksQueue =>
(clockTicksQueue.dequeue either tasks.through(signalize)).through(retryOnFailureAndChangePipe(retryDelay, clockTicksQueue.enqueue1(())))
}
}
private def retryOnFailureAndChangePipe[A](
retryDelay: Int => FiniteDuration,
signalTick: Task[Unit]
)(implicit strategy: Strategy, scheduler: Scheduler): Pipe[Task, Either[Unit, Task[A]], Either[Throwable, A]] = {
def waitingForTask: Handle[Task, Either[Unit, Task[A]]] => Pull[Task, Either[Throwable, A], Unit] = {
_.receive1 {
case (Right(task), h) =>
attempt(task.attempt, 0)(h)
case (Left(tick), h) =>
waitingForTask(h)
}
}
def attempt(task: Task[Either[Throwable, A]], attempts: Int): Handle[Task, Either[Unit, Task[A]]] => Pull[Task, Either[Throwable, A], Unit] = h => {
Pull.eval(task).flatMap {
case e @ Right(a) => Pull.output1(e) >> waitingForTask(h)
case e @ Left(t) =>
Pull.output1(e) >> {
val attempt = attempts + 1
val delayBeforeNextAttempt = retryDelay(attempt)
for {
nextAttempt <- Pull.eval(Task.delay(Instant.now.toEpochMilli + delayBeforeNextAttempt.toMillis))
_ <- Pull.eval(Task.start(signalTick.schedule(delayBeforeNextAttempt)))
result <- retrying(task, attempts + 1, nextAttempt)(h)
} yield result
}
}
}
def retrying(task: Task[Either[Throwable, A]], attempts: Int, nextAttemptMillis: Long): Handle[Task, Either[Unit, Task[A]]] => Pull[Task, Either[Throwable, A], Unit] = h => {
h.receive1 {
case (Right(task), h) =>
waitingForTask(h.push(Chunk.singleton(Right(task))))
case (Left(tick), h) =>
Pull.eval(Task.delay(Instant.now)).flatMap { now =>
if (now.toEpochMilli < nextAttemptMillis) retrying(task, attempts, nextAttemptMillis)(h)
else attempt(task, attempts)(h)
}
}
}
_.pull(waitingForTask)
}
/** Returns a stream that outputs the latest available `A` from the input. */
def signalize[F[_]: util.Async, A]: Pipe[F, A, A] = in => {
Stream.eval(async.signalOf[F, Option[A]](None)).flatMap { signal =>
in.evalMap(a => signal.set(Some(a))).drain merge signal.discrete.collect { case Some(a) => a }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment