Created
February 26, 2015 18:28
-
-
Save shajra/8573a0df4317e3829e09 to your computer and use it in GitHub Desktop.
some snippets about retry abstractions in Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package shajra.http.client | |
import scala.concurrent.duration.Duration | |
import scalaz.{Functor, NonEmptyList} | |
private[client] sealed abstract class HttpCommand[C, E, O] { | |
def visit[OO] | |
(ifCall: (CallDesc[C], Unmarshal[E, O]) => OO, | |
ifFork: ForkDesc[C, E, O] => OO, | |
ifRetry: Retry[C, E, O] => OO) | |
: OO = | |
this match { | |
case Call(ct, ep, m, req, resType, res) => | |
ifCall(CallDesc(ct, ep, m, req, resType), res) | |
case Fork(forkImpl) => | |
ifFork(forkImpl) | |
case r@Retry(_, _) => | |
ifRetry(r) | |
} | |
def map[OO](f: O => OO): HttpCommand[C, E, OO] | |
} | |
private case class Call[C, E, O] | |
(callType: C, | |
endpoint: Endpoint, | |
method: Method, | |
request: Media, | |
responseType: MediaType, | |
response: Unmarshal[E, O]) | |
extends HttpCommand[C, E, O] { | |
def map[OO](f: O => OO) = copy(response = response andThen { _ map f }) | |
} | |
private case class Fork[C, E, O](impl: ForkDesc[C, E, O]) | |
extends HttpCommand[C, E, O] { | |
def map[OO](g: O => OO) = | |
HttpCommand.fork(impl.call1, impl.call2) { (a, b) => g(impl.f(a, b)) } | |
} | |
private case class Retry[C, E, O] | |
(plan: HttpPlan[C, E, O], desc: RetryDesc[E]) | |
extends HttpCommand[C, E, O] { | |
def map[OO](f: O => OO) = Retry(plan map f, desc) | |
} | |
private sealed abstract class RetryDesc[E] { | |
type S | |
def firstState: S | |
def test(s: S, es: NonEmptyList[E]): Option[(S, Option[Duration])] | |
} | |
private sealed abstract class ForkDesc[C, E, O] { self => | |
type A | |
type B | |
def call1: HttpPlan[C, E, A] | |
def call2: HttpPlan[C, E, B] | |
def f(a: A, b: B): O | |
} | |
private object HttpCommand { | |
def call[C, E, O] | |
(callType: C, | |
endpoint: Endpoint, | |
method: Method, | |
request: Media, | |
responseType: MediaType, | |
responseMarshal: Unmarshal[E, O]) | |
: HttpCommand[C, E, O] = | |
Call(callType, endpoint,method, request, responseType, responseMarshal) | |
def fork[C, E, A, B, O] | |
(call1: HttpPlan[C, E, A], | |
call2: HttpPlan[C, E, B]) | |
(f: (A, B) => O = Function.untupled(identity[(A, B)] _)) | |
: HttpCommand[C, E, O] = { | |
type AA = A | |
type BB = B | |
val c1 = call1 | |
val c2 = call2 | |
val ff = f | |
Fork(new ForkDesc[C, E, O] { | |
type A = AA | |
type B = BB | |
val call1 = c1 | |
val call2 = c2 | |
def f(a: A, b: B) = ff(a, b) | |
}) | |
} | |
def retry[S, C, E, O] | |
(plan: HttpPlan[C, E, O], | |
firstState: S) | |
(test: (S, NonEmptyList[E]) => Option[(S, Option[Duration])]) | |
: HttpCommand[C, E, O] = { | |
type SS = S | |
val firstState_ = firstState | |
val test_ = test | |
Retry(plan, | |
new RetryDesc[E] { | |
type S = SS | |
val firstState = firstState_ | |
def test(s: S, es: NonEmptyList[E]) = test_(s, es) | |
}) | |
} | |
implicit def functor[C, E] | |
: Functor[({type F[o] = HttpCommand[C, E, o]})#F] = { | |
type F[o] = HttpCommand[C, E, o] | |
new Functor[F] { def map[A, B](plan: F[A])(f: A => B) = plan map f } | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package shajra.http.client | |
import scala.concurrent.ExecutionContextExecutorService | |
import scala.concurrent.duration.Duration | |
import scalaz.{EitherT, Free, Nondeterminism, Monad, ValidationNel, | |
NonEmptyList} | |
import scalaz.Free.{Return, Suspend} | |
import scalaz.concurrent.{Task, Strategy} | |
import scalaz.syntax.monad._ | |
import scalaz.syntax.std.option._ | |
import scalaz.std.option._ | |
import shajra.extn.scala.concurrent.Strategy.fromService | |
final class HttpPlan[C, E, O] private | |
(private val impl: Free[({type F[o] = HttpCommand[C, E, o]})#F, O]) { | |
def map[OO](f: O => OO): HttpPlan[C, E, OO] = | |
new HttpPlan(Monad[M].map(this.impl)(f)) | |
def flatMap[OO](f: O => HttpPlan[C, E, OO]): HttpPlan[C, E, OO] = | |
new HttpPlan(Monad[M].bind(this.impl)(f andThen { _.impl })) | |
def retry(times: Int): HttpPlan[C, E, O] = HttpPlan.retry(this, times) | |
def executeDefaultPool(strategy: CallStrategy[C, E]) | |
(implicit pool: ExecutionContextExecutorService = | |
fromService(Strategy.DefaultExecutorService)) | |
: EitherT[Task, NonEmptyList[E], O] = | |
execute(strategy)(pool) | |
def execute(strategy: CallStrategy[C, E]) | |
(pool: ExecutionContextExecutorService = | |
fromService(Strategy.DefaultExecutorService)) | |
: EitherT[Task, NonEmptyList[E], O] = { | |
def loop[OO](m: M[OO]): EitherT[Task, NonEmptyList[E], OO] = | |
m.resume.fold(_.visit(ifCall, ifFork, ifRetry), EitherT right Task.now(_)) | |
def ifCall[OO](call: CallDesc[C], res: Unmarshal[E, M[OO]]) | |
: EitherT[Task, NonEmptyList[E], OO] = | |
strategy(call, pool) >>= | |
(res andThen { v => EitherT(Task now v.disjunction) }) >>= | |
loop | |
def ifFork[OO](fork: ForkDesc[C, E, M[OO]]) | |
: EitherT[Task, NonEmptyList[E], OO] = | |
EitherT( | |
Nondeterminism[Task] | |
.mapBoth(execFork(fork.call1), execFork(fork.call2)) { (a, b) => | |
(a |@| b)(fork.f).disjunction | |
} | |
) >>= loop | |
def ifRetry[OO](retry: Retry[C, E, M[OO]]) | |
: EitherT[Task, NonEmptyList[E], OO] = { | |
val runFuture = loop(retry.plan.impl).swap.run.get | |
def loopRetry(state: (retry.desc.S, Option[Duration])) | |
: EitherT[Task, M[OO], NonEmptyList[E]] = { | |
val delayedFuture = state._2 some runFuture.after none runFuture | |
val run = EitherT(new Task(delayedFuture)) | |
run >>= { es => retry.desc test (state._1, es) some loopRetry none run } | |
} | |
loopRetry((retry.desc.firstState, none)).swap >>= loop | |
} | |
def execFork[OO](plan: HttpPlan[C, E, OO]): Task[ValidationNel[E, OO]] = | |
EitherT(Task.fork(loop(plan.impl).run)(pool)).validation | |
loop(impl) | |
} | |
private type F[o] = HttpCommand[C, E, o] | |
private type M[o] = Free[F, o] | |
} | |
object HttpPlan { | |
def bodylessCall[C, E, O] | |
(callType: C, | |
endpoint: Endpoint, | |
method: Method, | |
responseType: MediaType) | |
(responseMarshal: DetailedUnmarshal[C, E, O]) | |
: HttpPlan[C, E, O] = | |
call(callType, endpoint, method, responseType) | |
{ _:Unit => Media.empty } { responseMarshal } (()) | |
def call[C, E, I, O] | |
(callType: C, | |
endpoint: Endpoint, | |
method: Method, | |
responseType: MediaType) | |
(requestMarshal: I => Media) | |
(responseMarshal: DetailedUnmarshal[C, E, O]) | |
: I => HttpPlan[C, E, O] = { i => | |
val cd = | |
CallDesc(callType, endpoint, method, requestMarshal(i), responseType) | |
val m = responseMarshal.unmarshal(cd) | |
HttpPlan( | |
HttpCommand.call( | |
callType, endpoint,method, requestMarshal(i), responseType, m)) | |
} | |
def fork[C, E, A, B, O] | |
(plan1: HttpPlan[C, E, A], | |
plan2: HttpPlan[C, E, B]) | |
(f: (A, B) => O) | |
: HttpPlan[C, E, O] = | |
HttpPlan(HttpCommand.fork(plan1, plan2)(f)) | |
def retry[C, E, O](plan: HttpPlan[C, E, O], times: Int): HttpPlan[C, E, O] = | |
HttpPlan( | |
HttpCommand.retry(plan, times) { (state: Int, errors) => | |
(state - 1).some filter { _ > 0 } map { (_, none) } | |
}) | |
def point[C, E, A](a: A): HttpPlan[C, E, A] = { | |
type M[a] = HttpPlan[C, E, a] | |
Monad[M].point(a) | |
} | |
implicit def monad[C, E] | |
: Monad[({type M[o] = HttpPlan[C, E, o]})#M] = { | |
type F[o] = HttpCommand[C, E, o] | |
type M[o] = HttpPlan[C, E, o] | |
new Monad[M] { | |
def bind[A, B](m: M[A])(f: A => M[B]): M[B] = m flatMap f | |
def point[A](a: => A): M[A] = new HttpPlan(Return[F, A](a)) | |
} | |
} | |
private def apply[C, E, O](p: HttpCommand[C, E, O]): HttpPlan[C, E, O] = { | |
type F[o] = HttpCommand[C, E, o] | |
new HttpPlan(Suspend[F, O](p map { Return(_) })) | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package shajra.extn.scala | |
package concurrent | |
import java.util.Collections | |
import java.util.concurrent.{ | |
AbstractExecutorService => AES, ExecutorService => ES, TimeUnit} | |
import scala.concurrent.{ | |
ExecutionContext => EC, ExecutionContextExecutorService => ECES} | |
object Strategy { | |
def fromService(es: ES): ECES = EC.fromExecutorService(es) | |
def fromContext(ec: EC): ECES = | |
new AES with ECES { | |
override def prepare(): EC = ec | |
override def isShutdown = false | |
override def isTerminated = false | |
override def shutdown() = () | |
override def shutdownNow() = Collections.emptyList[Runnable] | |
override def execute(runnable: Runnable): Unit = ec execute runnable | |
override def reportFailure(t: Throwable): Unit = ec reportFailure t | |
override def awaitTermination(length: Long, unit: TimeUnit): Boolean = false | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment