Created
May 3, 2022 09:46
-
-
Save jrudolph/daab67b44318d4394a3d92ad21a18a9e to your computer and use it in GitHub Desktop.
Generate allocation flamegraph from JFR
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Step 1: export the JFR allocation events as gzip-compressed JSON
# (uses the JDK's `jfr` tool and `pigz`; xyz.jfr is the recording to analyze).
jfr print --json --events jdk.ObjectAllocationInNewTLAB xyz.jfr | pigz -9 > allocation.json.gz
# Step 2: run the Scala program FoldAllocationsForFlameGraph (listed after this snippet);
# it reads ../allocation.json.gz and writes collapsed-stacks.log.
# Step 3: render the collapsed stacks as an SVG flame graph.
flamegraph.pl < collapsed-stacks.log > fg.svg
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet, OverflowStrategy } | |
import akka.stream.scaladsl.{ Compression, FileIO, JsonFraming } | |
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } | |
import akka.util.ByteString | |
import java.io.{ File, FileOutputStream } | |
import spray.json._ | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import scala.util.{ Failure, Success, Try } | |
/**
 * Data model and spray-json formats for the JSON events read by
 * `FoldAllocationsForFlameGraph`.
 *
 * Only the fields that the program reads are modelled. The field names are used by
 * spray-json's `jsonFormatN` helpers as the JSON keys, so they must not be renamed.
 */
object ObjectTLABProtocol {
  import spray.json.DefaultJsonProtocol._
  import spray.json.RootJsonFormat

  /** Type that declares a method; `name` is used as the class part of a flame graph frame. */
  case class FrameType(
    name: String
  )

  /** Method of a stack frame together with its declaring type. */
  case class FrameMethod(
    name:   String,
    `type`: FrameType
  )

  /** A single stack frame. */
  case class Frame(method: FrameMethod)

  /** Stack trace of an event as a sequence of frames. */
  case class StackTrace(frames: Seq[Frame])

  /** Class of the allocated object. */
  case class ObjectClass(name: String)

  /** Thread on which the event was recorded. */
  case class EventThread(javaName: String)

  /**
   * Payload of one allocation event. `stackTrace` is optional, so events without a
   * stack trace still parse.
   */
  case class ObjectAllocationInNewTLABEvent(
    eventThread:    EventThread,
    stackTrace:     Option[StackTrace],
    objectClass:    ObjectClass,
    allocationSize: Long,
    tlabSize:       Long
  )

  /** Envelope of one event: the event type name plus its values. */
  case class Event(`type`: String, values: ObjectAllocationInNewTLABEvent)

  // Formats are declared bottom-up so that each one finds the formats of its fields
  // already initialized. The explicit result types keep implicit resolution predictable
  // (and are mandatory for implicit definitions in Scala 3).
  implicit val frameTypeFormat: RootJsonFormat[FrameType] = jsonFormat1(FrameType.apply _)
  implicit val frameMethodFormat: RootJsonFormat[FrameMethod] = jsonFormat2(FrameMethod.apply _)
  implicit val frameFormat: RootJsonFormat[Frame] = jsonFormat1(Frame.apply _)
  implicit val stackTraceFormat: RootJsonFormat[StackTrace] = jsonFormat1(StackTrace.apply _)
  implicit val objectClassFormat: RootJsonFormat[ObjectClass] = jsonFormat1(ObjectClass.apply _)
  implicit val eventThreadFormat: RootJsonFormat[EventThread] = jsonFormat1(EventThread.apply _)
  implicit val objectAllocationFormat: RootJsonFormat[ObjectAllocationInNewTLABEvent] =
    jsonFormat5(ObjectAllocationInNewTLABEvent.apply _)
  implicit val eventFormat: RootJsonFormat[Event] = jsonFormat2(Event.apply _)
}
/**
 * Flow stage that discards the first `numBytesToDrop` bytes of a `ByteString` stream and
 * then passes all remaining chunks through unchanged.
 *
 * @param numBytesToDrop number of leading bytes to discard, counted across chunk boundaries
 */
class DropBytes(numBytesToDrop: Long) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val bytesIn: Inlet[ByteString] = Inlet[ByteString]("dropBytes.bytesIn")
  val bytesOut: Outlet[ByteString] = Outlet[ByteString]("dropBytes.bytesOut")

  override def shape: FlowShape[ByteString, ByteString] = FlowShape(bytesIn, bytesOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandlers(bytesIn, bytesOut, Dropping)

      /** Initial state: swallow chunks until `numBytesToDrop` bytes have been consumed. */
      object Dropping extends InHandler with OutHandler {
        // Bytes that still have to be discarded; negative once a chunk overshoots the limit.
        var toDrop = numBytesToDrop

        override def onPush(): Unit = {
          val bytes = grab(bytesIn)
          toDrop -= bytes.size
          if (toDrop < 0) {
            // The chunk straddles the boundary: emit only the part after the dropped prefix.
            setHandlers(bytesIn, bytesOut, Passing)
            push(bytesOut, bytes.takeRight(-toDrop.toInt))
          } else {
            // The whole chunk was dropped, so nothing can be pushed for the pending
            // downstream demand. Switch state if the limit was hit exactly, and in either
            // case request the next chunk; without this pull the stage would stall when a
            // chunk ends exactly at the drop boundary.
            if (toDrop == 0) setHandlers(bytesIn, bytesOut, Passing)
            pull(bytesIn)
          }
        }

        override def onPull(): Unit = pull(bytesIn)
      }

      /** Final state: forward every chunk as is. */
      object Passing extends InHandler with OutHandler {
        override def onPush(): Unit = push(bytesOut, grab(bytesIn))
        override def onPull(): Unit = pull(bytesIn)
      }
    }
}
/**
 * Pass-through flow stage that periodically prints throughput statistics to stdout.
 *
 * Elements are forwarded unchanged. A report line is printed from `onPush`/`onPull` once
 * `reportInterval` has elapsed since the previous report, so nothing is printed while no
 * elements or demand flow through the stage.
 *
 * @param name           label printed at the start of each report line
 * @param userUnitName   name of the custom unit, printed next to the custom-unit figures
 * @param userUnits      returns how many custom units a given element accounts for
 * @param reportInterval minimum time between two report lines
 */
class Stats[T](name: String, userUnitName: String, userUnits: T => Long, reportInterval: FiniteDuration = 5.seconds) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("stats.in")
  val out: Outlet[T] = Outlet[T]("stats.out")

  override def shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      setHandlers(in, out, this)

      // Time spent between the previous handler call and an onPull / onPush respectively.
      // Accumulated in nanoseconds: converting every single interval to microseconds first
      // would truncate sub-microsecond waits to zero and skew the reported ratio.
      var waitingOnPullNanos: Long = 0
      var waitingOnPushNanos: Long = 0

      // Running totals since the stage was started.
      var processed: Long = 0
      var userUnitsProcessed: Long = 0

      // Timestamp (System.nanoTime) of the most recent onPush/onPull.
      var latestTimestamp: Long = System.nanoTime()
      var nextMessageAt: Deadline = Deadline.now + reportInterval

      // Snapshot taken at the previous report; used to compute per-interval throughput.
      var lastProcessed: Long = 0
      var lastUserUnitsProcessed: Long = 0
      var lastReportedNanos = latestTimestamp

      override def onPush(): Unit = {
        val element = grab(in)
        val now = System.nanoTime()
        waitingOnPushNanos += now - latestTimestamp
        latestTimestamp = now
        processed += 1
        userUnitsProcessed += userUnits(element)
        if (nextMessageAt.isOverdue()) report(now)
        push(out, element)
      }

      override def onPull(): Unit = {
        val now = System.nanoTime()
        waitingOnPullNanos += now - latestTimestamp
        latestTimestamp = now
        if (nextMessageAt.isOverdue()) report(now)
        pull(in)
      }

      /** Prints one report line and resets the per-interval snapshot. `now` is a nanoTime value. */
      def report(now: Long): Unit = {
        val elapsedNanos = now - lastReportedNanos
        val elementsPerSecond = (processed - lastProcessed).toDouble * 1000000000 / elapsedNanos
        val userUnitsPerSecond = (userUnitsProcessed - lastUserUnitsProcessed).toDouble * 1000000000 / elapsedNanos
        // Ratio of total time waited before pushes to total time waited before pulls.
        // Floating-point division: prints Infinity/NaN instead of failing while the
        // denominator is still zero.
        val waitingRate = waitingOnPushNanos.toDouble / waitingOnPullNanos
        println(f"[$name%20s] processed: $processed%10d tpt: ${elementsPerSecond}%9.3f elements/s | processed (in total): ${userUnitsProcessed}%10d $userUnitName%-10s tpt: ${userUnitsPerSecond}%10.3f $userUnitName%10s/s | waiting rate: ${waitingRate}%6.2f")

        nextMessageAt = Deadline.now + reportInterval
        lastProcessed = processed
        lastUserUnitsProcessed = userUnitsProcessed
        lastReportedNanos = now
      }
    }
}
/**
 * Reads gzip-compressed JSON allocation events from `../allocation.json.gz`, sums the
 * allocation sizes per collapsed stack, prints the ten largest entries and writes all of
 * them to `collapsed-stacks.log`, one `stack weight` pair per line.
 */
object FoldAllocationsForFlameGraph extends App {
  import ObjectTLABProtocol._
  import java.nio.charset.StandardCharsets

  /** Renders one frame as `DeclaringType::method`. */
  def frameToStack(frame: Frame): String =
    s"${frame.method.`type`.name}::${frame.method.name}"

  /**
   * Converts an event into a collapsed-stack key and its weight.
   *
   * The key consists of the frames in reversed order joined by `;`, followed by the
   * allocated class and finally the thread name cut at its last `-` (so that threads
   * named like `prefix-N` fall into one bucket). The weight is the event's allocation size.
   *
   * @return `(collapsedStack, allocationSize)`
   */
  def stackOfEvent(event: Event): (String, Long) = {
    val stack = event.values.stackTrace match {
      case Some(trace) => s"${trace.frames.reverse.map(frameToStack).mkString(";")};"
      case None        => ""
    }
    val pool = {
      val n = event.values.eventThread.javaName
      val i = n.lastIndexOf('-')
      // Keep the full name when there is no '-' or when it is the very first character.
      if (i > 0) n.take(i)
      else n
    }
    s"$stack${event.values.objectClass.name};$pool" -> event.values.allocationSize
  }

  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher

  // NOTE(review): the "kB" stats use `_.size / 1000` per element, so elements smaller than
  // 1000 bytes count as 0 kB -- the figures are a lower bound, not an exact byte count.
  FileIO.fromPath(new File("../allocation.json.gz").toPath)
    .via(new Stats("input file", "kB", _.size / 1000))
    .via(Compression.gunzip())
    // Presumably skips the fixed-size preamble that precedes the first event object in the
    // JSON export so that the object scanner only sees event objects -- TODO confirm.
    .via(new DropBytes(33))
    .async
    .via(new Stats("gunzip", "kB", _.size / 1000))
    .async
    .via(JsonFraming.objectScanner(1000000))
    .async
    .via(new Stats("objectScanner", "kB", _.size / 1000))
    .async
    // Parse in batches of 1000 objects to amortize the per-Future scheduling overhead.
    .grouped(1000)
    .mapAsync(1024)(bss =>
      Future {
        bss.flatMap(bs =>
          // Objects that fail to parse are logged and skipped instead of failing the stream.
          Try(bs.utf8String.parseJson.convertTo[Event]) match {
            case Success(s) => Some(s)
            case Failure(ex) =>
              println(s"Parsing failed: ${ex.getMessage}")
              println(bs.utf8String)
              None
          }
        )
      }
    )
    .async
    .via(new Stats("after parsing", "elements", _.size))
    .mapConcat(identity)
    //.take(50000)
    // Progress indicator: prints a line every 10000 events.
    .statefulMapConcat { () =>
      var i = 0
      e => {
        i += 1
        if (i % 10000 == 0) println(s"At $i")
        e :: Nil
      }
    }
    .map(stackOfEvent)
    .async
    .via(new Stats("after stack creation", "events", _ => 1))
    // Sum the allocation sizes per collapsed stack.
    .runFold(Map.empty[String, Long].withDefaultValue(0L)) { (map, stackAndAlloc) =>
      val (stack, alloc) = stackAndAlloc
      map.updated(stack, map(stack) + alloc)
    }
    .onComplete { result =>
      // Whatever happens below, shut the actor system down; otherwise its threads keep the
      // JVM alive after a failed run or a failed write.
      try {
        result match {
          case Success(totals) =>
            totals.toVector.sortBy(-_._2).take(10).foreach {
              case (stack, alloc) => println(f"$alloc%10d $stack")
            }
            val fos = new FileOutputStream("collapsed-stacks.log")
            // Close the file even if a write fails.
            try {
              totals.foreach {
                case (stack, alloc) =>
                  // Explicit charset so the output does not depend on the platform default.
                  fos.write(s"$stack $alloc\n".getBytes(StandardCharsets.UTF_8))
              }
            } finally fos.close()
          case Failure(ex) =>
            ex.printStackTrace()
        }
      } finally system.terminate()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment