Forked from gszeliga/custom_circuit_breaker.scala
Last active
August 29, 2015 14:06
-
-
Save ignasi35/3b9ea257545fb556b85d to your computer and use it in GitHub Desktop.
While reviewing the code at http://covariantblabbering.blogspot.com.es/2014/09/use-circuit-breakers-goddammit.html?view=classic I found some things I wanted to rewrite and ended up with this code.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.covariantblabbering | |
import java.util.concurrent.atomic.AtomicReference | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.atomic.AtomicLong | |
import scala.concurrent.duration.Duration | |
import scala.concurrent.duration._ | |
import scala.util.{Failure, Success, Try} | |
/**
 * Settings for a single circuit breaker.
 *
 * @param name             label used as a prefix in the breaker's log output
 * @param timeout          how long the breaker stays open before a reset may be attempted
 * @param failureThreshold number of failures at which a closed breaker trips
 */
class Configuration(
    val name: String,
    val timeout: Duration = 2000.millis,
    val failureThreshold: Int = 3
)
/**
 * Raised when a call is attempted while the breaker is open and its timeout
 * has not yet elapsed.
 *
 * @param msg human-readable description, passed through to [[java.lang.Exception]]
 */
class CircuitBreakerStillOpenException(msg: String)
    extends Exception(msg)
/**
 * One state of a [[CircuitBreaker]].
 *
 * Every hook has a do-nothing default, so a concrete state only overrides
 * the events it cares about.
 */
trait State {

  /** Label of this state; also used as its string representation. */
  def name: String

  /** Hook called before the protected body is evaluated. */
  def preInvoke(cb: CircuitBreaker): Unit = ()

  /** Hook called after the protected body returned normally (formerly `postInvoke`). */
  def onSuccess(cb: CircuitBreaker): Unit = ()

  /**
   * Hook called when the invocation failed.
   *
   * @param source the exception that caused the failure
   */
  def onError(cb: CircuitBreaker, source: Exception): Unit = ()

  override def toString: String = name
}
/** Companion providing a `new`-free way to build a [[CircuitBreaker]]. */
object CircuitBreaker {

  /**
   * Creates a breaker from the given settings.
   *
   * @param conf name, timeout and failure threshold of the new breaker
   * @return a freshly constructed breaker
   */
  def apply(conf: Configuration): CircuitBreaker =
    new CircuitBreaker(conf)
}
/**
 * A three-state circuit breaker (Close, Open, Half-Open).
 *
 *  - '''Close''': calls run normally. Each failure increments a counter and a
 *    success resets it; when the counter reaches the configured threshold the
 *    breaker trips to Open.
 *  - '''Open''': calls are rejected with a [[CircuitBreakerStillOpenException]]
 *    until the configured timeout has elapsed since the trip, after which the
 *    next call moves the breaker to Half-Open and is let through.
 *  - '''Half-Open''': a success closes the breaker again, a failure re-opens it.
 *
 * The current state is held in an `AtomicReference` and every state change is
 * a compare-and-set, so a transition attempted from a state the breaker is no
 * longer in fails with an `IllegalStateException`.
 *
 * @param conf name, timeout and failure threshold of this breaker
 */
class CircuitBreaker(private val conf: Configuration) {

  // The nested state objects are initialised lazily, so referring to `Close`
  // here is safe and avoids bootstrapping the reference through a `null` CAS.
  private val state = new AtomicReference[State](Close)

  // Package-visible. The original qualifier `private[custom]` named a scope
  // that does not enclose this class and therefore did not compile.
  private[covariantblabbering] val failureCount = new AtomicInteger(0)

  // Wall-clock time (System.currentTimeMillis) of the most recent trip.
  protected val _tripTime = new AtomicLong(0)

  /**
   * Runs `body` under the protection of this breaker.
   *
   * A [[CircuitBreakerStillOpenException]] or an `IllegalStateException`
   * (whether it comes from a state transition or from `body` itself) is
   * returned as a `Failure` without being reported to the current state.
   * Any other `Exception` is reported to the current state and then returned
   * as a `Failure`.
   *
   * @param body the computation to protect; evaluated at most once
   * @tparam T   result type of the computation
   * @return `Success` with the result of `body`, or `Failure` with the exception
   */
  // Right should not be an exception. Right should be T.
  // Either[Exception, T] is usually replaceable with Try[T].
  def invoke[T](body: => T): Try[T] =
    try {
      state.get.preInvoke(this)
      val result = body
      // Re-read the state: preInvoke may have moved Open to Half-Open.
      state.get.onSuccess(this)
      Success(result)
    } catch {
      // A type alternative needs `e @ (_: A | _: B)`; `e: A | B` is not a
      // valid pattern in Scala 2.
      case e @ (_: CircuitBreakerStillOpenException | _: IllegalStateException) =>
        Failure(e)
      case e: Exception =>
        recordFailure(e)
        Failure(e)
    }

  /**
   * Reports `cause` to the current state.
   *
   * An `IllegalStateException` raised here means the transition requested by
   * the state failed its compare-and-set. It is logged and dropped so that
   * `invoke` still returns the caller's original failure instead of throwing.
   */
  private def recordFailure(cause: Exception): Unit =
    try state.get.onError(this, cause)
    catch {
      case lostRace: IllegalStateException =>
        println(s"[${conf.name}] Ignoring failed transition while recording an error: ${lostRace.getMessage}")
    }

  /** Trial state entered once the open timeout has elapsed. */
  private object HalfOpen extends State {
    def name: String = "Half-Open"

    override def onSuccess(cb: CircuitBreaker): Unit = cb.reset()

    override def onError(cb: CircuitBreaker, source: Exception): Unit = {
      cb.incrementAndGetFailureCount
      cb.breakTrip(this)
    }
  }

  /** Rejecting state: calls fail fast until the timeout has elapsed. */
  private object Open extends State {
    def name: String = "Open"

    override def preInvoke(cb: CircuitBreaker): Unit =
      if (cb.timedOut) cb.attemptReset()
      else {
        // TODO hmm I don't like this exception here.
        throw new CircuitBreakerStillOpenException(
          s"Circuit breaker is still open. Elapsed time: ${cb.elapsed}")
      }
  }

  /** Normal state: calls pass through and failures are counted. */
  private object Close extends State {
    def name: String = "Close"

    override def onSuccess(cb: CircuitBreaker): Unit = cb.resetFailureCount()

    override def onError(cb: CircuitBreaker, source: Exception): Unit = {
      // TODO: would like to change this too, put it into cb.reportError; if(cb.tooManyErrors)
      val failures = cb.incrementAndGetFailureCount
      if (failures >= cb.failureThreshold) cb.breakTrip(this)
    }
  }

  /**
   * Atomically moves the breaker from `from` to `to`.
   *
   * Throws `IllegalStateException` if the breaker is not currently in `from`.
   */
  private def transition(from: State, to: State): Unit = {
    println(s"[${conf.name}] Transition [$from => $to] requested [fc: ${failureCount.get}, tt: ${_tripTime.get}]")
    if (!state.compareAndSet(from, to))
      throw new IllegalStateException(s"Illegal transition attempted from ${from} to ${to}")
  }

  /** Closes the breaker after a successful trial call and clears the failure count. */
  private def reset(): Unit = {
    println(s"[${conf.name}] Reset")
    transition(HalfOpen, Close)
    resetFailureCount()
  }

  /** Wall-clock time since the last recorded trip. */
  private def elapsed: FiniteDuration =
    System.currentTimeMillis.millis - _tripTime.get.millis

  /** True once more than the configured timeout has passed since the last trip. */
  private def timedOut: Boolean = elapsed > conf.timeout

  /** Moves an open breaker to Half-Open so that the current call can go through. */
  private def attemptReset(): Unit = {
    println(s"[${conf.name}] Attempting reset")
    transition(Open, HalfOpen)
  }

  private def failureThreshold: Int = conf.failureThreshold

  /** Increments the failure counter and returns the new value. */
  private[covariantblabbering] def incrementAndGetFailureCount: Int =
    failureCount.incrementAndGet

  /**
   * Opens the breaker, coming from state `from` (renamed from `tripFromState`).
   *
   * The trip time is stored before the state change is published; otherwise a
   * concurrent caller could observe `Open` together with a stale trip time,
   * conclude that the timeout has already elapsed and attempt a reset at once.
   */
  private[covariantblabbering] def breakTrip(from: State): Unit = {
    println(s"[${conf.name}] Trip from state [$from]")
    _tripTime.set(System.currentTimeMillis)
    transition(from, Open)
  }

  /** Sets the failure counter back to zero. */
  private[covariantblabbering] def resetFailureCount(): Unit = failureCount.set(0)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment