Created
January 24, 2012 15:04
-
-
Save daggerrz/1670599 to your computer and use it in GitHub Desktop.
Akka 1.3 circuit breaking dispatcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.tapad.util | |
import akka.dispatch._ | |
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} | |
/**
 * Circuit breaker policy that logs the start and end of an overflow situation
 * and always asks for overflowing one-way messages to be discarded.
 *
 * It is not meant for request-reply actors: every overflowing message that
 * expects a reply is answered with an `IllegalArgumentException`.
 *
 * @param maxMailboxSize the mailbox size limit reported in the log messages and
 *                       exposed to the dispatcher through the policy interface
 * @param loggable       supplies the logger the overflow messages are written to
 */
class LogAndDiscardCircuitBreakerPolicy(val maxMailboxSize: Int, loggable: Logging) extends CircuitBreakerPolicy {

  /** Logs a warning that messages are now being ignored. The `mailboxSize` argument is not used. */
  def onOverflowStart(mailboxSize: Int): Unit =
    loggable.log.warn("Mailbox size is above {}. Ignoring messages until size is back below.", maxMailboxSize)

  /** Logs the configured limit together with the number of messages counted during the overflow. */
  def onBackToNormal(overflowCount: Int): Unit =
    loggable.log.info("Mailbox size is back below {}. A total of {} messages were discarded.", maxMailboxSize, overflowCount)

  /** Always answers with a `Left` holding an `IllegalArgumentException`; both arguments are ignored. */
  def replyToOverflow(overflowCount: Int, msg: Any): Either[Exception, Any] =
    Left(new IllegalArgumentException("This overflow policy is not configured for request-reply actors."))

  /** Always returns true, regardless of the count or the message. */
  def shouldBeDiscarded(overflowCount: Int, msg: Any): Boolean = true
}
/**
 * Circuit breaker policy configuration trait.
 *
 * The dispatcher consults an implementation of this trait to decide when a
 * mailbox counts as overflowing and what to do with each message that arrives
 * while it is.
 */
trait CircuitBreakerPolicy {

  /**
   * Max mailbox size before overflowing.
   */
  def maxMailboxSize: Int

  /**
   * Called when an overflow situation starts.
   *
   * @param mailboxSize the current mailbox size
   */
  def onOverflowStart(mailboxSize: Int): Unit

  /**
   * Called for every message received while overflowing on a future
   * channel. Allows for responding with a default message in overflow situations.
   *
   * @param overflowCount 1 for the first message and keeps increasing by 1 until the
   *                      situation is resolved; the same counter also feeds
   *                      [[shouldBeDiscarded]]
   * @param msg the current message
   *
   * @return the value to return to the sender / set on the future
   */
  def replyToOverflow(overflowCount: Int, msg: Any): Either[Exception, Any]

  /**
   * Called for every message received while overflowing on a null channel (no response).
   *
   * @param overflowCount 1 for the first message and keeps increasing by 1 until the
   *                      situation is resolved; the same counter also feeds
   *                      [[replyToOverflow]]
   * @param msg the current message
   *
   * @return true if the message should be discarded
   */
  def shouldBeDiscarded(overflowCount: Int, msg: Any): Boolean

  /**
   * Called when the actor is not overflowing anymore.
   *
   * @param overflowCount the total number of messages that overflowed
   */
  def onBackToNormal(overflowCount: Int): Unit
}
/**
 * Circuit breaking semantics for an Akka dispatcher.
 *
 * On every dispatch, the receiver's mailbox size plus one (for the incoming
 * message) is compared with `policy.maxMailboxSize`. While that value is at or
 * above the limit, the incoming message is handed to the policy instead of being
 * dispatched:
 *
 *  - on a future channel, the future is completed with `policy.replyToOverflow`;
 *  - on any other channel, the message is dropped unless
 *    `policy.shouldBeDiscarded` returns false, in which case it is dispatched
 *    as usual.
 *
 * Messages that are already queued are never removed by this trait.
 *
 * Note that the overflow flag and counter live on the dispatcher instance, not
 * on the individual receiver, so they are shared by every actor dispatched
 * through the same dispatcher.
 */
trait CircuitBreakingDispatcherSemantics extends MessageDispatcher { self: ExecutorBasedEventDrivenDispatcher =>

  /**
   * The overflow policy to use.
   */
  protected def policy: CircuitBreakerPolicy

  /** True from the first rejected dispatch until the next dispatch that is under the limit. */
  private[this] val overflowing = new AtomicBoolean(false)

  /** Number of messages handed to the policy since the current overflow started. */
  private[this] val overflowCount = new AtomicInteger(0)

  /**
   * Dispatches the invocation, or hands it to the policy when the receiver's
   * mailbox is at or above the configured limit.
   *
   * @param invocation the message invocation about to be enqueued
   */
  abstract override def dispatch(invocation: MessageInvocation): Unit = {
    // Size the mailbox would have once this message is enqueued.
    val size = mailboxSize(invocation.receiver) + 1
    if (size >= policy.maxMailboxSize) {
      // Only the dispatch that flips the flag reports the start of the overflow.
      if (overflowing.compareAndSet(false, true)) {
        policy.onOverflowStart(size)
      }
      val count = overflowCount.incrementAndGet()
      invocation.channel match {
        case future: ActorCompletableFuture =>
          future.complete(policy.replyToOverflow(count, invocation.message))
        case _ =>
          // NullChannel and every other non-future channel: nobody is waiting on
          // a future, so the policy decides whether to drop the message. Matching
          // only NullChannel here would throw a MatchError for any other channel
          // (presumably a sending actor's reference -- confirm against the Akka
          // version in use).
          if (!policy.shouldBeDiscarded(count, invocation.message)) {
            super.dispatch(invocation)
          }
      }
    } else {
      // Only the dispatch that clears the flag reports the end of the overflow
      // and resets the counter.
      if (overflowing.compareAndSet(true, false)) {
        policy.onBackToNormal(overflowCount.getAndSet(0))
      }
      super.dispatch(invocation)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment