Skip to content

Instantly share code, notes, and snippets.

@abelardojarab
Last active February 9, 2025 19:49
Show Gist options
  • Save abelardojarab/9e91fafe8a74f82fbdc06decde523942 to your computer and use it in GitHub Desktop.
Save abelardojarab/9e91fafe8a74f82fbdc06decde523942 to your computer and use it in GitHub Desktop.
Parallel Flame Graph Generation using Multiprocessing
import collections
import json
import re
import concurrent.futures
def _create_default_tree():
return [0, collections.defaultdict(dict)]
def process_trace_chunk(trace_chunk):
    """Parse one slice of kernel-trace lines into a local stack tree.

    Each useful line looks like ``"<seconds>.<frac> funcA;funcB;..."``.
    Blank lines, ``#`` comments, and lines that do not match the
    timestamp-plus-stack pattern are skipped.

    Returns:
        ``(stack_tree, function_counts)`` as plain dicts so the result
        pickles cheaply back to the parent process.
    """
    trace_pattern = re.compile(r"(\d+\.\d+)\s+(.+)")
    local_tree = collections.defaultdict(_create_default_tree)
    local_counts = collections.defaultdict(int)
    for raw_line in trace_chunk:
        text = raw_line.strip()
        if not text or text.startswith("#"):
            continue
        parsed = trace_pattern.match(text)
        if parsed is None:
            continue
        # Group 1 is the timestamp; only the stack portion is needed.
        frames = parsed.group(2).strip().split(";")
        _insert_stack(frames, local_tree, local_counts)
    return dict(local_tree), dict(local_counts)
def _insert_stack(stack_trace, stack_tree, total_counts):
    """Record one call stack in the nested tree, bumping per-function totals.

    Each tree entry maps a function name to a two-item list:
    index 0 holds the hit count, index 1 holds the child subtree.
    """
    current = stack_tree
    for frame in stack_trace:
        entry = current.setdefault(frame, [0, collections.defaultdict(dict)])
        entry[0] += 1
        total_counts[frame] += 1
        current = entry[1]  # descend into this frame's children
def merge_trees(global_tree, local_tree):
    """Fold ``local_tree`` into ``global_tree``, summing counts path by path."""
    for name, node in local_tree.items():
        count, children = node
        target = global_tree.setdefault(name, [0, collections.defaultdict(dict)])
        target[0] += count
        # Recurse so deeper frames are accumulated under the same path.
        merge_trees(target[1], children)
class ParallelKernelFlameGraphExecutor:
    """Builds a flame-graph stack tree from kernel traces using worker processes."""

    def __init__(self, num_workers=None):
        # Fall back to a fixed pool size when the caller does not choose one.
        self.num_workers = num_workers or 4
        # func name -> [count, children]; same node shape the workers build.
        self.global_stack_tree = collections.defaultdict(_create_default_tree)
        # Flat func name -> total hit count across every stack level.
        self.total_function_counts = collections.defaultdict(int)

    def process_traces_parallel(self, trace_lines):
        """Split ``trace_lines`` into chunks and fan them out to worker processes.

        Each finished worker's local tree and counts are merged into
        ``global_stack_tree`` and ``total_function_counts``.
        """
        if not trace_lines:
            # Nothing to process; avoid spinning up an empty pool.
            return
        chunk_size = max(1, len(trace_lines) // self.num_workers)
        chunks = [trace_lines[i:i + chunk_size]
                  for i in range(0, len(trace_lines), chunk_size)]
        with concurrent.futures.ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            # A plain list suffices here: the submitted chunk is never read
            # again, so mapping future -> chunk only kept dead references.
            futures = [executor.submit(process_trace_chunk, chunk) for chunk in chunks]
            for future in concurrent.futures.as_completed(futures):
                local_tree, local_counts = future.result()
                merge_trees(self.global_stack_tree, local_tree)
                for func, count in local_counts.items():
                    self.total_function_counts[func] += count

    def print_stack_tree(self):
        """Pretty-print the merged stack tree as indented JSON."""
        def convert_tree(node):
            # Rewrite [count, children] nodes as {"count": ..., "children": ...}.
            return {func: {"count": count, "children": convert_tree(children)}
                    for func, (count, children) in node.items()}
        print(json.dumps(convert_tree(self.global_stack_tree), indent=4))

    def report_most_frequent_function(self):
        """Print the function with the highest total hit count, if any."""
        if not self.total_function_counts:
            print("No function calls recorded.")
            return
        most_frequent_func = max(self.total_function_counts,
                                 key=self.total_function_counts.get)
        print(f"Most frequently called function: {most_frequent_func} "
              f"({self.total_function_counts[most_frequent_func]} times)")
# Simulated kernel trace: one "<epoch seconds> funcA;funcB;..." entry per line.
kernel_trace = [
    "1618998023.123 funcA;funcB;funcC",
    "1618998023.456 funcA;funcB",
    "1618998023.789 funcA",
    "1618998024.001 funcD;funcE",
    "1618998024.123 funcA;funcB;funcC",
    "1618998024.456 funcA",
    "1618998025.123 funcX;funcB;funcC",
    "1618998025.456 funcX;funcB",
]


def main():
    """Run the parallel flame-graph build over the sample trace and report results."""
    flame_graph = ParallelKernelFlameGraphExecutor(num_workers=4)
    flame_graph.process_traces_parallel(kernel_trace)
    # Pretty print the stack tree as JSON.
    flame_graph.print_stack_tree()
    # Report the most frequently called function.
    flame_graph.report_most_frequent_function()


if __name__ == "__main__":
    # The guard is required for ProcessPoolExecutor: under the "spawn" start
    # method (default on Windows and macOS) worker processes re-import this
    # module, and without the guard each import would try to launch another
    # pool, crashing or forking endlessly.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment