Skip to content

Instantly share code, notes, and snippets.

@patriknw
Created June 15, 2013 03:39
Show Gist options
  • Save patriknw/5786787 to your computer and use it in GitHub Desktop.
Save patriknw/5786787 to your computer and use it in GitHub Desktop.
Sample of mailbox extension that counts messages in queue
package akka.contrib.mailbox
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.Config
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedMailbox, UnboundedQueueBasedMessageQueue }
object MetricsMailboxExtension extends ExtensionId[MetricsMailboxExtension] with ExtensionIdProvider {
def lookup = this
def createExtension(s: ExtendedActorSystem) = new MetricsMailboxExtension(s)
}
class MetricsMailboxExtension(val system: ExtendedActorSystem) extends Extension {
private val mailboxes = new ConcurrentHashMap[ActorRef, MetricsMailbox]
def register(actorRef: ActorRef, mailbox: MetricsMailbox): Unit =
mailboxes.put(actorRef, mailbox)
def unregister(actorRef: ActorRef): Unit = mailboxes.remove(actorRef)
def mailboxSize(ref: ActorRef): Int =
mailboxes.get(ref) match {
case null ⇒ 0
case mailbox ⇒ mailbox.numberOfMessages
}
}
class MetricsMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match {
case (Some(o), Some(s)) ⇒
val mailbox = new MetricsMailbox(o, s)
MetricsMailboxExtension(s).register(o, mailbox)
mailbox
case _ ⇒ throw new Exception("no mailbox owner or system given")
}
}
class MetricsMailbox(owner: ActorRef, system: ActorSystem) extends UnboundedMailbox.MessageQueue {
private val queueSize = new AtomicInteger
override def dequeue(): Envelope = {
val x = super.dequeue()
if (x ne null) queueSize.decrementAndGet
x
}
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
super.enqueue(receiver, handle)
queueSize.incrementAndGet
}
override def numberOfMessages: Int = queueSize.get
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
super.cleanUp(owner, deadLetters)
MetricsMailboxExtension(system).unregister(owner)
}
}
package akka.contrib.mailbox
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{ Actor, Props }
import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch }
class MetricsMailboxSpec extends AkkaSpec("""
metrics-dispatcher {
mailbox-type = "akka.contrib.mailbox.MetricsMailboxType"
}
""") with ImplicitSender {
"A MetricsMailbox" must {
"count messages in queue" in {
val a = system.actorOf(Props[MyActor2].withDispatcher("metrics-dispatcher"))
MetricsMailboxExtension(system).mailboxSize(a) must be(0)
val latch = TestLatch(1)
a ! latch
expectMsg("gotIt")
MetricsMailboxExtension(system).mailboxSize(a) must be(0)
a ! "hello"
MetricsMailboxExtension(system).mailboxSize(a) must be(1)
latch.countDown()
expectMsg("ok")
expectMsg("hello")
}
}
}
class MyActor2 extends Actor {
def receive = {
case latch: TestLatch ⇒
sender ! "gotIt"
Await.ready(latch, 5.seconds)
sender ! "ok"
case msg ⇒
sender ! msg
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment