Last active: February 9, 2025 19:49
-
-
Save abelardojarab/9e91fafe8a74f82fbdc06decde523942 to your computer and use it in GitHub Desktop.
Parallel Flame Graph Generation using Multiprocessing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import json | |
import re | |
import concurrent.futures | |
def _create_default_tree(): | |
return [0, collections.defaultdict(dict)] | |
def process_trace_chunk(trace_chunk):
    """Build a local stack tree and per-function counts from a chunk of trace lines.

    Blank lines and lines starting with ``#`` are skipped. Every other line
    must look like ``"<digits>.<digits> frame;frame;..."``; lines that do not
    match are ignored. The first frame of each line becomes the root of its
    path in the tree. Frames are whitespace-stripped and empty frames (from
    ``"a;;b"`` or a trailing ``;``) are dropped.

    Args:
        trace_chunk: iterable of trace line strings.

    Returns:
        A ``(stack_tree, function_counts)`` tuple of plain dicts.
        ``stack_tree`` maps function name -> ``[count, children]``.
        ``function_counts`` maps function name -> number of frames seen.
        Both are ready to be returned from a worker process and combined
        with ``merge_trees``.
    """
    # Compile once per chunk rather than looking the pattern up on every line.
    line_re = re.compile(r"(\d+\.\d+)\s+(.+)")
    local_stack_tree = collections.defaultdict(_create_default_tree)
    total_function_counts = collections.defaultdict(int)

    for raw_line in trace_chunk:
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        match = line_re.match(line)
        if match is None:
            continue
        _timestamp, stack_text = match.groups()
        # Strip and drop empty frames so "a; b" and "a;b;" both yield
        # ["a", "b"] instead of bogus " b" / "" function names.
        frames = [frame.strip() for frame in stack_text.split(";")]
        frames = [frame for frame in frames if frame]
        if frames:
            _insert_stack(frames, local_stack_tree, total_function_counts)

    return dict(local_stack_tree), dict(total_function_counts)
def _insert_stack(stack_trace, stack_tree, total_counts): | |
"""Inserts a stack trace into a nested dictionary (tree structure).""" | |
node = stack_tree | |
for func in stack_trace: | |
if func not in node: | |
node[func] = _create_default_tree() | |
node[func][0] += 1 | |
total_counts[func] += 1 | |
node = node[func][1] # Move deeper | |
def merge_trees(global_tree, local_tree):
    """Fold ``local_tree`` into ``global_tree`` in place, summing node counts.

    Both trees map function name -> ``[count, children]``. Nodes missing from
    ``global_tree`` are created with ``_create_default_tree()``.
    ``local_tree`` is only read; none of its node lists are stored in
    ``global_tree``.

    Uses an explicit work list of ``(destination, source)`` child-dict pairs
    instead of recursion.
    """
    pending = [(global_tree, local_tree)]
    while pending:
        dst, src = pending.pop()
        for name, (src_count, src_children) in src.items():
            if name not in dst:
                dst[name] = _create_default_tree()
            dst_node = dst[name]
            dst_node[0] += src_count
            pending.append((dst_node[1], src_children))
class ParallelKernelFlameGraphExecutor:
    """Aggregate stack-trace lines into a flame-graph tree using worker processes.

    Input lines are split into contiguous chunks. Each chunk is parsed in a
    separate process by ``process_trace_chunk``, and the partial results are
    merged into ``global_stack_tree`` and ``total_function_counts``. Repeated
    calls to ``process_traces_parallel`` keep accumulating into the same
    totals.
    """

    def __init__(self, num_workers=None):
        """Create an executor.

        Args:
            num_workers: number of worker processes. ``None`` or ``0``
                selects the default of 4.

        Raises:
            ValueError: if ``num_workers`` is negative (raised here rather
                than later from ``ProcessPoolExecutor``).
        """
        if num_workers is not None and num_workers < 0:
            raise ValueError("num_workers must not be negative")
        self.num_workers = num_workers or 4  # Default to 4 workers
        # function name -> [count, children] node (see _create_default_tree).
        self.global_stack_tree = collections.defaultdict(_create_default_tree)
        # function name -> total frames seen across all processed samples.
        self.total_function_counts = collections.defaultdict(int)

    @staticmethod
    def _split_into_chunks(lines, num_chunks):
        """Split ``lines`` into at most ``num_chunks`` contiguous, non-empty slices."""
        if not lines:
            return []
        # Ceiling division: floor division could yield more chunks than
        # workers (e.g. 9 lines / 4 workers -> 5 chunks of 2 and 1).
        chunk_size = (len(lines) + num_chunks - 1) // num_chunks
        return [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]

    def process_traces_parallel(self, trace_lines):
        """Parse ``trace_lines`` in worker processes and merge the results.

        Args:
            trace_lines: iterable of trace line strings. It is materialized
                into a list so it can be sliced into chunks.

        Results are merged in chunk submission order (``executor.map``), not
        completion order. This makes the key order of the merged tree
        deterministic across runs, and with it the JSON from
        ``print_stack_tree`` and tie-breaking in
        ``report_most_frequent_function``.
        """
        trace_lines = list(trace_lines)
        trace_chunks = self._split_into_chunks(trace_lines, self.num_workers)
        if not trace_chunks:
            return  # nothing to parse; don't spin up a process pool
        max_workers = min(self.num_workers, len(trace_chunks))
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            for local_tree, local_counts in executor.map(process_trace_chunk, trace_chunks):
                merge_trees(self.global_stack_tree, local_tree)
                for func, count in local_counts.items():
                    self.total_function_counts[func] += count

    def print_stack_tree(self):
        """Pretty-print the merged stack tree as indented JSON.

        Each node is rendered as ``{"count": int, "children": {...}}``.
        """
        def convert_tree(node):
            return {
                func: {"count": count, "children": convert_tree(children)}
                for func, (count, children) in node.items()
            }

        print(json.dumps(convert_tree(self.global_stack_tree), indent=4))

    def report_most_frequent_function(self):
        """Print the function with the highest total count.

        On a tie, ``max`` returns the first such function in the dict's
        insertion order.
        """
        if not self.total_function_counts:
            print("No function calls recorded.")
            return
        most_frequent_func = max(self.total_function_counts, key=self.total_function_counts.get)
        print(f"Most frequently called function: {most_frequent_func} ({self.total_function_counts[most_frequent_func]} times)")
# Simulated kernel trace: one "<timestamp> <frame>;<frame>;..." sample per
# line; the first frame becomes the root of its path in the stack tree.
kernel_trace = [
    "1618998023.123 funcA;funcB;funcC",
    "1618998023.456 funcA;funcB",
    "1618998023.789 funcA",
    "1618998024.001 funcD;funcE",
    "1618998024.123 funcA;funcB;funcC",
    "1618998024.456 funcA",
    "1618998025.123 funcX;funcB;funcC",
    "1618998025.456 funcX;funcB",
]


def main():
    """Run the demo: build the tree in parallel, print it, report the top function."""
    # Run parallel processing using ProcessPoolExecutor
    flame_graph = ParallelKernelFlameGraphExecutor(num_workers=4)
    flame_graph.process_traces_parallel(kernel_trace)
    # Pretty print the stack tree as JSON
    flame_graph.print_stack_tree()
    # Report the most frequently called function
    flame_graph.report_most_frequent_function()


# The guard is required, not cosmetic. With the "spawn" start method (the
# default on Windows and macOS), every worker re-imports this module, and
# unguarded top-level code would try to start a new process pool inside each
# child during bootstrap.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.