Last active
January 5, 2023 08:12
-
-
Save patriknw/5946678 to your computer and use it in GitHub Desktop.
Logs the mailbox size when exceeding the configured limit. Implemented in Scala and Java. Copy one of them to your project and define the configuration. This code is licensed under the Apache 2 license.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> | |
*/ | |
package akka.contrib.mailbox | |
import scala.concurrent.duration._ | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.atomic.AtomicLong | |
import com.typesafe.config.Config | |
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem } | |
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedMailbox, UnboundedQueueBasedMessageQueue, ProducesMessageQueue } | |
import akka.event.Logging | |
/** | |
* Logs the mailbox size when exceeding the configured limit. It logs at most once per second | |
* when the messages are enqueued or dequeued. | |
* | |
* Configuration: | |
* <pre> | |
* akka.actor.default-mailbox { | |
* mailbox-type = akka.contrib.mailbox.LoggingMailboxType | |
* size-limit = 20 | |
* } | |
* </pre> | |
*/ | |
class LoggingMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] { | |
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match { | |
case (Some(o), Some(s)) => | |
val sizeLimit = config.getInt("size-limit") | |
val mailbox = new LoggingMailbox(o, s, sizeLimit) | |
mailbox | |
case _ => throw new IllegalArgumentException("no mailbox owner or system given") | |
} | |
} | |
class LoggingMailbox(owner: ActorRef, system: ActorSystem, sizeLimit: Int) | |
extends UnboundedMailbox.MessageQueue { | |
private val interval = 1000000000L // 1 s, in nanoseconds | |
private lazy val log = Logging(system, classOf[LoggingMailbox]) | |
private val path = owner.path.toString | |
@volatile private var logTime: Long = System.nanoTime() | |
private val queueSize = new AtomicInteger | |
private val dequeueCount = new AtomicInteger | |
override def dequeue(): Envelope = { | |
val x = super.dequeue() | |
if (x ne null) { | |
val size = queueSize.decrementAndGet() | |
dequeueCount.incrementAndGet() | |
logSize(size) | |
} | |
x | |
} | |
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = { | |
super.enqueue(receiver, handle) | |
val size = queueSize.incrementAndGet() | |
logSize(size) | |
} | |
def logSize(size: Int): Unit = | |
if (size >= sizeLimit) { | |
val now = System.nanoTime() | |
if (now - logTime > interval) { | |
val msgPerSecond = dequeueCount.get.toDouble / ((now - logTime).toDouble / 1000000000L) | |
logTime = now | |
dequeueCount.set(0) | |
log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size, f"$msgPerSecond%2.2f") | |
} | |
} | |
override def numberOfMessages: Int = queueSize.get | |
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { | |
super.cleanUp(owner, deadLetters) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2014 Typesafe <http://typesafe.com/> | |
*/ | |
package akka.contrib.mailbox; | |
import com.typesafe.config.Config; | |
import java.util.Queue; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import scala.Option; | |
import akka.actor.ActorRef; | |
import akka.actor.ActorSystem; | |
import akka.dispatch.Envelope; | |
import akka.dispatch.MailboxType; | |
import akka.dispatch.MessageQueue; | |
import akka.dispatch.ProducesMessageQueue; | |
import akka.dispatch.UnboundedMailbox; | |
import akka.event.Logging; | |
import akka.event.LoggingAdapter; | |
/** | |
* Logs the mailbox size when exceeding the configured limit. It logs at most | |
* once per second when the messages are enqueued or dequeued. | |
* | |
* Configuration: | |
* | |
* <pre> | |
* akka.actor.default-mailbox { | |
* mailbox-type = akka.contrib.mailbox.LoggingMailboxType | |
* size-limit = 20 | |
* } | |
* </pre> | |
*/ | |
public class LoggingMailboxType implements MailboxType, ProducesMessageQueue<UnboundedMailbox.MessageQueue> { | |
private final Config config; | |
public LoggingMailboxType(ActorSystem.Settings settings, Config config) { | |
this.config = config; | |
} | |
@Override | |
public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) { | |
if (owner.isEmpty() || system.isEmpty()) | |
throw new IllegalArgumentException("no mailbox owner or system given"); | |
int sizeLimit = config.getInt("size-limit"); | |
return new LoggingMailbox(owner.get(), system.get(), sizeLimit); | |
} | |
static class LoggingMailbox implements MessageQueue { | |
private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>(); | |
private final int sizeLimit; | |
private final LoggingAdapter log; | |
private final long interval = 1000000000L; // 1 s, in nanoseconds | |
private final String path; | |
volatile private long logTime = System.nanoTime(); | |
private final AtomicInteger queueSize = new AtomicInteger(); | |
private final AtomicInteger dequeueCount = new AtomicInteger(); | |
LoggingMailbox(ActorRef owner, ActorSystem system, int sizeLimit) { | |
this.path = owner.path().toString(); | |
this.sizeLimit = sizeLimit; | |
this.log = Logging.getLogger(system, LoggingMailbox.class); | |
} | |
@Override | |
public Envelope dequeue() { | |
Envelope x = queue.poll(); | |
if (x != null) { | |
int size = queueSize.decrementAndGet(); | |
dequeueCount.incrementAndGet(); | |
logSize(size); | |
} | |
return x; | |
} | |
@Override | |
public void enqueue(ActorRef receiver, Envelope handle) { | |
queue.offer(handle); | |
int size = queueSize.incrementAndGet(); | |
logSize(size); | |
} | |
private void logSize(int size) { | |
if (size >= sizeLimit) { | |
long now = System.nanoTime(); | |
if (now - logTime > interval) { | |
double msgPerSecond = ((double) dequeueCount.get()) / (((double) (now - logTime)) / 1000000000L); | |
logTime = now; | |
dequeueCount.set(0); | |
log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size, | |
String.format("%2.2f", msgPerSecond)); | |
} | |
} | |
} | |
@Override | |
public int numberOfMessages() { | |
return queueSize.get(); | |
} | |
@Override | |
public boolean hasMessages() { | |
return !queue.isEmpty(); | |
} | |
@Override | |
public void cleanUp(ActorRef owner, MessageQueue deadLetters) { | |
for (Envelope handle : queue) { | |
deadLetters.enqueue(owner, handle); | |
} | |
} | |
} | |
} |
Hi, I wondered why cleanUp
is overridden in the Scala version of the code? It looks like all it does is call super.cleanUp
, so wouldn't that be the same effect as just not overriding the function?
@TJC I think you are right, no good reason for that override
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks @patriknw !
I've resolved the issue by adding
akka.dispatch.UnboundedMessageQueueSemantics
tostatic class LoggingMailbox implements MessageQueue, akka.dispatch.UnboundedMessageQueueSemantics