Skip to content

Instantly share code, notes, and snippets.

@shajra
Created February 26, 2015 18:28
Show Gist options
  • Save shajra/8573a0df4317e3829e09 to your computer and use it in GitHub Desktop.
Save shajra/8573a0df4317e3829e09 to your computer and use it in GitHub Desktop.
some snippets about retry abstractions in Scala
package shajra.http.client
import scala.concurrent.duration.Duration
import scalaz.{Functor, NonEmptyList}
/**
 * A single instruction of an HTTP plan.
 *
 * The concrete cases are `Call`, `Fork` and `Retry`; the class is sealed, so
 * `visit` can dispatch over all of them.
 *
 * @tparam C the call-type tag carried by calls
 * @tparam E the error type
 * @tparam O the output type produced by the command
 */
private[client] sealed abstract class HttpCommand[C, E, O] {

  /** Transforms the output of this command with `f`. */
  def map[OO](f: O => OO): HttpCommand[C, E, OO]

  /**
   * Eliminates this command by invoking the handler for its concrete case.
   *
   * @param ifCall  receives the call description and the response unmarshaller of a `Call`
   * @param ifFork  receives the fork description of a `Fork`
   * @param ifRetry receives the `Retry` itself
   * @return whatever the selected handler returns
   */
  def visit[OO](
      ifCall: (CallDesc[C], Unmarshal[E, O]) => OO,
      ifFork: ForkDesc[C, E, O] => OO,
      ifRetry: Retry[C, E, O] => OO): OO =
    this match {
      case call: Call[C, E, O] =>
        val desc = CallDesc(
          call.callType, call.endpoint, call.method, call.request, call.responseType)
        ifCall(desc, call.response)
      case fork: Fork[C, E, O] =>
        ifFork(fork.impl)
      case retry: Retry[C, E, O] =>
        ifRetry(retry)
    }
}
/**
 * Command describing one HTTP call together with the function used to
 * unmarshal its response.
 */
private case class Call[C, E, O](
    callType: C,
    endpoint: Endpoint,
    method: Method,
    request: Media,
    responseType: MediaType,
    response: Unmarshal[E, O])
  extends HttpCommand[C, E, O] {

  /** Post-composes `f` onto the result of the response unmarshaller. */
  def map[OO](f: O => OO): Call[C, E, OO] =
    copy(response = response.andThen(_.map(f)))
}
/** Command that runs the two plans held by `impl` and combines their results. */
private case class Fork[C, E, O](impl: ForkDesc[C, E, O])
  extends HttpCommand[C, E, O] {

  /** Builds a new fork over the same two plans whose combiner is followed by `g`. */
  def map[OO](g: O => OO): HttpCommand[C, E, OO] = {
    val combineThenMap = (a: impl.A, b: impl.B) => g(impl.f(a, b))
    HttpCommand.fork(impl.call1, impl.call2)(combineThenMap)
  }
}
/** Command pairing a plan with the description of how to retry it. */
private case class Retry[C, E, O](plan: HttpPlan[C, E, O], desc: RetryDesc[E])
  extends HttpCommand[C, E, O] {

  /** Maps the wrapped plan's output with `f`; the retry description is kept as is. */
  def map[OO](f: O => OO): Retry[C, E, OO] =
    copy(plan = plan.map(f))
}
/**
 * Describes a retry policy as a state machine whose state type `S` is hidden
 * from users of the description.
 *
 * @tparam E the error type inspected by the policy
 */
private sealed abstract class RetryDesc[E] {

  /** The policy's private state type. */
  type S

  /** The state handed to `test` for the first decision. */
  def firstState: S

  /**
   * Consulted with the current state and the errors of a failed attempt.
   *
   * @return `Some((nextState, delay))` or `None`; the interpreter in
   *         `HttpPlan.execute` recurses with the new state and optional delay
   *         on `Some`
   */
  def test(s: S, es: NonEmptyList[E]): Option[(S, Option[Duration])]
}
/**
 * Describes two plans to be run together and the function combining their
 * results; the intermediate result types `A` and `B` are type members.
 */
private sealed abstract class ForkDesc[C, E, O] {

  /** Result type of the first plan. */
  type A

  /** Result type of the second plan. */
  type B

  /** The first plan. */
  def call1: HttpPlan[C, E, A]

  /** The second plan. */
  def call2: HttpPlan[C, E, B]

  /** Combines the results of both plans into the fork's output. */
  def f(a: A, b: B): O
}
/** Constructors and the scalaz `Functor` instance for `HttpCommand`. */
private object HttpCommand {

  /** Builds a `Call` command from its individual parts. */
  def call[C, E, O](
      callType: C,
      endpoint: Endpoint,
      method: Method,
      request: Media,
      responseType: MediaType,
      responseMarshal: Unmarshal[E, O]): HttpCommand[C, E, O] =
    Call(
      callType = callType,
      endpoint = endpoint,
      method = method,
      request = request,
      responseType = responseType,
      response = responseMarshal)

  /**
   * Builds a `Fork` command over two plans.
   *
   * @param f combines both results; the default pairs them up as a tuple
   */
  def fork[C, E, A, B, O](
      call1: HttpPlan[C, E, A],
      call2: HttpPlan[C, E, B])(
      f: (A, B) => O = Function.untupled(identity[(A, B)] _)): HttpCommand[C, E, O] = {
    // The anonymous ForkDesc declares members named A, B, call1, call2 and f,
    // which would shadow this method's own names; alias them first.
    type A0 = A
    type B0 = B
    val firstPlan = call1
    val secondPlan = call2
    val combine = f
    Fork(new ForkDesc[C, E, O] {
      type A = A0
      type B = B0
      val call1 = firstPlan
      val call2 = secondPlan
      def f(a: A, b: B) = combine(a, b)
    })
  }

  /**
   * Builds a `Retry` command around `plan`.
   *
   * @param firstState the initial state of the retry policy
   * @param test       the policy's decision function, stored in the `RetryDesc`
   */
  def retry[S, C, E, O](
      plan: HttpPlan[C, E, O],
      firstState: S)(
      test: (S, NonEmptyList[E]) => Option[(S, Option[Duration])]): HttpCommand[C, E, O] = {
    // Same shadowing concern as in `fork`: RetryDesc has members S, firstState, test.
    type State = S
    val initialState = firstState
    val decide = test
    val desc = new RetryDesc[E] {
      type S = State
      val firstState = initialState
      def test(s: S, es: NonEmptyList[E]) = decide(s, es)
    }
    Retry(plan, desc)
  }

