Created
July 12, 2016 22:38
-
-
Save maheshkelkar/75b87741008adf7d912cd9a36e40c862 to your computer and use it in GitHub Desktop.
FailureAccrualFactory.scala changes for https://github.com/twitter/finagle/issues/524
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala b/finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala | |
index 50956e2..5c2bcbb 100644 | |
--- a/finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala | |
+++ b/finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala | |
@@ -1,5 +1,7 @@ | |
package com.twitter.finagle.service | |
+import java.net.InetSocketAddress | |
+ | |
import com.twitter.conversions.time._ | |
import com.twitter.finagle.Stack.{Params, Role} | |
import com.twitter.finagle._ | |
@@ -10,6 +12,7 @@ import com.twitter.finagle.util.DefaultLogger | |
import com.twitter.logging.Level | |
import com.twitter.util._ | |
import java.util.logging.Logger | |
+ | |
import scala.util.Random | |
object FailureAccrualFactory { | |
@@ -310,7 +313,11 @@ class FailureAccrualFactory[Req, Rep] private[finagle]( | |
} | |
} | |
- private[this] val onServiceAcquisitionFailure: Throwable => Unit = { _ => didFail() } | |
+ private[this] val onServiceAcquisitionFailure: Throwable => Unit = { _ => | |
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: onServiceAcquisitionFailure for "$label"""") | |
+ stopProbing() | |
+ didFail() | |
+ } | |
protected def isSuccess(reqRep: ReqRep): Boolean = | |
responseClassifier.applyOrElse(reqRep, ResponseClassifier.Default) match { | |
@@ -322,6 +329,7 @@ class FailureAccrualFactory[Req, Rep] private[finagle]( | |
// Only count revivals when the probe succeeds. | |
state match { | |
case ProbeClosed => | |
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: ALIVE for "$label"""") | |
revivalCounter.incr() | |
failureAccrualPolicy.revived() | |
state = Alive | |
@@ -337,12 +345,13 @@ class FailureAccrualFactory[Req, Rep] private[finagle]( | |
if (state == Alive) removalCounter.incr() | |
state = Dead | |
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: DEAD for "$label"""") | |
val timerTask = timer.schedule(duration.fromNow) { startProbing() } | |
reviveTimerTask = Some(timerTask) | |
- logger.log(Level.INFO, s"""FailureAccrualFactory marking connection to "$label" as dead. Remote Address: ${endpoint.toString}""") | |
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory marking connection to "$label" as dead. Remote Address: ${endpoint.toString()}""") | |
removedForCounter.incr(duration.inMilliseconds.toInt) | |
didMarkDead() | |
@@ -359,10 +368,33 @@ class FailureAccrualFactory[Req, Rep] private[finagle]( | |
* The service must satisfy one request before accepting more. | |
*/ | |
protected def startProbing() = svcFacSelf.synchronized { | |
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: startProbing for "$label"""") | |
state = ProbeOpen | |
cancelReviveTimerTask() | |
} | |
+ /** | |
+ * Leave the ProbeOpen state, if the factory is in it: count the probe and move to ProbeClosed. | |
+ * | |
+ * The follow-on operation (i.e. the result of the first request while probing) will determine | |
+ * whether the factory then transitions to Alive (successful) or Dead (unsuccessful). | |
+ */ | |
+ private[this] def stopProbing() = { | |
+ state match { | |
+ case ProbeOpen => | |
+ logger.log(Level.WARNING, s"""***FailureAccrualFactory: stopProbing for "$label"""") | |
+ probesCounter.incr() | |
+ svcFacSelf.synchronized { | |
+ state match { | |
+ case ProbeOpen => | |
+ state = ProbeClosed | |
+ case _ => | |
+ } | |
+ } | |
+ case _ => | |
+ } | |
+ } | |
+ | |
def apply(conn: ClientConnection) = { | |
underlying(conn).map { service => | |
// N.B. the reason we can't simply filter the service factory is so that | |
@@ -375,18 +407,9 @@ class FailureAccrualFactory[Req, Rep] private[finagle]( | |
// ProbeClosed state. The result of first to complete will determine | |
// whether the factory transitions to Alive (successful) or Dead | |
// (unsuccessful). | |
- state match { | |
- case ProbeOpen => | |
- probesCounter.incr() | |
- svcFacSelf.synchronized { | |
- state match { | |
- case ProbeOpen => state = ProbeClosed | |
- case _ => | |
- } | |
- } | |
- case _ => | |
- } | |
+ stopProbing() | |
+ // Invoke service | |
service(request).respond { rep => | |
if (isSuccess(ReqRep(request, rep))) didSucceed() | |
else didFail() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment