Skip to content

Instantly share code, notes, and snippets.

@luhenry
Created January 28, 2022 13:52
Show Gist options
  • Save luhenry/903c8e9bb03aea05df8a12f8564a675d to your computer and use it in GitHub Desktop.
Save luhenry/903c8e9bb03aea05df8a12f8564a675d to your computer and use it in GitHub Desktop.
import java.nio.file.Paths;
import java.util.*;
import java.util.zip.GZIPOutputStream;
import java.time.*;
import java.io.*;
import jdk.jfr.*;
import jdk.jfr.consumer.*;
/**
 * Rebuilds per-span, per-thread intervals from {@code datadog.Checkpoint} JFR events and
 * reports how many bytes a compact binary encoding of those intervals occupies.
 *
 * <p>Usage: {@code java IntervalBuilder <recording.jfr>}.
 *
 * <p>Encoding produced by the serializer (all fixed-width integers are little-endian):
 * <pre>
 *   for each span:    spanId (8 bytes), number of threads (8 bytes)
 *     for each thread: threadId (8 bytes), number of completed intervals (8 bytes)
 *       first interval:  start as epoch nanoseconds (LEB128), length in nanoseconds (LEB128)
 *       later intervals: start as nanoseconds since the first start (LEB128), length (LEB128)
 * </pre>
 * Intervals that are still open (state {@link #STATE_ACTIVE}) are not written.
 *
 * <p>Instances hold plain {@link HashMap}s and are not synchronized.
 */
class IntervalBuilder {

    /** Name of the only JFR event type this tool looks at. */
    private static final String CHECKPOINT_EVENT = "datadog.Checkpoint";

    /**
     * Reads the recording named by {@code args[0]}, feeds every checkpoint event to a builder,
     * serializes the result in memory and prints the event count and the encoded size.
     *
     * @param args command line; {@code args[0]} is the path of the JFR recording
     * @throws IllegalArgumentException if no recording path is given
     * @throws IOException if the recording cannot be read
     */
    public static void main(String[] args) throws Throwable {
        if (args.length < 1) {
            throw new IllegalArgumentException("usage: IntervalBuilder <recording.jfr>");
        }

        int nevents = 0;
        IntervalBuilder ib = new IntervalBuilder();
        try (RecordingFile f = new RecordingFile(Paths.get(args[0]))) {
            while (f.hasMoreEvents()) {
                RecordedEvent event = f.readEvent();
                if (CHECKPOINT_EVENT.equals(event.getEventType().getName())) {
                    ib.consume(event);
                    nevents += 1;
                }
            }
        }
        System.out.printf("nevents = %d\n", nevents);

        // Flip to true to measure the gzip-compressed size instead of the raw size.
        boolean compress = false;
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        // Closing a GZIPOutputStream finishes the compressed stream, so no explicit finish()
        // is needed before reading the size below.
        try (OutputStream out = compress ? new GZIPOutputStream(byteStream) : byteStream) {
            ib.writeIntervals(out);
        }
        System.out.printf("size = %d\n", byteStream.size());
    }

    /** Writes every span, and for each span every thread, in the format described on the class. */
    private void writeIntervals(OutputStream out) throws IOException {
        for (Map.Entry<Long, Map<Long, Deque<Interval>>> bySpan : _intervalsBySpanAndThread.entrySet()) {
            writeLong(out, bySpan.getKey());
            writeLong(out, bySpan.getValue().size());
            for (Map.Entry<Long, Deque<Interval>> byThread : bySpan.getValue().entrySet()) {
                writeLong(out, byThread.getKey());
                writeThreadIntervals(out, byThread.getValue());
            }
        }
    }

    /** Writes the completed intervals of one thread, oldest first, preceded by their count. */
    private void writeThreadIntervals(OutputStream out, Deque<Interval> intervals) throws IOException {
        // Count only what is actually emitted; open intervals are skipped below, so the raw
        // deque size would announce more records than follow.
        long completed = 0;
        for (Interval interval : intervals) {
            if (isCompleted(interval)) {
                completed++;
            }
        }
        writeLong(out, completed);

        // New intervals are pushed at the head, so the tail is the oldest one. Iterating
        // instead of polling leaves the builder's state untouched.
        Instant baseTime = null;
        for (Iterator<Interval> it = intervals.descendingIterator(); it.hasNext();) {
            Interval interval = it.next();
            if (!isCompleted(interval)) {
                continue;
            }
            if (baseTime == null) {
                baseTime = interval._start;
                writeInstant(out, baseTime);
            } else {
                writeDuration(out, Duration.between(baseTime, interval._start));
            }
            writeDuration(out, Duration.between(interval._start, interval._end));
        }
    }

    /** An interval is written only once it has an end, i.e. it is inactive or closed. */
    private static boolean isCompleted(Interval interval) {
        return interval._state == STATE_INACTIVE || interval._state == STATE_CLOSED;
    }

    /**
     * Writes {@code v} as 8 bytes, least significant byte first.
     *
     * @param s destination stream
     * @param v value to write
     * @throws IOException if the stream rejects the write
     */
    void writeLong(OutputStream s, long v) throws IOException {
        byte[] bytes = new byte[Long.BYTES];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) (v >>> (8 * i));
        }
        s.write(bytes);
    }

    /**
     * Writes {@code v} as an unsigned LEB128 value: 7 payload bits per byte, least significant
     * group first, with the high bit set on every byte except the last.
     *
     * <p>{@code v} is treated as an unsigned 64-bit quantity, so a negative value takes the
     * full 10 bytes.
     *
     * @param s destination stream
     * @param v value to write
     * @throws IOException if the stream rejects the write
     */
    void writeLEB128(OutputStream s, long v) throws IOException {
        byte[] bytes = new byte[10]; // ceil(64 / 7) groups at most
        int nbytes = 0;
        do {
            int group = (int) (v & 0x7f);
            v >>>= 7;
            if (v != 0) {
                group |= 0x80; // continuation bit: more groups follow
            }
            bytes[nbytes++] = (byte) group;
        } while (v != 0);
        s.write(bytes, 0, nbytes);
    }

    /**
     * Writes {@code v} as nanoseconds since the epoch, LEB128-encoded.
     *
     * <p>The nanosecond count is computed in {@code long} arithmetic without overflow checks.
     *
     * @param s destination stream
     * @param v instant to write
     * @throws IOException if the stream rejects the write
     */
    void writeInstant(OutputStream s, Instant v) throws IOException {
        writeLEB128(s, v.getEpochSecond() * 1000 * 1000 * 1000 + v.getNano());
    }

    /**
     * Writes {@code v} as a nanosecond count, LEB128-encoded.
     *
     * @param s destination stream
     * @param v duration to write
     * @throws IOException if the stream rejects the write
     */
    void writeDuration(OutputStream s, Duration v) throws IOException {
        writeLEB128(s, v.toNanos());
    }

    // Checkpoint flag bits. SPAN and THREAD_MIGRATION include the CPU bit; MASK strips it.
    static final int END = 0x1;
    static final int CPU = 0x2;
    static final int SPAN = 0x4 | CPU;
    static final int THREAD_MIGRATION = 0x8 | CPU;
    static final int MASK = ~CPU;

    /**
     * Dispatches one checkpoint event to the handler selected by its {@code flags} field.
     *
     * @param e a {@code datadog.Checkpoint} event
     * @throws IllegalArgumentException if {@code flags} matches none of the known combinations
     * @throws IllegalStateException if the event is invalid for the current interval state
     */
    void consume(RecordedEvent e) {
        assert e.getEventType().getName().equals(CHECKPOINT_EVENT);
        // NOTE(review): getThread() is dereferenced unconditionally; confirm checkpoint events
        // always carry a thread.
        long threadId = e.getThread().getId();
        Instant tick = e.getStartTime();
        long localRootSpanId = e.getLong("localRootSpanId");
        long spanId = e.getLong("spanId");
        int flags = e.getInt("flags");
        // The labels are the flag combinations with the CPU bit cleared, while the switch is
        // on the raw value. NOTE(review): presumably the producer records flags already
        // masked with MASK; confirm before masking here as well.
        switch (flags) {
            case SPAN & MASK:
                onStart(localRootSpanId, spanId, threadId, tick);
                break;
            case (SPAN | END) & MASK:
                onFinish(localRootSpanId, spanId, threadId, tick);
                break;
            case CPU & MASK:
                onStartWork(localRootSpanId, spanId, threadId, tick);
                break;
            case (CPU | END) & MASK:
                onFinishWork(localRootSpanId, spanId, threadId, tick);
                break;
            case THREAD_MIGRATION & MASK:
                onStartThreadMigration(localRootSpanId, spanId, threadId, tick);
                break;
            case (THREAD_MIGRATION | END) & MASK:
                onFinishThreadMigration(localRootSpanId, spanId, threadId, tick);
                break;
            default:
                throw new IllegalArgumentException("unknown flags " + flags);
        }
    }

    /** The interval has a start but no end yet. */
    static final int STATE_ACTIVE = 1;
    /** The interval was ended by {@link #onStartThreadMigration}; it becomes closed when the next one opens. */
    static final int STATE_INACTIVE = 2;
    /** The interval has ended. */
    static final int STATE_CLOSED = 3;

    /** One stretch of time of a span on a thread; {@code _end} is unset while active. */
    static class Interval {
        public Instant _start;
        public Instant _end;
        public int _state;
    }

    /** spanId -> threadId -> intervals, most recently opened interval at the head. */
    final Map<Long/*spanId*/, Map<Long/*threadId*/, Deque<Interval>>> _intervalsBySpanAndThread =
        new HashMap<Long, Map<Long, Deque<Interval>>>();

    /**
     * Opens a new active interval for the span on the thread, starting at {@code tick}.
     *
     * @throws IllegalStateException if the most recent interval of that span and thread is
     *     still active
     */
    void onStart(long localRootSpanId, long spanId, long threadId, Instant tick) {
        openInterval(spanId, threadId, tick, true);
    }

    /**
     * Closes the most recent interval of the span on the thread at {@code tick}. Does nothing
     * when no interval was recorded for that span and thread.
     */
    void onFinish(long localRootSpanId, long spanId, long threadId, Instant tick) {
        endLatestInterval(spanId, threadId, tick, STATE_CLOSED);
    }

    /**
     * Always fails: this checkpoint kind is not expected in the recording.
     *
     * @throws IllegalStateException always
     */
    void onStartWork(long localRootSpanId, long spanId, long threadId, Instant tick) {
        throw new IllegalStateException("[spanId=" + spanId + ",threadId=" + threadId + "]: this should never be called");
    }

    /**
     * Closes the most recent interval of the span on the thread at {@code tick}. Does nothing
     * when no interval was recorded for that span and thread.
     */
    void onFinishWork(long localRootSpanId, long spanId, long threadId, Instant tick) {
        endLatestInterval(spanId, threadId, tick, STATE_CLOSED);
    }

    /**
     * Marks the most recent interval of the span on the thread as inactive, ending at
     * {@code tick}. Does nothing when no interval was recorded for that span and thread.
     */
    void onStartThreadMigration(long localRootSpanId, long spanId, long threadId, Instant tick) {
        endLatestInterval(spanId, threadId, tick, STATE_INACTIVE);
    }

    /**
     * Opens a new active interval for the span on the thread, starting at {@code tick}. Unlike
     * {@link #onStart}, the event is silently ignored when the most recent interval is still
     * active.
     */
    void onFinishThreadMigration(long localRootSpanId, long spanId, long threadId, Instant tick) {
        openInterval(spanId, threadId, tick, false);
    }

    /** Returns the most recently opened interval for the span and thread, or null if none exists. */
    private Interval latestInterval(long spanId, long threadId) {
        Map<Long, Deque<Interval>> byThread = _intervalsBySpanAndThread.get(spanId);
        if (byThread == null) {
            return null;
        }
        Deque<Interval> intervals = byThread.get(threadId);
        return intervals == null ? null : intervals.peek();
    }

    /** Ends the most recent interval with the given state; a missing interval is tolerated. */
    private void endLatestInterval(long spanId, long threadId, Instant tick, int newState) {
        Interval interval = latestInterval(spanId, threadId);
        if (interval == null) {
            // An end without a matching start is deliberately ignored rather than rejected.
            return;
        }
        interval._state = newState;
        interval._end = tick;
    }

    /** Pushes a new active interval, first closing an inactive predecessor if there is one. */
    private void openInterval(long spanId, long threadId, Instant tick, boolean failIfAlreadyOpen) {
        Deque<Interval> intervals =
            _intervalsBySpanAndThread.computeIfAbsent(spanId, k -> new HashMap<Long, Deque<Interval>>())
                .computeIfAbsent(threadId, k -> new ArrayDeque<Interval>());

        Interval head = intervals.peek();
        if (head != null) {
            switch (head._state) {
                case STATE_ACTIVE:
                    if (failIfAlreadyOpen) {
                        throw new IllegalStateException("[spanId=" + spanId + ",threadId=" + threadId + "]: there is already an open interval");
                    }
                    return;
                case STATE_INACTIVE:
                    head._state = STATE_CLOSED;
                    break;
                default:
                    break;
            }
        }

        Interval interval = new Interval();
        interval._start = tick;
        interval._state = STATE_ACTIVE;
        intervals.push(interval);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment