Created
          October 31, 2013 22:18 
        
      - 
      
- 
        Save helena/7258149 to your computer and use it in GitHub Desktop. 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | import scala.language.postfixOps | |
| import scala.collection.immutable | |
| import scala.collection.immutable.Queue | |
| import scala.concurrent.forkjoin.ThreadLocalRandom | |
| import akka.actor.{ ActorLogging, ActorRef, Actor } | |
| import akka.routing.{ CurrentRoutees, RouterRoutees, Broadcast } | |
| import akka.util.Timeout | |
| import java.util.concurrent.atomic.AtomicInteger | |
| import com.crowdstrike.cloud.InternalLocationAction._ | |
| import com.crowdstrike.cloud.UserLocationAction.{ QueuedMessage, Routees } | |
/**
 * INTERNAL API.
 *
 * Not an Akka router as the routees are not pre-configured but rather continuously fed in
 * through `RouterRoutees` messages. Messages that arrive while no routee is available are
 * held in a bounded FIFO queue and replayed once routees are reported.
 *
 * Concrete subclasses supply the load-balancing strategy by implementing `getNext`.
 *
 * @param channel    the channel this router serves; its routees seed the initial routee set
 * @param queueLimit the maximum number of messages held while no routees are available
 *
 * @author Helena Edelson
 */
private[cloud] abstract class LoadBalancingLocationsRouter(val channel: ClusterChannel, queueLimit: Int) extends Actor
  with ActorLookup with ActorLogging {

  import LocationEvent._

  /** The routees currently considered routable. */
  protected var routees: Set[ActorRef] = channel.routees.map(_.routee)

  /** Messages waiting for a routee to become available, oldest first. */
  protected var queued: immutable.Queue[QueuedMessage] = Queue.empty

  override def preStart(): Unit = log.info("{} started", self.path)

  override def postStop(): Unit = routees = Set.empty

  def receive: Receive = {
    case CurrentRoutees        => query(sender)
    case RouterRoutees(latest) => refresh(latest)
    case Publish(message)      => publish(sender, message)
    case Broadcast(message)    => broadcast(sender, message)
    case m: QueuedMessage      => enqueue(m)
  }

  /**
   * The implementation strategy: selects the routee that receives the next published message.
   * Only invoked when `routees` is non-empty.
   */
  def getNext: ActorRef

  /**
   * Sends `message` to the routee chosen by `getNext`, or hands it to `unroutable` when no
   * routee is left.
   *
   * Pre-filters again for those not terminated and not [[akka.actor.ActorSystem.deadLetters]]
   * to handle any state inconsistencies between the time akka notifies locations of a downed
   * actor, the time locations removes that from the routees, and the time this gets a message.
   *
   * @param sender  the original publisher of the message
   * @param message the payload to route
   */
  def publish(sender: ActorRef, message: Any): Unit = {
    routees = routees filter (isValid(_))
    // Explicit `tell` instead of `forward`: `forward` reads the sender from the actor context,
    // which is the wrong actor when a queued message is replayed from `refresh`.
    if (routees.nonEmpty) getNext.tell(message, sender)
    else unroutable(QueuedMessage(sender, message))
  }

  /**
   * Sends `message` to every routee, or hands it to `unroutable` when no routee is left.
   *
   * Pre-filters again for those not terminated and not [[akka.actor.ActorSystem.deadLetters]]
   * to handle any state inconsistencies between the time akka notifies locations of a downed
   * actor, the time locations removes that from the routees, and the time this gets a message.
   *
   * @param sender  the original publisher of the message
   * @param message the payload to send to all routees
   */
  def broadcast(sender: ActorRef, message: Any): Unit = {
    routees = routees filter (isValid(_))
    // NOTE(review): `!` delivers with this router as the sender, not the original publisher.
    // Kept as-is because routees may rely on it; confirm whether `tell(message, sender)` is wanted.
    if (routees.nonEmpty) routees foreach (_ ! message)
    else unroutable(QueuedMessage(sender, message, true))
  }

  /**
   * Handles a message for which no routee is available: queues it if no message with the same
   * id is already queued, otherwise notifies the message's original sender of the failure.
   *
   * @param m the message that could not be routed
   */
  def unroutable(m: QueuedMessage): Unit = {
    log.info("Received unroutable message from {}", m.sender)
    if (!queued.exists(_.id == m.id)) enqueue(m)
    // Reply to the sender recorded in the message; the actor's current `sender` is a
    // different actor when this runs while `refresh` is replaying the queue.
    else m.sender ! PublicationFailure(NoRouteesFound(channel), m.message)
  }

  /**
   * Queues messages for sending when routees come online. If the queue reaches the configured limit in
   * [[com.crowdstrike.cloud.CloudSettings.ClusterLocationsRouterQueueLimit]]
   * FIFO takes effect and the oldest message queued is dequeued to allow the most recent to be queued.
   *
   * @param message the message to hold until routees become available
   */
  def enqueue(message: QueuedMessage): Unit = {
    log.info("No routees found for {}, queueing message until routees become available.", channel.id)
    // `nonEmpty` guards `dequeue`, which throws on an empty queue (e.g. when queueLimit is 0).
    if (queued.nonEmpty && queued.size >= queueLimit) {
      val (dequeued, updated) = queued.dequeue
      queued = updated
      log.info("Dequeued {}, updated queue size {}", dequeued.message, queued.size)
    }
    queued = queued.enqueue(message)
    log.debug("Enqueued {}, queue size {}", message, queued.size)
  }

  /**
   * Adds new and removes stale routees and attempts to drain the `queued` messages, by the implemented
   * load-balancing strategy, if routees are not empty.
   *
   * @param latest the most recently reported set of routees
   */
  def refresh(latest: Iterable[ActorRef]): Unit = {
    val stale = routees &~ latest.toSet
    routees --= stale
    val unseen = latest collect { case r if isValid(r) && !routees.contains(r) => r }
    routees ++= unseen
    if (routees.nonEmpty) {
      // Snapshot and clear before replaying: a replayed message can be queued again if the
      // routees turn out to be invalid, and clearing afterwards would silently drop it.
      val pending = queued
      queued = Queue.empty
      pending foreach { m =>
        if (m.isBroadcast) broadcast(m.sender, m.message) else publish(m.sender, m.message)
      }
    }
  }

  /**
   * Replies to `sender` with the current routees.
   *
   * @param sender the actor asking for the current routees
   */
  def query(sender: ActorRef): Unit = sender ! RouterRoutees(routees)
}
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment