@abelardojarab
Last active February 8, 2025 02:36
Merge function events in Python
import multiprocessing
import random
from concurrent.futures import ProcessPoolExecutor
def process_chunk(chunk, chunk_start_idx):
    """Processes a chunk of samples and tracks function lifetimes."""
    active_functions = {}  # {function_name: start_time}
    events = []
    for offset, functions in enumerate(chunk):
        timestamp = chunk_start_idx + offset  # Adjust timestamp based on chunk index
        current_functions = set(functions)
        # Detect new functions (start events)
        for func in current_functions - active_functions.keys():
            active_functions[func] = timestamp
        # Detect functions that ended (end events)
        for func in list(active_functions.keys()):
            if func not in current_functions:
                events.append((func, active_functions[func], timestamp))
                del active_functions[func]
    # Handle functions still active at the end of the chunk
    final_timestamp = chunk_start_idx + len(chunk)
    for func, start_time in active_functions.items():
        events.append((func, start_time, final_timestamp))
    return events  # Return processed (function, start, end) events
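# Example (illustrative): for a single chunk starting at index 0,
#   process_chunk([["A"], ["A", "B"], ["B"]], 0)
# yields [("A", 0, 2), ("B", 1, 3)] -- "A" is first missing at timestamp 2,
# while "B" is still active and gets closed at the chunk's final timestamp 3.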
def merge_events(events):
    """Merges adjacent/overlapping function events across chunks."""
    merged_events = {}
    for func, start, end in sorted(events, key=lambda x: (x[0], x[1])):  # Sort by function, then start time
        if func in merged_events and merged_events[func][-1][1] == start:
            # Extend the previous interval when it ends exactly where this one starts
            merged_events[func][-1] = (merged_events[func][-1][0], end)
        else:
            merged_events.setdefault(func, []).append((start, end))
    return [(func, start, end) for func, intervals in merged_events.items() for start, end in intervals]
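# Example (illustrative): intervals split at a chunk boundary are stitched back together,
#   merge_events([("A", 0, 100), ("A", 100, 150), ("B", 40, 60)])
# returns [("A", 0, 150), ("B", 40, 60)], since A's first interval ends exactly
# where its second one starts.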
def parallel_generate_function_events(samples, num_workers=4, chunk_size=100000):
    """Parallel processing of large-scale function event tracking with a parallel reduce."""
    num_samples = len(samples)
    chunks = [samples[i:i + chunk_size] for i in range(0, num_samples, chunk_size)]
    chunk_indices = [i for i in range(0, num_samples, chunk_size)]
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Process chunks in parallel (Map Phase)
        results = list(executor.map(process_chunk, chunks, chunk_indices))
    # Flatten list of per-chunk results
    all_events = [event for result in results for event in result]
    # Reduce Phase: merge events in parallel
    num_reduce_chunks = min(num_workers, len(all_events) // chunk_size)  # Avoid too many small reduce jobs
    if num_reduce_chunks == 0:
        num_reduce_chunks = 2
        num_workers = 2
    # Split the event list with plain slicing; np.array_split would coerce the
    # (str, int, int) tuples into string arrays and break the timestamp ordering
    reduce_size = max(1, (len(all_events) + num_reduce_chunks - 1) // num_reduce_chunks)
    reduce_chunks = [all_events[i:i + reduce_size] for i in range(0, len(all_events), reduce_size)]
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        reduced_results = list(executor.map(merge_events, reduce_chunks))
    # Final merge of reduced results (last reduce step)
    final_events = merge_events([event for result in reduced_results for event in result])
    return final_events
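# Example (illustrative): with chunk_size=2, the stream [["A"], ["A"], ["A"], []] is split
# into two map chunks; each reports a partial lifetime for "A" ((0, 2) and (2, 3)),
# and the reduce phase merges them into a single ("A", 0, 3) event.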
# 🔥 Simulating Large-Scale Data
if __name__ == "__main__":  # Guard so worker processes can re-import this module safely
    N_SAMPLES = 100  # Number of samples to simulate (scale up, e.g. to 1_000_000, for a large run)
    FUNCTIONS = ["A", "B", "C", "D", "E", "F", "G", "H"]
    samples_large = [[random.choice(FUNCTIONS) for _ in range(random.randint(1, 5))]
                     for _ in range(N_SAMPLES)]

    # 🔥 Run Parallel Processing with Reduce
    events = parallel_generate_function_events(samples_large, num_workers=multiprocessing.cpu_count())

    # 🔥 Save to File (Batch Writing)
    with open("function_events_parallel.txt", "w") as f:
        for event in events:
            f.write(f"{event[0]},{event[1]},{event[2]}\n")