Created
February 9, 2025 16:34
-
-
Save abelardojarab/906702990d4a4a719f4aa733d024d17c to your computer and use it in GitHub Desktop.
Flame graph generation with Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import json | |
import re | |
import graphviz | |
class KernelFlameGraph:
    """Aggregate timestamped, semicolon-separated stack traces into a call tree.

    Each trace line looks like ``"<seconds>.<fraction> frameA;frameB;frameC"``
    with the outermost frame first.  Traces are merged into a prefix tree that
    can be exported as JSON or drawn with Graphviz.
    """

    # One trace line: a decimal timestamp, whitespace, then the stack text.
    # Compiled once here instead of being looked up for every input line.
    _TRACE_LINE = re.compile(r"(\d+\.\d+)\s+(.+)")

    def __init__(self):
        # Nested structure for stack hierarchy (FlameGraph format):
        #   {function: [count, {child function: [count, {...}]}]}
        # Every level uses the same node factory, so a lookup at any depth
        # yields a well-formed ``[count, children]`` node.
        self.stack_tree = collections.defaultdict(self._new_node)
        # Flat dictionary to track accumulated occurrences of each function
        # (one increment per appearance of the function in any stack).
        self.total_function_counts = collections.defaultdict(int)

    @staticmethod
    def _new_node():
        """Return a fresh ``[count, children]`` node with no hits and no children."""
        return [0, collections.defaultdict(KernelFlameGraph._new_node)]

    def process_trace(self, trace_lines):
        """Parse kernel trace lines and merge their stacks into the tree.

        Args:
            trace_lines: Iterable of strings.  Blank lines, lines starting
                with ``#`` and lines that do not begin with a decimal
                timestamp are skipped.
        """
        for line in trace_lines:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            match = self._TRACE_LINE.match(line)
            if match is None:
                continue
            # The timestamp (group 1) is only used to recognise the line.
            # Frames are trimmed and empty ones dropped so that a trailing or
            # doubled ';' does not create a function named "".
            frames = [frame.strip() for frame in match.group(2).split(";")]
            frames = [frame for frame in frames if frame]
            if frames:
                # Insert into hierarchical stack structure
                self._insert_stack(frames)

    def _insert_stack(self, stack_trace):
        """Insert one stack trace (outermost frame first) into the tree.

        Args:
            stack_trace: Sequence of function names forming one call path.
        """
        level = self.stack_tree
        for func in stack_trace:
            entry = level[func]  # created by the defaultdict on first sight
            entry[0] += 1  # Occurrences of this function on this stack path
            self.total_function_counts[func] += 1  # Count across all paths
            level = entry[1]  # Move to the next level

    def export_json(self, output_file="flamegraph.json"):
        """Write the call tree to ``output_file`` as indented JSON.

        The document maps each function name to
        ``{"count": <int>, "children": {...}}``, recursively.
        """
        def convert_tree(node):
            return {
                func: {"count": count, "children": convert_tree(children)}
                for func, (count, children) in node.items()
            }

        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(convert_tree(self.stack_tree), f, indent=4)

    def _populate_graph(self, dot):
        """Add one graph node per tree node, plus parent -> child edges, to ``dot``.

        ``dot`` only needs ``node(name, **attrs)`` and ``edge(tail, head)``
        methods.  Node names are synthetic (``n0``, ``n1``, ...) rather than
        the function name: the same function reached through different call
        paths must stay a separate box with its own count, and function names
        containing ``:`` would otherwise be read by Graphviz as port syntax.
        """
        next_id = 0

        def add_nodes_edges(level, parent_id=None):
            nonlocal next_id
            for func, (count, children) in level.items():
                node_id = f"n{next_id}"
                next_id += 1
                label = f"{func}\\n({count} calls)"
                dot.node(node_id, label=label, shape="box", style="filled",
                         fillcolor="lightblue")
                if parent_id is not None:
                    dot.edge(parent_id, node_id)
                add_nodes_edges(children, node_id)

        add_nodes_edges(self.stack_tree)

    def visualize_tree(self, output_file="stack_tree"):
        """Render the call hierarchy as a PNG using Graphviz.

        Args:
            output_file: Base path handed to ``Digraph.render``; the printed
                message assumes the image ends up at ``<output_file>.png``.
        """
        dot = graphviz.Digraph(format="png", graph_attr={"rankdir": "TB"})  # Top-to-Bottom
        self._populate_graph(dot)
        dot.render(output_file)
        print(f"Stack tree visualization saved as {output_file}.png")
# Example usage
if __name__ == "__main__":
    # Simulated kernel samples: (timestamp, call path from outermost frame in).
    samples = [
        ("1618998023.123", ("funcA", "funcB", "funcC")),
        ("1618998023.456", ("funcA", "funcB")),
        ("1618998023.789", ("funcA",)),
        ("1618998024.001", ("funcD", "funcE")),
        ("1618998024.123", ("funcA", "funcB", "funcC")),
        ("1618998024.456", ("funcA",)),
        ("1618998025.123", ("funcX", "funcB", "funcC")),
        ("1618998025.456", ("funcX", "funcB")),
    ]
    # Assemble the "<timestamp> <frame>;<frame>;..." lines the parser expects.
    kernel_trace = [f"{stamp} {';'.join(path)}" for stamp, path in samples]

    fg = KernelFlameGraph()
    fg.process_trace(kernel_trace)

    # Export and visualize the stack hierarchy
    fg.export_json("flamegraph.json")
    fg.visualize_tree("stack_tree")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment