Created
January 28, 2022 13:52
-
-
Save luhenry/903c8e9bb03aea05df8a12f8564a675d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.Paths; | |
import java.util.*; | |
import java.util.zip.GZIPOutputStream; | |
import java.time.*; | |
import java.io.*; | |
import jdk.jfr.*; | |
import jdk.jfr.consumer.*; | |
/**
 * Rebuilds per-span, per-thread intervals from the {@code datadog.Checkpoint} events of a JFR
 * recording and reports how many bytes a compact binary encoding of those intervals takes.
 *
 * <p>Intervals are kept in {@link #_intervalsBySpanAndThread}; each deque holds the most recent
 * interval at its head (intervals are added with {@code push}).
 *
 * <p>Encoded layout, as produced by {@link #encode(boolean)}:
 *
 * <pre>
 * for each span:
 *   spanId            8 bytes, little-endian
 *   threadCount       8 bytes, little-endian
 *   for each thread:
 *     threadId        8 bytes, little-endian
 *     intervalCount   8 bytes, little-endian (completed intervals only)
 *     first interval: start as epoch nanoseconds (LEB128), length in nanoseconds (LEB128)
 *     later intervals: start as nanoseconds since the first start (LEB128), length (LEB128)
 * </pre>
 */
class IntervalBuilder {
  /** Name of the JFR event type this tool consumes. */
  private static final String CHECKPOINT_EVENT = "datadog.Checkpoint";

  /**
   * Reads the recording named by {@code args[0]}, feeds every checkpoint event to a builder and
   * prints the number of events consumed and the size of the encoded intervals.
   *
   * @param args {@code args[0]} is the path of the JFR recording to read
   * @throws IllegalArgumentException if no recording path is given
   * @throws Throwable any failure while reading the recording or encoding the intervals
   */
  public static void main(String[] args) throws Throwable {
    if (args.length < 1) {
      throw new IllegalArgumentException("usage: IntervalBuilder <recording.jfr>");
    }
    int nevents = 0;
    IntervalBuilder ib = new IntervalBuilder();
    try (RecordingFile f = new RecordingFile(Paths.get(args[0]))) {
      while (f.hasMoreEvents()) {
        RecordedEvent event = f.readEvent();
        if (CHECKPOINT_EVENT.equals(event.getEventType().getName())) {
          ib.consume(event);
          nevents += 1;
        }
      }
    }
    System.out.printf("nevents = %d\n", nevents);
    boolean compress = false;
    byte[] encoded = ib.encode(compress);
    System.out.printf("size = %d\n", encoded.length);
  }

  /**
   * Serializes every completed interval (state {@link #STATE_INACTIVE} or {@link #STATE_CLOSED})
   * using the layout described on the class. Intervals that are still {@link #STATE_ACTIVE} are
   * left out, and the per-thread count only covers the intervals actually written. The stored
   * intervals are not modified.
   *
   * @param compress when {@code true} the bytes are GZIP-compressed
   * @return the encoded bytes
   * @throws IOException if writing to the underlying stream fails
   */
  byte[] encode(boolean compress) throws IOException {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    // Closing the GZIP stream also finishes it, so no explicit finish() is needed.
    try (OutputStream out = compress ? new GZIPOutputStream(byteStream) : byteStream) {
      for (Map.Entry<Long, Map<Long, Deque<Interval>>> bySpan :
          _intervalsBySpanAndThread.entrySet()) {
        writeLong(out, bySpan.getKey());
        writeLong(out, bySpan.getValue().size());
        for (Map.Entry<Long, Deque<Interval>> byThread : bySpan.getValue().entrySet()) {
          List<Interval> completed = completedOldestFirst(byThread.getValue());
          writeLong(out, byThread.getKey());
          writeLong(out, completed.size());
          Instant baseTime = null;
          for (Interval interval : completed) {
            if (baseTime == null) {
              // The first interval carries the absolute start; later ones are relative to it.
              baseTime = interval._start;
              writeInstant(out, baseTime);
            } else {
              writeDuration(out, Duration.between(baseTime, interval._start));
            }
            writeDuration(out, Duration.between(interval._start, interval._end));
          }
        }
      }
    }
    return byteStream.toByteArray();
  }

  /** Returns the completed intervals of {@code intervals}, oldest first, without removing them. */
  private static List<Interval> completedOldestFirst(Deque<Interval> intervals) {
    List<Interval> completed = new ArrayList<>(intervals.size());
    // New intervals are pushed at the head, so the tail holds the oldest one.
    for (Iterator<Interval> it = intervals.descendingIterator(); it.hasNext(); ) {
      Interval interval = it.next();
      if (interval._state == STATE_INACTIVE || interval._state == STATE_CLOSED) {
        completed.add(interval);
      }
    }
    return completed;
  }

  /**
   * Writes {@code v} as 8 bytes, least significant byte first.
   *
   * @param s destination stream
   * @param v value to write
   * @throws IOException if the stream rejects the write
   */
  void writeLong(OutputStream s, long v) throws IOException {
    byte[] bytes = new byte[Long.BYTES];
    for (int i = 0; i < bytes.length; i++) {
      bytes[i] = (byte) (v >> (8 * i));
    }
    s.write(bytes);
  }

  /**
   * Writes {@code v} as an unsigned LEB128 value: 7 payload bits per byte, least significant
   * group first, with the high bit set on every byte except the last. {@code v} is treated as
   * unsigned, so a negative value takes the full 10 bytes.
   *
   * @param s destination stream
   * @param v value to write
   * @throws IOException if the stream rejects the write
   */
  void writeLEB128(OutputStream s, long v) throws IOException {
    byte[] bytes = new byte[10]; // ceil(64 / 7) groups at most
    int nbytes = 0;
    long remaining = v;
    do {
      int group = (int) (remaining & 0x7F);
      remaining >>>= 7;
      if (remaining != 0) {
        group |= 0x80; // continuation bit: more groups follow
      }
      bytes[nbytes++] = (byte) group;
    } while (remaining != 0);
    s.write(bytes, 0, nbytes);
  }

  /**
   * Writes {@code v} as nanoseconds since the epoch, LEB128-encoded.
   *
   * @param s destination stream
   * @param v instant to write
   * @throws IOException if the stream rejects the write
   * @throws ArithmeticException if the nanosecond count does not fit in a {@code long}
   */
  void writeInstant(OutputStream s, Instant v) throws IOException {
    long epochNanos =
        Math.addExact(Math.multiplyExact(v.getEpochSecond(), 1_000_000_000L), v.getNano());
    writeLEB128(s, epochNanos);
  }

  /**
   * Writes {@code v} as a nanosecond count, LEB128-encoded.
   *
   * @param s destination stream
   * @param v duration to write
   * @throws IOException if the stream rejects the write
   * @throws ArithmeticException if the nanosecond count does not fit in a {@code long}
   */
  void writeDuration(OutputStream s, Duration v) throws IOException {
    writeLEB128(s, v.toNanos());
  }

  // Bit flags carried by the "flags" field of a checkpoint event.
  static final int END = 0x1;
  static final int CPU = 0x2;
  static final int SPAN = 0x4 | CPU;
  static final int THREAD_MIGRATION = 0x8 | CPU;
  /** Clears the {@link #CPU} bit so that it plays no part in dispatching. */
  static final int MASK = ~CPU;

  /**
   * Dispatches one checkpoint event to the matching {@code on...} handler, based on the event's
   * {@code flags} field with the {@link #CPU} bit ignored.
   *
   * @param e a {@code datadog.Checkpoint} event
   * @throws RuntimeException if the flags match no known combination, or if the selected handler
   *     rejects the event
   */
  void consume(RecordedEvent e) {
    assert e.getEventType().getName().equals(CHECKPOINT_EVENT);
    long threadId = e.getThread().getId();
    Instant tick = e.getStartTime();
    long localRootSpanId = e.getLong("localRootSpanId");
    long spanId = e.getLong("spanId");
    int flags = e.getInt("flags");
    // The case labels are masked, so the value must be masked as well; otherwise any event
    // that carries the CPU bit (SPAN and THREAD_MIGRATION include it) would never match.
    switch (flags & MASK) {
      case SPAN & MASK:
        onStart(localRootSpanId, spanId, threadId, tick);
        break;
      case (SPAN | END) & MASK:
        onFinish(localRootSpanId, spanId, threadId, tick);
        break;
      case CPU & MASK:
        onStartWork(localRootSpanId, spanId, threadId, tick);
        break;
      case (CPU | END) & MASK:
        onFinishWork(localRootSpanId, spanId, threadId, tick);
        break;
      case THREAD_MIGRATION & MASK:
        onStartThreadMigration(localRootSpanId, spanId, threadId, tick);
        break;
      case (THREAD_MIGRATION | END) & MASK:
        onFinishThreadMigration(localRootSpanId, spanId, threadId, tick);
        break;
      default:
        throw new RuntimeException("unknown flags " + flags);
    }
  }

  /** The interval has a start but no end yet. */
  static final int STATE_ACTIVE = 1;
  /** The interval was ended by {@link #onStartThreadMigration}. */
  static final int STATE_INACTIVE = 2;
  /** The interval is finished; its end is set. */
  static final int STATE_CLOSED = 3;

  /** One interval of a span on a thread; {@code _state} is one of the {@code STATE_*} values. */
  static class Interval {
    public Instant _start;
    public Instant _end;
    public int _state;
  }

  /** Intervals keyed by span id, then thread id; the head of each deque is the newest one. */
  Map<Long /*spanId*/, Map<Long /*threadId*/, Deque<Interval>>> _intervalsBySpanAndThread =
      new HashMap<Long, Map<Long, Deque<Interval>>>();

  /** Returns the interval deque for the span/thread pair, creating it when absent. */
  private Deque<Interval> intervalsFor(long spanId, long threadId) {
    return _intervalsBySpanAndThread
        .computeIfAbsent(spanId, k -> new HashMap<>())
        .computeIfAbsent(threadId, k -> new ArrayDeque<>());
  }

  /** Returns the newest interval of the span/thread pair, or {@code null} when there is none. */
  private Interval newestInterval(long spanId, long threadId) {
    Map<Long, Deque<Interval>> byThread = _intervalsBySpanAndThread.get(spanId);
    if (byThread == null) {
      return null;
    }
    Deque<Interval> intervals = byThread.get(threadId);
    return intervals == null ? null : intervals.peek();
  }

  /** Pushes a new {@link #STATE_ACTIVE} interval starting at {@code tick}. */
  private static void open(Deque<Interval> intervals, Instant tick) {
    Interval interval = new Interval();
    interval._start = tick;
    interval._state = STATE_ACTIVE;
    intervals.push(interval);
  }

  /** Ends the newest interval at {@code tick} with {@code state}; a no-op when none exists. */
  private void end(long spanId, long threadId, Instant tick, int state) {
    Interval interval = newestInterval(spanId, threadId);
    if (interval == null) {
      return; // nothing was opened for this span/thread; ignored on purpose
    }
    interval._state = state;
    interval._end = tick;
  }

  /**
   * Opens a new interval for the span on the thread. A previous {@link #STATE_INACTIVE} interval
   * is marked {@link #STATE_CLOSED} first.
   *
   * @throws RuntimeException if the newest interval of the pair is still active
   */
  void onStart(long localRootSpanId, long spanId, long threadId, Instant tick) {
    Deque<Interval> intervals = intervalsFor(spanId, threadId);
    Interval head = intervals.peek();
    if (head != null) {
      if (head._state == STATE_ACTIVE) {
        throw new RuntimeException(
            "[spanId=" + spanId + ",threadId=" + threadId + "]: there is already an open interval");
      } else if (head._state == STATE_INACTIVE) {
        head._state = STATE_CLOSED;
      }
    }
    open(intervals, tick);
  }

  /** Closes the newest interval of the pair at {@code tick}; ignored when there is none. */
  void onFinish(long localRootSpanId, long spanId, long threadId, Instant tick) {
    end(spanId, threadId, tick, STATE_CLOSED);
  }

  /**
   * Not expected to be reached.
   *
   * @throws RuntimeException always
   */
  void onStartWork(long localRootSpanId, long spanId, long threadId, Instant tick) {
    throw new RuntimeException(
        "[spanId=" + spanId + ",threadId=" + threadId + "]: this should never be called");
  }

  /** Closes the newest interval of the pair at {@code tick}; ignored when there is none. */
  void onFinishWork(long localRootSpanId, long spanId, long threadId, Instant tick) {
    end(spanId, threadId, tick, STATE_CLOSED);
  }

  /**
   * Ends the newest interval of the pair at {@code tick} and marks it {@link #STATE_INACTIVE};
   * ignored when there is none.
   */
  void onStartThreadMigration(long localRootSpanId, long spanId, long threadId, Instant tick) {
    end(spanId, threadId, tick, STATE_INACTIVE);
  }

  /**
   * Opens a new interval for the span on the thread, unless the newest one is still active, in
   * which case the event is ignored. A previous {@link #STATE_INACTIVE} interval is marked
   * {@link #STATE_CLOSED} first.
   */
  void onFinishThreadMigration(long localRootSpanId, long spanId, long threadId, Instant tick) {
    Deque<Interval> intervals = intervalsFor(spanId, threadId);
    Interval head = intervals.peek();
    if (head != null) {
      if (head._state == STATE_ACTIVE) {
        return; // already open: keep the current interval instead of failing
      } else if (head._state == STATE_INACTIVE) {
        head._state = STATE_CLOSED;
      }
    }
    open(intervals, tick);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment