Created
September 19, 2018 07:55
-
-
Save pchlupacek/3b0a9e7e8f4092583b872bb8dd4b18fa to your computer and use it in GitHub Desktop.
An attempt at a circuit breaker (CB) implemented with Ref.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright (c) 2017-2018 The Typelevel Cats-effect Project Developers
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cats.effect.concurrent | |
import java.util.concurrent.TimeUnit | |
import cats.syntax.all._ | |
import cats.effect.{Clock, Sync} | |
import scala.concurrent.duration._ | |
/**
  * Guards effects of type `F` with a circuit breaker.
  *
  * Instances are created through the `of` constructors on the companion
  * object.
  *
  * @tparam F the effect type of the protected computations
  */
trait CircuitBreaker[F[_]] {

  /**
    * Wraps `fa` with the breaker.
    *
    * In the implementation provided by the companion object, `fa` is run
    * while the breaker is closed (or as the single trial call once the reset
    * timeout of an open breaker has elapsed); otherwise the returned effect
    * fails with `CircuitBreaker.RejectedExecution` without running `fa`.
    *
    * @param fa the effect to protect
    * @return an effect yielding the result of `fa`, or failing with the error
    *         of `fa` or with a rejection
    */
  def protect[A](fa: F[A]): F[A]

  /**
    * Reads the current state of the breaker.
    *
    * @return the state at the time the effect is evaluated
    */
  def state: F[CircuitBreaker.S]
}
/**
  * State model and constructors for [[CircuitBreaker]].
  */
object CircuitBreaker {

  /** Timestamp in milliseconds, as read from `Clock.monotonic`. */
  type TimeStamp = Long

  /** State of the breaker, as returned by `CircuitBreaker#state`. */
  sealed trait S

  /** The states in which a call is rejected; carried by [[RejectedExecution]]. */
  sealed trait Reason

  /**
    * Calls are let through.
    *
    * @param failures number of failed calls since the last successful one
    */
  case class Closed(failures: Int) extends S

  /**
    * Calls are rejected until `startedAt + resetTimeout` has passed.
    *
    * @param startedAt    when the current open window started (milliseconds)
    * @param resetTimeout how long the current open window lasts
    */
  case class Open(startedAt: TimeStamp, resetTimeout: FiniteDuration) extends S with Reason

  /** A single trial call is in flight; every other call is rejected. */
  case object HalfOpen extends S with Reason

  // Shared instance for the "closed, no failures" state.
  private val ClosedZero = Closed(0)

  /**
    * Error raised by `protect` when a call is not let through.
    *
    * @param reason the state (open or half-open) that caused the rejection
    */
  case class RejectedExecution(reason: Reason) extends Throwable(s"Execution Rejected: $reason")

  /**
    * Builds a breaker without any state-transition callbacks.
    *
    * @param maxFailures              number of failures after which the breaker opens
    * @param resetTimeout             initial duration of the open state
    * @param exponentialBackoffFactor factor applied to the reset timeout after each
    *                                 failed trial call
    * @param maxResetTimeout          upper bound for the reset timeout; non-finite
    *                                 values mean "no bound"
    * @return an effect allocating the breaker in the closed state
    */
  def of[F[_]](
    maxFailures: Int
  , resetTimeout: FiniteDuration
  , exponentialBackoffFactor: Double = 1
  , maxResetTimeout: Duration = Duration.Inf
  )(implicit F: Sync[F], C: Clock[F]): F[CircuitBreaker[F]] = {
    of(maxFailures, resetTimeout, exponentialBackoffFactor, maxResetTimeout, F.unit, F.unit, F.unit, F.unit)
  }

  /**
    * Builds a breaker that runs the given callbacks on state transitions.
    *
    * @param maxFailures              number of failures after which the breaker opens
    * @param resetTimeout             initial duration of the open state
    * @param exponentialBackoffFactor factor applied to the reset timeout after each
    *                                 failed trial call
    * @param maxResetTimeout          upper bound for the reset timeout; non-finite
    *                                 values mean "no bound"
    * @param onRejected               run before a call is rejected
    * @param onClosed                 run when a trial call succeeds and the breaker closes
    * @param onHalfOpen               run when a trial call is about to be attempted
    * @param onOpen                   run when the failure count reaches `maxFailures`
    * @return an effect allocating the breaker in the closed state
    */
  def of[F[_]](
    maxFailures: Int
  , resetTimeout: FiniteDuration
  , exponentialBackoffFactor: Double
  , maxResetTimeout: Duration
  , onRejected: F[Unit]
  , onClosed: F[Unit]
  , onHalfOpen: F[Unit]
  , onOpen: F[Unit]
  )(implicit F: Sync[F], C: Clock[F]): F[CircuitBreaker[F]] = {
    Ref.of[F, S](ClosedZero).map { ref =>
      new CircuitBreaker[F] {

        // Atomically picks the next state together with the effect to run
        // afterwards, then runs that effect. Expressed with `Ref.modify`.
        private def transition[A](f: S => (S, F[A])): F[A] =
          F.flatten(ref.modify(f))

        // Runs `f` in the closed state: a success resets the failure count,
        // a failure increments it and opens the breaker at `maxFailures`.
        // The original error of `f` is always re-raised.
        private def openOnFail[A](f: F[A]): F[A] =
          f.attempt.flatMap {
            case Right(a) =>
              ref.set(ClosedZero).as(a)
            case Left(err) =>
              C.monotonic(TimeUnit.MILLISECONDS).flatMap { now =>
                transition[A] {
                  case Closed(failures) =>
                    val count = failures + 1
                    if (count >= maxFailures) (Open(now, resetTimeout), onOpen >> F.raiseError[A](err))
                    else (Closed(count), F.raiseError[A](err))
                  // State moved on while `f` was running: leave it untouched.
                  case open: Open => (open, F.raiseError[A](err))
                  case HalfOpen => (HalfOpen, F.raiseError[A](err))
                }
              }
          }

        // Next open state: timeout multiplied by the backoff factor and capped
        // by `maxResetTimeout` when that bound is finite.
        private def backoff(open: Open): Open = {
          def next = (open.resetTimeout.toMillis * exponentialBackoffFactor).millis
          open.copy(
            resetTimeout = maxResetTimeout match {
              case fin: FiniteDuration => next min fin
              case _: Duration => next
            }
          )
        }

        // Handles a call that observed the `Open` state `observed`: rejects it
        // while the timeout is running, otherwise tries to become the single
        // trial call.
        private def tryReset[A](observed: Open, fa: F[A]): F[A] =
          C.monotonic(TimeUnit.MILLISECONDS).flatMap { now =>
            if (observed.startedAt + observed.resetTimeout.toMillis >= now)
              onRejected >> F.raiseError[A](RejectedExecution(observed))
            else {
              // NOTE(review): nothing here restores the state if the trial
              // call is cancelled while HalfOpen - confirm whether needed.
              def resetOnSuccess: F[A] =
                fa.attempt.flatMap {
                  case Left(err) =>
                    // Restart the open window at the time of the failure;
                    // keeping the old `startedAt` would leave an already
                    // expired `Open` whenever the timeout does not grow.
                    val reopen = C.monotonic(TimeUnit.MILLISECONDS).flatMap { failedAt =>
                      ref.set(backoff(observed).copy(startedAt = failedAt))
                    }
                    reopen >> F.raiseError[A](err)
                  case Right(a) =>
                    (onClosed >> ref.set(ClosedZero)).as(a)
                }

              transition[A] {
                case closed: Closed => (closed, openOnFail(fa))
                case current: Open =>
                  // Only the caller whose snapshot still matches the stored
                  // state may start the trial call; a different `Open` means
                  // another caller already re-opened the breaker.
                  if (current == observed) (HalfOpen, onHalfOpen >> resetOnSuccess)
                  else (current, onRejected >> F.raiseError[A](RejectedExecution(current)))
                case HalfOpen => (HalfOpen, onRejected >> F.raiseError[A](RejectedExecution(HalfOpen)))
              }
            }
          }

        def protect[A](fa: F[A]): F[A] =
          transition[A] {
            case closed: Closed => (closed, openOnFail(fa))
            case open: Open => (open, tryReset(open, fa))
            case HalfOpen => (HalfOpen, onRejected >> F.raiseError[A](RejectedExecution(HalfOpen)))
          }

        def state: F[S] = ref.get
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment