Generic parallel application using multiprocessing in Python
import numpy as np
import multiprocessing.shared_memory as shm
from concurrent.futures import ProcessPoolExecutor, as_completed
class ParallelArrayProcessor:
    def __init__(self, data, num_chunks=8, use_shared_memory=False):
        """
        Initialize the processor.
        :param data: NumPy array of float64 values (the workers assume float64).
        :param num_chunks: Number of chunks for parallel processing.
        :param use_shared_memory: Use shared memory to avoid IPC copy overhead.
        """
        self.use_shared_memory = use_shared_memory
        self.num_chunks = num_chunks
        if use_shared_memory:
            # Allocate a shared memory block; omitting `name` lets the OS pick
            # a unique one, avoiding the collisions a random suffix could cause.
            self.shared_mem = shm.SharedMemory(create=True, size=data.nbytes)
            self.shared_mem_name = self.shared_mem.name
            self.data = np.ndarray(data.shape, dtype=data.dtype, buffer=self.shared_mem.buf)
            self.data[:] = data  # Copy data into shared memory
            # Split *indices*, not data: workers reattach to the block by name
            self.input_slices = np.array_split(np.arange(len(self.data)), num_chunks)
        else:
            self.data = data  # Use a normal NumPy array
            self.input_slices = np.array_split(self.data, num_chunks)  # Split the data directly
    # Static so that pickling them for worker processes does not also pickle
    # `self` (which would drag the full array / SharedMemory handle along).
    @staticmethod
    def _max_min(arr):
        """Compute max and min of a chunk."""
        return np.max(arr), np.min(arr)
    @staticmethod
    def _sum(arr):
        """Compute the sum of a chunk."""
        return np.sum(arr)
    def _create_tasks(self):
        """Create a max_min task and a sum task per chunk."""
        tasks = []
        shared_mem_name = self.shared_mem_name if self.use_shared_memory else None
        for i, slice_or_indices in enumerate(self.input_slices):
            tasks.append({
                "index": i,
                "operation": "max_min",
                "func": self._max_min,
                "args": (slice_or_indices, self.use_shared_memory, shared_mem_name)
            })
            tasks.append({
                "index": i,
                "operation": "sum",
                "func": self._sum,
                "args": (slice_or_indices, self.use_shared_memory, shared_mem_name)
            })
        return tasks
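    # Shape of a produced task (illustrative values only):
    #   {"index": 0, "operation": "max_min", "func": ParallelArrayProcessor._max_min,
    #    "args": (<chunk data or index array>, use_shared_memory, shared_mem_name)}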
    def _merge_results(self, results):
        """Direct single-process reduction over (index, result) pairs; an
        alternative to _binary_reduce for small result lists. Not called by
        run_executor."""
        max_values, min_values, sum_values = [], [], []
        for _, res in results:
            if isinstance(res, tuple):  # A (max, min) pair from _max_min
                max_values.append(res[0])
                min_values.append(res[1])
            else:  # A scalar sum from _sum
                sum_values.append(res)
        # Guard against empty lists with the identity element of each reduction
        max_result = np.max(max_values) if max_values else float("-inf")
        min_result = np.min(min_values) if min_values else float("inf")
        sum_result = np.sum(sum_values) if sum_values else 0.0
        return max_result, min_result, sum_result
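    # Input expected by _merge_results, for example (made-up values):
    #   [(0, (0.97, 0.01)),   # (index, (max, min)) from a max_min task
    #    (0, 62517.3)]        # (index, sum) from a sum task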
    def _binary_reduce(self, data, reduce_func):
        """Parallel pairwise (tree) reduction over (index, value) pairs; handles
        non-power-of-two lengths by carrying the odd element forward."""
        if not data:
            return None
        if len(data) == 1:
            return data[0][1]  # Extract the value from (index, result)
        while len(data) > 1:
            new_data = []
            with ProcessPoolExecutor(max_workers=min(len(data) // 2, self.num_chunks)) as executor:
                futures = {}
                for i in range(0, len(data) - 1, 2):
                    idx1, val1 = data[i]
                    idx2, val2 = data[i + 1]
                    future = executor.submit(reduce_func, val1, val2)
                    futures[future] = (idx1, idx2)  # Track which pair this future reduces
                # Collect results, keeping the index of the first element of each pair
                for future in as_completed(futures):
                    idx1, _ = futures[future]
                    new_data.append((idx1, future.result()))
            # With an odd count, carry the last (index, value) into the next round
            if len(data) % 2 == 1:
                new_data.append(data[-1])
            data = new_data  # Continue reducing the halved list
        return data[0][1]  # Final reduced value
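    # One _binary_reduce run with reduce_func=max on four chunk results
    # (made-up values; completion order may interleave the pairs):
    #   round 1: [(0, 3.0), (1, 7.0), (2, 5.0), (3, 2.0)]
    #            -> [(0, max(3.0, 7.0)), (2, max(5.0, 2.0))] == [(0, 7.0), (2, 5.0)]
    #   round 2: [(0, 7.0), (2, 5.0)] -> [(0, 7.0)]
    #   returns 7.0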
    @staticmethod
    def worker_function(task):
        """Run one task against shared memory or a plain NumPy slice."""
        op_name, func, args = task["operation"], task["func"], task["args"]
        arr_or_indices, use_shared, shared_mem_name = args
        if use_shared:
            # Child process must reattach to the shared memory block by name
            shared_mem = shm.SharedMemory(name=shared_mem_name)
            try:
                # View the *whole* block as float64 (the block may be rounded up
                # to a page size, so derive the element count from its size),
                # then fetch only this chunk's global indices. Fancy indexing
                # returns a copy, so the block can be closed before computing.
                n_elems = shared_mem.size // np.dtype(np.float64).itemsize
                shared_array = np.ndarray((n_elems,), dtype=np.float64, buffer=shared_mem.buf)
                arr = shared_array[arr_or_indices]
            finally:
                shared_mem.close()
        else:
            arr = arr_or_indices  # Directly use the NumPy slice
        return task["index"], op_name, func(arr)  # Return index for tracking
    def run_executor(self):
        """Run all tasks on a ProcessPoolExecutor, then reduce per operation."""
        tasks = self._create_tasks()
        results = []
        with ProcessPoolExecutor(max_workers=self.num_chunks) as executor:
            future_to_task = {executor.submit(self.worker_function, task): task for task in tasks}
            for future in as_completed(future_to_task):
                try:
                    index, op_name, res = future.result()
                    results.append((index, op_name, res))  # Keep the chunk index for tracking
                except Exception as e:
                    print(f"Task {future_to_task[future]['index']} failed: {e}")
        # Group results by operation
        max_values, min_values, sum_values = [], [], []
        for index, op, res in results:
            if op == "max_min":
                max_values.append((index, res[0]))
                min_values.append((index, res[1]))
            elif op == "sum":
                sum_values.append((index, res))
        # Pairwise-reduce each group to a single value
        max_result = self._binary_reduce(max_values, max)
        min_result = self._binary_reduce(min_values, min)
        sum_result = self._binary_reduce(sum_values, np.add)
        return max_result, min_result, sum_result
    def compute(self, backend="executor"):
        """
        Compute max, min, and sum using the specified backend.
        :param backend: Only "executor" (ProcessPoolExecutor) is implemented.
        :return: Final max, min, sum values.
        """
        if backend != "executor":
            raise NotImplementedError(f"Backend {backend!r} is not implemented")
        return self.run_executor()
    def cleanup(self):
        """Explicitly release the shared memory block after processing."""
        if self.use_shared_memory and hasattr(self, "shared_mem"):
            self.shared_mem.close()
            self.shared_mem.unlink()  # Remove the block from the system
# Example usage
if __name__ == "__main__":
    data = np.random.rand(1_000_000)  # 1 million random float64 values
    # Run without shared memory (chunks are pickled to the workers)
    processor = ParallelArrayProcessor(data, use_shared_memory=False)
    max_val, min_val, sum_val = processor.compute(backend="executor")
    print(f"ProcessPoolExecutor - Max: {max_val}, Min: {min_val}, Sum: {sum_val}")
    processor.cleanup()  # No-op here; frees the block when shared memory is used
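    # The same computation via the shared-memory path (a sketch; the try/finally
    # guard is an addition here, not part of the original gist):
    shm_processor = ParallelArrayProcessor(data, use_shared_memory=True)
    try:
        max_val, min_val, sum_val = shm_processor.compute(backend="executor")
        print(f"Shared memory - Max: {max_val}, Min: {min_val}, Sum: {sum_val}")
    finally:
        shm_processor.cleanup()  # Always unlink the shared block, even on error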