Merge function events in Python
import multiprocessing
import random
from concurrent.futures import ProcessPoolExecutor
def process_chunk(chunk, chunk_start_idx):
    """Processes a chunk of samples and tracks function lifetimes."""
    active_functions = {}  # {function_name: start_time}
    events = []

    for offset, functions in enumerate(chunk):
        timestamp = chunk_start_idx + offset  # Adjust timestamp based on chunk index
        current_functions = set(functions)

        # Detect new functions (start events)
        for func in current_functions - active_functions.keys():
            active_functions[func] = timestamp

        # Detect functions that ended (end events)
        for func in list(active_functions.keys()):
            if func not in current_functions:
                events.append((func, active_functions[func], timestamp))
                del active_functions[func]

    # Handle functions still active at the end of the chunk
    final_timestamp = chunk_start_idx + len(chunk)
    for func, start_time in active_functions.items():
        events.append((func, start_time, final_timestamp))

    return events  # Return processed events
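
# Illustrative sanity check (my addition, not part of the original gist):
# for a tiny trace, the chunk processor emits one (name, start, end) event
# per function lifetime.
#
#   process_chunk([["A", "B"], ["A"], ["C"]], chunk_start_idx=0)
#   -> [("B", 0, 1), ("A", 0, 2), ("C", 2, 3)]
#
# "C" is still active when the chunk ends, so it is closed at the chunk
# boundary (timestamp 3); merge_events re-joins it with its continuation
# from the next chunk, if any.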
def merge_events(events):
    """Merges adjacent function events that were split across chunks."""
    merged_events = {}
    for func, start, end in sorted(events, key=lambda x: (x[0], x[1])):  # Sort by function, then start time
        if func in merged_events and merged_events[func][-1][1] == start:
            # Extend the previous interval; keep entries as (start, end) pairs
            merged_events[func][-1] = (merged_events[func][-1][0], end)
        else:
            merged_events.setdefault(func, []).append((start, end))
    return [(func, start, end) for func, intervals in merged_events.items() for start, end in intervals]
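
# Example of the merge rule (illustrative, not in the original gist):
# intervals of the same function that touch exactly at a chunk boundary
# (previous end == next start) collapse into one; disjoint intervals stay
# separate events.
#
#   merge_events([("A", 0, 3), ("A", 3, 5), ("B", 1, 2)])
#   -> [("A", 0, 5), ("B", 1, 2)]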
def parallel_generate_function_events(samples, num_workers=4, chunk_size=100000):
    """Parallel processing of large-scale function event tracking with parallel reduce."""
    num_samples = len(samples)
    chunks = [samples[i:i + chunk_size] for i in range(0, num_samples, chunk_size)]
    chunk_indices = list(range(0, num_samples, chunk_size))

    # Map phase: process chunks in parallel
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(process_chunk, chunks, chunk_indices))

    # Flatten the per-chunk event lists
    all_events = [event for result in results for event in result]

    # Reduce phase: merge events in parallel, avoiding too many small reduce jobs
    num_reduce_chunks = max(1, min(num_workers, len(all_events) // chunk_size))

    # Split with plain list slicing; np.array_split would coerce the
    # (str, int, int) tuples into a single string dtype, silently turning
    # the integer timestamps into strings.
    reduce_size = max(1, -(-len(all_events) // num_reduce_chunks))  # Ceiling division
    reduce_chunks = [all_events[i:i + reduce_size]
                     for i in range(0, len(all_events), reduce_size)]

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        reduced_results = list(executor.map(merge_events, reduce_chunks))

    # Final merge of reduced results (last reduce step)
    final_events = merge_events([event for result in reduced_results for event in result])
    return final_events
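
# Assumed correctness check (my addition, not in the original gist): the
# map/reduce pipeline should agree with a plain serial pass, since every
# interval split at a chunk boundary is re-joined by merge_events:
#
#   serial = merge_events(process_chunk(samples, 0))
#   assert sorted(parallel_generate_function_events(samples)) == sorted(serial)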
# 🔥 Simulating Large-Scale Data
N_SAMPLES = 100  # Small demo size; raise to e.g. 1_000_000 for a large-scale run
FUNCTIONS = ["A", "B", "C", "D", "E", "F", "G", "H"]

if __name__ == "__main__":  # Guard is required: ProcessPoolExecutor re-imports this module in each worker
    samples_large = [[random.choice(FUNCTIONS) for _ in range(random.randint(1, 5))]
                     for _ in range(N_SAMPLES)]

    # 🔥 Run Parallel Processing with Reduce
    events = parallel_generate_function_events(samples_large, num_workers=multiprocessing.cpu_count())

    # 🔥 Save to File (Batch Writing)
    with open("function_events_parallel.txt", "w") as f:
        f.writelines(f"{func},{start},{end}\n" for func, start, end in events)