  /** `Functor` instance in the output type, delegating to `HttpCommand#map`. */
  implicit def functor[C, E]
      : Functor[({type F[o] = HttpCommand[C, E, o]})#F] =
    new Functor[({type F[o] = HttpCommand[C, E, o]})#F] {
      def map[A, B](fa: HttpCommand[C, E, A])(f: A => B): HttpCommand[C, E, B] =
        fa map f
    }
}
package shajra.http.client
import scala.concurrent.ExecutionContextExecutorService
import scala.concurrent.duration.Duration
import scalaz.{EitherT, Free, Nondeterminism, Monad, ValidationNel,
NonEmptyList}
import scalaz.Free.{Return, Suspend}
import scalaz.concurrent.{Task, Strategy}
import scalaz.syntax.monad._
import scalaz.syntax.std.option._
import scalaz.std.option._
import shajra.extn.scala.concurrent.Strategy.fromService
/**
 * A plan of HTTP commands, represented as a scalaz `Free` structure over
 * `HttpCommand[C, E, ?]` held in the private `impl` field. The constructor is
 * private.
 *
 * @tparam C the call-type tag carried by calls
 * @tparam E the error type
 * @tparam O the result type of the plan
 */
final class HttpPlan[C, E, O] private
(private val impl: Free[({type F[o] = HttpCommand[C, E, o]})#F, O]) {
/** Maps the plan's result with `f` via `Monad[M].map` on the underlying `Free`. */
def map[OO](f: O => OO): HttpPlan[C, E, OO] =
new HttpPlan(Monad[M].map(this.impl)(f))
/** Sequences this plan with the plan produced by `f`, binding on the underlying `Free`. */
def flatMap[OO](f: O => HttpPlan[C, E, OO]): HttpPlan[C, E, OO] =
new HttpPlan(Monad[M].bind(this.impl)(f andThen { _.impl }))
/** Wraps this plan in a retry; delegates to `HttpPlan.retry(this, times)`. */
def retry(times: Int): HttpPlan[C, E, O] = HttpPlan.retry(this, times)
/**
 * Same as `execute`, but takes the pool as an implicit parameter, defaulting
 * to `fromService(Strategy.DefaultExecutorService)`.
 */
def executeDefaultPool(strategy: CallStrategy[C, E])
(implicit pool: ExecutionContextExecutorService =
fromService(Strategy.DefaultExecutorService))
: EitherT[Task, NonEmptyList[E], O] =
execute(strategy)(pool)
/**
 * Interprets the plan into an `EitherT[Task, NonEmptyList[E], O]`.
 *
 * @param strategy invoked with each call description and the pool
 * @param pool     passed to `strategy` and used for `Task.fork` on fork
 *                 branches; defaults to
 *                 `fromService(Strategy.DefaultExecutorService)`
 */
def execute(strategy: CallStrategy[C, E])
(pool: ExecutionContextExecutorService =
fromService(Strategy.DefaultExecutorService))
: EitherT[Task, NonEmptyList[E], O] = {
// Resumes the Free structure: a pending command is dispatched to one of the
// handlers below through `visit`; a finished value is lifted with `Task.now`.
def loop[OO](m: M[OO]): EitherT[Task, NonEmptyList[E], OO] =
m.resume.fold(_.visit(ifCall, ifFork, ifRetry), EitherT right Task.now(_))
// Call: run the strategy, feed its result to the unmarshaller `res`, turn the
// outcome into a disjunction, then continue with the rest of the plan.
def ifCall[OO](call: CallDesc[C], res: Unmarshal[E, M[OO]])
: EitherT[Task, NonEmptyList[E], OO] =
strategy(call, pool) >>=
(res andThen { v => EitherT(Task now v.disjunction) }) >>=
loop
// Fork: run both branches via `Nondeterminism[Task].mapBoth`, combine their
// validations with `|@|` and `fork.f`, then continue with the rest of the plan.
def ifFork[OO](fork: ForkDesc[C, E, M[OO]])
: EitherT[Task, NonEmptyList[E], OO] =
EitherT(
Nondeterminism[Task]
.mapBoth(execFork(fork.call1), execFork(fork.call2)) { (a, b) =>
(a |@| b)(fork.f).disjunction
}
) >>= loop
// Retry: run the wrapped plan, consulting `retry.desc.test` on failure.
def ifRetry[OO](retry: Retry[C, E, M[OO]])
: EitherT[Task, NonEmptyList[E], OO] = {
// The wrapped plan's execution with the sides swapped, so that errors sit on
// the right and `>>=` below continues on failure rather than on success.
val runFuture = loop(retry.plan.impl).swap.run.get
// `state` is (policy state, optional delay before this attempt).
def loopRetry(state: (retry.desc.S, Option[Duration]))
: EitherT[Task, M[OO], NonEmptyList[E]] = {
// When a delay is present, the attempt is built with `runFuture.after`.
val delayedFuture = state._2 some runFuture.after none runFuture
val run = EitherT(new Task(delayedFuture))
// On errors: `Some` from the policy recurses with the new state.
// NOTE(review): on `None` this evaluates `run` again instead of returning
// `es`, which looks like one further attempt after the policy declines —
// confirm this is intended.
run >>= { es => retry.desc test (state._1, es) some loopRetry none run }
}
// Swap back so success is on the right again, then continue the plan.
loopRetry((retry.desc.firstState, none)).swap >>= loop
}
// Runs one fork branch through `Task.fork` on `pool`, exposing its outcome as a
// `ValidationNel`.
def execFork[OO](plan: HttpPlan[C, E, OO]): Task[ValidationNel[E, OO]] =
EitherT(Task.fork(loop(plan.impl).run)(pool)).validation
loop(impl)
}
// Shorthand for the command functor and the free structure built over it.
private type F[o] = HttpCommand[C, E, o]
private type M[o] = Free[F, o]
}
/** Constructors and the scalaz `Monad` instance for `HttpPlan`. */
object HttpPlan {

  /**
   * Builds a plan for a call that sends `Media.empty` as its request.
   *
   * @param responseMarshal produces the response unmarshaller for the call
   */
  def bodylessCall[C, E, O]
      (callType: C,
       endpoint: Endpoint,
       method: Method,
       responseType: MediaType)
      (responseMarshal: DetailedUnmarshal[C, E, O])
      : HttpPlan[C, E, O] =
    call(callType, endpoint, method, responseType)(
      (_: Unit) => Media.empty)(responseMarshal)(())

  /**
   * Builds a function from a request value to a single-call plan.
   *
   * @param requestMarshal  turns the input into the request `Media`
   * @param responseMarshal given the call description, yields the response
   *                        unmarshaller
   * @return a function that, for each input, marshals it and wraps the
   *         resulting call in a plan
   */
  def call[C, E, I, O]
      (callType: C,
       endpoint: Endpoint,
       method: Method,
       responseType: MediaType)
      (requestMarshal: I => Media)
      (responseMarshal: DetailedUnmarshal[C, E, O])
      : I => HttpPlan[C, E, O] = { i =>
    // Marshal exactly once: the description handed to the unmarshaller and the
    // command actually issued must carry the same request, and the marshaller
    // should not be run twice per input.
    val request = requestMarshal(i)
    val cd = CallDesc(callType, endpoint, method, request, responseType)
    val m = responseMarshal.unmarshal(cd)
    HttpPlan(
      HttpCommand.call(callType, endpoint, method, request, responseType, m))
  }

  /** Builds a plan that forks into `plan1` and `plan2` and combines their results with `f`. */
  def fork[C, E, A, B, O]
      (plan1: HttpPlan[C, E, A],
       plan2: HttpPlan[C, E, B])
      (f: (A, B) => O)
      : HttpPlan[C, E, O] =
    HttpPlan(HttpCommand.fork(plan1, plan2)(f))

  /**
   * Wraps `plan` in a retry command whose policy state is a counter starting
   * at `times`.
   *
   * On each consultation the counter is decremented; the policy answers
   * `Some` (with no delay) while the decremented value is still positive and
   * `None` otherwise. The reported errors are not inspected.
   */
  def retry[C, E, O](plan: HttpPlan[C, E, O], times: Int): HttpPlan[C, E, O] =
    HttpPlan(
      HttpCommand.retry(plan, times) { (state: Int, _) =>
        (state - 1).some filter { _ > 0 } map { (_, none) }
      })

  /** Lifts a value into a plan that issues no commands. */
  def point[C, E, A](a: A): HttpPlan[C, E, A] = {
    type M[a] = HttpPlan[C, E, a]
    Monad[M].point(a)
  }

  /** `Monad` instance in the result type: `bind` is `flatMap`, `point` is a `Return`. */
  implicit def monad[C, E]
      : Monad[({type M[o] = HttpPlan[C, E, o]})#M] = {
    type F[o] = HttpCommand[C, E, o]
    type M[o] = HttpPlan[C, E, o]
    new Monad[M] {
      def bind[A, B](m: M[A])(f: A => M[B]): M[B] = m flatMap f
      def point[A](a: => A): M[A] = new HttpPlan(Return[F, A](a))
    }
  }

  /** Lifts a single command into a plan by suspending it over `Return`. */
  private def apply[C, E, O](p: HttpCommand[C, E, O]): HttpPlan[C, E, O] = {
    type F[o] = HttpCommand[C, E, o]
    new HttpPlan(Suspend[F, O](p map { Return(_) }))
  }
}
package shajra.extn.scala
package concurrent
import java.util.Collections
import java.util.concurrent.{
AbstractExecutorService => AES, ExecutorService => ES, TimeUnit}
import scala.concurrent.{
ExecutionContext => EC, ExecutionContextExecutorService => ECES}
/**
 * Adapters producing an `ExecutionContextExecutorService` from either a Java
 * `ExecutorService` or a plain Scala `ExecutionContext`.
 */
object Strategy {

  /** Adapts a Java executor service through `ExecutionContext.fromExecutorService`. */
  def fromService(es: ES): ECES =
    EC.fromExecutorService(es)

  /**
   * Presents an `ExecutionContext` as an executor service.
   *
   * Work and failure reports are forwarded to `ec`. The lifecycle methods do
   * nothing: `shutdown` is a no-op, `shutdownNow` returns an empty list, and
   * `isShutdown`, `isTerminated` and `awaitTermination` always answer `false`.
   */
  def fromContext(ec: EC): ECES =
    new AES with ECES {
      // Forwarding to the wrapped context.
      override def execute(runnable: Runnable): Unit = ec.execute(runnable)
      override def reportFailure(t: Throwable): Unit = ec.reportFailure(t)
      override def prepare(): EC = ec

      // Lifecycle: this wrapper never shuts anything down.
      override def shutdown(): Unit = ()
      override def shutdownNow(): java.util.List[Runnable] =
        Collections.emptyList[Runnable]()
      override def isShutdown: Boolean = false
      override def isTerminated: Boolean = false
      override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment