Created
June 15, 2013 03:39
-
-
Save patriknw/5786787 to your computer and use it in GitHub Desktop.
Sample of a mailbox extension that counts the messages in an actor's queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.contrib.mailbox | |
import java.util.concurrent.ConcurrentHashMap | |
import java.util.concurrent.atomic.AtomicInteger | |
import com.typesafe.config.Config | |
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } | |
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedMailbox, UnboundedQueueBasedMessageQueue } | |
/**
 * Extension id for [[MetricsMailboxExtension]].
 *
 * `MetricsMailboxExtension(system)` yields the extension instance that belongs
 * to the given actor system.
 */
object MetricsMailboxExtension
  extends ExtensionId[MetricsMailboxExtension]
  with ExtensionIdProvider {

  /** This object is its own lookup key. */
  override def lookup: MetricsMailboxExtension.type = this

  /** Builds the extension instance for the actor system `s`. */
  override def createExtension(s: ExtendedActorSystem): MetricsMailboxExtension =
    new MetricsMailboxExtension(s)
}
/**
 * Keeps a registry of [[MetricsMailbox]] instances keyed by the actor that owns
 * them, so that the current queue size of an actor can be looked up by its ref.
 *
 * @param system the actor system this extension instance was created for
 */
class MetricsMailboxExtension(val system: ExtendedActorSystem) extends Extension {

  // Registered mailboxes, keyed by owning actor.
  private val mailboxes = new ConcurrentHashMap[ActorRef, MetricsMailbox]

  /** Associates `mailbox` with `actorRef`, replacing any previous association. */
  def register(actorRef: ActorRef, mailbox: MetricsMailbox): Unit = {
    mailboxes.put(actorRef, mailbox)
    ()
  }

  /** Drops the association for `actorRef`, if there is one. */
  def unregister(actorRef: ActorRef): Unit = {
    mailboxes.remove(actorRef)
    ()
  }

  /**
   * Number of messages reported by the mailbox registered for `ref`.
   *
   * @return the mailbox's `numberOfMessages`, or 0 when no mailbox is registered for `ref`
   */
  def mailboxSize(ref: ActorRef): Int =
    // ConcurrentHashMap.get returns null for an unknown key; Option(...) maps that to None.
    Option(mailboxes.get(ref)).map(_.numberOfMessages).getOrElse(0)
}
/**
 * Mailbox type that creates [[MetricsMailbox]] queues and registers each one
 * with the [[MetricsMailboxExtension]] of the supplied actor system.
 *
 * Referenced from configuration via `mailbox-type`; `settings` and `config`
 * are accepted by the constructor but not read here.
 */
class MetricsMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {

  /**
   * Creates a mailbox for `owner` and registers it under that actor ref.
   *
   * @throws Exception when either `owner` or `system` is `None`
   */
  override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MetricsMailbox = {
    val registered =
      for {
        ownerRef <- owner
        actorSystem <- system
      } yield {
        val queue = new MetricsMailbox(ownerRef, actorSystem)
        MetricsMailboxExtension(actorSystem).register(ownerRef, queue)
        queue
      }

    registered.getOrElse(throw new Exception("no mailbox owner or system given"))
  }
}
/**
 * Unbounded message queue that tracks its own size in an atomic counter and
 * reports that counter as `numberOfMessages`.
 *
 * @param owner  the actor this mailbox was created for
 * @param system the actor system whose [[MetricsMailboxExtension]] this mailbox
 *               unregisters from on clean-up
 */
class MetricsMailbox(owner: ActorRef, system: ActorSystem) extends UnboundedMailbox.MessageQueue {

  // Running count of enqueued-but-not-yet-dequeued envelopes.
  private val queueSize = new AtomicInteger

  /** Removes the next envelope, decrementing the counter only when one was actually returned. */
  override def dequeue(): Envelope = {
    val envelope = super.dequeue()
    if (envelope ne null) queueSize.decrementAndGet()
    envelope
  }

  /**
   * Adds `handle` to the queue and counts it.
   *
   * The counter is bumped BEFORE the envelope becomes visible in the queue: if it
   * were bumped afterwards, a concurrent `dequeue` could decrement first and
   * `numberOfMessages` would transiently read negative.
   */
  override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    queueSize.incrementAndGet()
    try super.enqueue(receiver, handle)
    catch {
      // Roll the count back when the envelope was not stored, then propagate unchanged.
      case t: Throwable =>
        queueSize.decrementAndGet()
        throw t
    }
  }

  /** Current value of the counter maintained by `enqueue` and `dequeue`. */
  override def numberOfMessages: Int = queueSize.get

  /**
   * Delegates clean-up to the superclass and then removes `owner` from the
   * extension's registry. The unregistration runs in a `finally` so that a
   * failing superclass clean-up cannot leave a stale registry entry behind.
   */
  override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
    try super.cleanUp(owner, deadLetters)
    finally MetricsMailboxExtension(system).unregister(owner)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.contrib.mailbox | |
import com.typesafe.config.ConfigFactory | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
import akka.actor.{ Actor, Props } | |
import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch } | |
/**
 * Verifies that [[MetricsMailboxExtension]] reports the number of messages
 * waiting in the mailbox of an actor running on the `metrics-dispatcher`.
 */
class MetricsMailboxSpec extends AkkaSpec("""
metrics-dispatcher {
mailbox-type = "akka.contrib.mailbox.MetricsMailboxType"
}
""") with ImplicitSender {

  "A MetricsMailbox" must {

    "count messages in queue" in {
      val actor = system.actorOf(Props[MyActor2].withDispatcher("metrics-dispatcher"))

      // Size currently reported by the extension for the actor under test.
      def queued: Int = MetricsMailboxExtension(system).mailboxSize(actor)

      queued must be(0)

      // The actor replies "gotIt" and then waits on the latch, so the next
      // message sent to it stays in the mailbox until the latch is released.
      val gate = TestLatch(1)
      actor ! gate
      expectMsg("gotIt")
      queued must be(0)

      actor ! "hello"
      queued must be(1)

      gate.countDown()
      expectMsg("ok")
      expectMsg("hello")
    }
  }
}
/**
 * Test actor: on a [[TestLatch]] it replies "gotIt", waits up to 5 seconds for
 * the latch to open and then replies "ok"; any other message is echoed back
 * to its sender.
 */
class MyActor2 extends Actor {
  def receive: Receive = {
    case latch: TestLatch =>
      val replyTo = sender
      replyTo ! "gotIt"
      // Blocks this actor on purpose so that later messages pile up in its mailbox.
      Await.ready(latch, 5.seconds)
      replyTo ! "ok"

    case other =>
      sender ! other
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment