Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active April 14, 2022 09:10
Show Gist options
  • Save atamborrino/ff67876df9b862758beade693a77ea97 to your computer and use it in GitHub Desktop.
Save atamborrino/ff67876df9b862758beade693a77ea97 to your computer and use it in GitHub Desktop.
Monitor Scala's ExecutionContext / Akka Dispatcher lag (number of tasks in waiting queues)
import java.util.concurrent._

import scala.util.control.NonFatal

import akka.dispatch.{Dispatcher, ExecutorServiceDelegate}
import config.Config
import helpers.ScalaLogger
/**
  * Periodically reports the number of queued tasks of registered executors to the metrics client.
  *
  * Every call to [[monitor]] schedules one recurring collection task on a single shared
  * scheduler thread. The period (and the initial delay) is `config.ecMonitorInterval`.
  * Call [[close]] to shut the scheduler down once monitoring is no longer needed.
  *
  * @param metricsService client that receives the queue-size gauge for each monitored executor
  * @param config         configuration providing `ecMonitorInterval`
  */
class ExecutionContextMonitor()(implicit metricsService: MetricsClient, config: Config) {

  private val log = ScalaLogger.get(this.getClass)

  // One scheduler thread runs the collection tasks of every monitored executor.
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Reflection to access Scala protected method
  private val ecGetterForAkkaDispatcher = classOf[Dispatcher].getDeclaredMethod("executorService")
  ecGetterForAkkaDispatcher.setAccessible(true)

  /**
    * Starts periodic queue-size reporting for `ec`.
    *
    * Supported executors are `ThreadPoolExecutor`, `ForkJoinPool` and Akka's `Dispatcher`
    * (whose backing executor is looked up reflectively on every collection). Any other
    * executor type is not monitored; a warning is logged instead.
    *
    * @param ec     executor to monitor
    * @param ecName name under which the metric is reported
    */
  def monitor(ec: Executor, ecName: String): Unit = ec match {
    case threadPoolExecutor: ThreadPoolExecutor =>
      log.info(s"Monitoring ec $ecName of type ThreadPoolExecutor")
      schedulePeriodically(ecName)(setMetricsForThreadPoolExecutor(ecName, threadPoolExecutor))

    case forkJoinPool: ForkJoinPool =>
      log.info(s"Monitoring ec $ecName of type ForkJoinPool")
      schedulePeriodically(ecName)(setMetricsForForkJoinPool(ecName, forkJoinPool))

    case dispatcher: Dispatcher =>
      log.info(s"Monitoring ec $ecName of type akka.dispatch.Dispatcher")
      schedulePeriodically(ecName) {
        // Resolved on each run rather than once at registration time, as in the original code.
        val underlying =
          ecGetterForAkkaDispatcher.invoke(dispatcher).asInstanceOf[ExecutorServiceDelegate].executor
        underlying match {
          case threadPoolExecutor: ThreadPoolExecutor =>
            setMetricsForThreadPoolExecutor(ecName, threadPoolExecutor)
          case forkJoinPool: ForkJoinPool =>
            setMetricsForForkJoinPool(ecName, forkJoinPool)
          case _ =>
            log.warn(s"Can not set metrics for ect of type ${underlying.getClass} from akka dispatcher $dispatcher")
        }
      }

    case _ => log.warn(s"Can not register metrics monitoring for execution context $ec")
  }

  /**
    * Schedules `collect` to run at a fixed rate of `interval`, starting after one `interval`.
    *
    * Non-fatal exceptions thrown by `collect` are caught and logged: a task passed to
    * `scheduleAtFixedRate` that throws is never executed again, so letting an exception
    * escape would silently stop monitoring for this executor.
    */
  private def schedulePeriodically(ecName: String)(collect: => Unit): Unit = {
    val task = new Runnable {
      override def run(): Unit =
        try collect
        catch {
          case NonFatal(e) => log.warn(s"Failed to collect metrics for ec $ecName: $e")
        }
    }
    scheduler.scheduleAtFixedRate(task, interval.toMillis, interval.toMillis, TimeUnit.MILLISECONDS)
  }

  /** Reports the current size of the executor's work queue. */
  private def setMetricsForThreadPoolExecutor(ecName: String, threadPoolExecutor: ThreadPoolExecutor): Unit = {
    val queueSize = threadPoolExecutor.getQueue.size
    log.trace(s"Queue size for ec $ecName of type ThreadPoolExecutor: $queueSize")
    metricsService.setECWaitingTasksNumber(ecName, ExecutionContextType.THREAD_POOL_EXECUTOR, queueSize)
  }

  /**
    * Reports the pool's queued submission count.
    *
    * NOTE(review): per the ForkJoinPool Javadoc, `getQueuedSubmissionCount` only estimates tasks
    * submitted to the pool that have not begun executing; tasks held in worker-thread queues are
    * reported separately by `getQueuedTaskCount`. Confirm whether those should be included too.
    */
  private def setMetricsForForkJoinPool(ecName: String, forkJoinPool: ForkJoinPool): Unit = {
    val queueSize = forkJoinPool.getQueuedSubmissionCount
    log.trace(s"Queue size for ec $ecName of type ForkJoinPool: $queueSize")
    metricsService.setECWaitingTasksNumber(ecName, ExecutionContextType.FORK_JOIN_POOL, queueSize)
  }

  // Read from config on each use rather than cached at construction time.
  private def interval = config.ecMonitorInterval

  /** Shuts down the internal scheduler. */
  def close(): Unit = {
    scheduler.shutdown()
  }
}
@atamborrino
Copy link
Author

atamborrino commented Jun 1, 2017

Usage:

val system = ActorSystem()
executionContextMonitor.monitor(system.dispatcher, "akka.default-dispatcher")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment