Last active
August 17, 2019 20:06
-
-
Save karthik20522/0725b842fcccce32e01b84c45c5270b7 to your computer and use it in GitHub Desktop.
AkkaSampling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.actor | |
import java.io.BufferedOutputStream | |
import java.io.FileOutputStream | |
import akka.dispatch._ | |
import java.util.concurrent.ExecutorService | |
import scala.concurrent.forkjoin.ForkJoinPool | |
import java.io.{OutputStream, Writer, OutputStreamWriter} | |
import TraversalHelper._ | |
import scala.collection.mutable.SortedSet | |
import scala.math.Ordering | |
object AkkaSampling { | |
case class Settings(samplingPeriod: Int = 5, | |
maxChildren: Int = Int.MaxValue, | |
topSettings: TopSettings = TopSettings()) | |
case class TopSettings(nrOfMsgThreshold: Int = -1, maxActorsToShow: Int = Int.MaxValue) | |
def print(as: ActorSystem, out: OutputStream = System.out)(implicit settings: Settings = Settings()) = { | |
val writer = new OutputStreamWriter(out) | |
doAsync(settings.samplingPeriod) { | |
doActors(as, writer) | |
doDispatchers(as, writer) | |
writer.flush | |
} | |
} | |
def doActors(as: ActorSystem, writer: Writer)(implicit settings: Settings) = { | |
implicit val topActorsOrdering = Ordering.fromLessThan[ActorRefWithCell]( | |
(a1, a2) => a1.underlying.numberOfMessages >= a2.underlying.numberOfMessages) | |
val topActors = SortedSet() | |
def printTop = { | |
writer.write("\n=Top actors:=\n") | |
topActors.take(settings.topSettings.maxActorsToShow).foreach { actor => | |
writer.write(actor.path + ": " + actor.underlying.numberOfMessages + "\n") | |
} | |
} | |
val ah: ActorHandler = (actor, depth) => { | |
val nrOfMessages = actor.underlying.numberOfMessages | |
writer.write(" " * depth + actor.path + ": nrOfMessages = " + nrOfMessages + "\n") | |
if (nrOfMessages >= settings.topSettings.nrOfMsgThreshold) | |
topActors += actor | |
} | |
writer.write("\n==Actors:==\n") | |
traverseActorTree(as)(ah) | |
printTop | |
} | |
def doDispatchers(as: ActorSystem, writer: Writer)(implicit settings: Settings) = { | |
val dh: DispatcherHandler = { | |
case (dispatcher, Some(pool: ForkJoinPool)) => | |
writer.write(dispatcher.id + ": activeCount = " + pool.getActiveThreadCount + | |
", poolSize = " + pool.getPoolSize + "\n") | |
case (dispatcher, Some(execSrv)) => writer.write(dispatcher + " " + execSrv + "\n") | |
case (dispatcher, None) => writer.write(dispatcher.toString + "\n") | |
} | |
writer.write("\n==Dispatchers:==\n") | |
traverseDispatchers(as)(dh) | |
} | |
def doAsync(period: Int)(code: => Unit) = { | |
new Thread { | |
override def run = { | |
while (true) { | |
code | |
Thread.sleep(period * 1000) | |
} | |
} | |
}.start | |
} | |
} | |
package akka.actor | |
import akka.dispatch._ | |
import java.util.concurrent.ExecutorService | |
import scala.concurrent.forkjoin.ForkJoinPool | |
object TraversalHelper { | |
type Depth = Int | |
type ActorHandler = (ActorRefWithCell, Depth) => Unit | |
type DispatcherHandler = (MessageDispatcher, Option[ExecutorService]) => Unit | |
def traverseActorTree(as: ActorSystem)(handler: ActorHandler) = { | |
def traverseSubTree(node: ActorRefWithCell, depth: Depth): Unit = { | |
handler(node, depth) | |
node.underlying.childrenRefs.children foreach { actorRef => | |
traverseSubTree(actorRef.asInstanceOf[ActorRefWithCell], depth + 1) | |
} | |
} | |
val system = as.asInstanceOf[ActorSystemImpl] | |
traverseSubTree(system.guardian, 0) | |
} | |
object Reflection { // helpers to access private fields via Java Reflection | |
val dispConfiguratorsField = { | |
val f = classOf[Dispatchers].getDeclaredField("dispatcherConfigurators") | |
f.setAccessible(true) | |
f | |
} | |
val dispExecSrvGetter = { | |
val f = classOf[Dispatcher].getDeclaredMethod("executorService") | |
f.setAccessible(true) | |
f | |
} | |
} | |
def traverseDispatchers(as: ActorSystem)(handler: DispatcherHandler) = { | |
import Reflection._ | |
def traverseDispatcher(dispatcher: MessageDispatcher) = { | |
def extractExecSrv = dispatcher match { | |
// default akka dispatcher | |
case md: Dispatcher => Some( | |
dispExecSrvGetter.invoke(dispatcher) | |
.asInstanceOf[ExecutorServiceDelegate] | |
.executor) | |
case _ => None | |
} | |
handler(dispatcher, extractExecSrv) | |
} | |
import scala.collection.JavaConverters._ | |
val confMap = dispConfiguratorsField.get(as.dispatchers) | |
.asInstanceOf[java.util.concurrent.ConcurrentMap[String, MessageDispatcherConfigurator]] | |
.asScala | |
confMap.values foreach { dispConf => traverseDispatcher(dispConf.dispatcher) } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment