Generic parallel application using multiprocessing in Python
import numpy as np
import multiprocessing.shared_memory as shm
from concurrent.futures import ProcessPoolExecutor, as_completed

class ParallelArrayProcessor:
    def __init__(self, data, num_chunks=8, use_shared_memory=False):
        """
        Initialize the processor.

        :param data: NumPy array of floats.
        :param num_chunks: Number of chunks for parallel processing.
        :param use_shared_memory: Use shared memory to avoid IPC overhead.
        """
        self.use_shared_memory = use_shared_memory
        self.num_chunks = num_chunks
        if use_shared_memory:
            # Create a uniquely named shared memory block
            self.shared_mem_name = f"shared_memory_{np.random.randint(100000)}"
            # Allocate shared memory and back the data array with it
            self.shared_mem = shm.SharedMemory(create=True, size=data.nbytes, name=self.shared_mem_name)
            self.data = np.ndarray(data.shape, dtype=data.dtype, buffer=self.shared_mem.buf)
            self.data[:] = data  # Copy data into shared memory
            self.input_slices = np.array_split(np.arange(len(self.data)), num_chunks)  # Use index slices
        else:
            self.data = data  # Use a normal NumPy array
            self.input_slices = np.array_split(self.data, num_chunks)  # Split the NumPy data directly

    # Static so tasks pickle cleanly without dragging the whole instance
    # (and its shared-memory handle) into the worker processes.
    @staticmethod
    def _max_min(arr):
        """Compute max and min on a given slice."""
        return np.max(arr), np.min(arr)

    @staticmethod
    def _sum(arr):
        """Compute sum on a given slice."""
        return np.sum(arr)

    def _create_tasks(self):
        """Create tasks for both max_min and sum while keeping the original dictionary structure."""
        tasks = []
        for i, slice_or_indices in enumerate(self.input_slices):
            # Workers need the shared memory name plus the full array shape/dtype to reattach
            shared_mem_name = self.shared_mem_name if self.use_shared_memory else None
            args = (slice_or_indices, self.use_shared_memory, shared_mem_name, self.data.shape, self.data.dtype)
            tasks.append({
                "index": i,
                "operation": "max_min",
                "func": self._max_min,
                "args": args
            })
            tasks.append({
                "index": i,
                "operation": "sum",
                "func": self._sum,
                "args": args
            })
        return tasks

    def _merge_results(self, results):
        """Final direct reduction for <= 50 elements."""
        max_values, min_values, sum_values = [], [], []
        for _, res in results:
            if isinstance(res, tuple):  # This must be (max, min)
                max_values.append(res[0])  # Max
                min_values.append(res[1])  # Min
            else:  # Sum is stored as a single value
                sum_values.append(res)
        # Debugging: print values before computing the final result
        print(f"DEBUG - max_values: {max_values}, min_values: {min_values}, sum_values: {sum_values}")
        # Guard against empty lists when computing the final result
        max_result = np.max(max_values) if max_values else float("-inf")
        min_result = np.min(min_values) if min_values else float("inf")
        sum_result = np.sum(sum_values) if sum_values else 0.0
        return max_result, min_result, sum_result

    def _binary_reduce(self, data, reduce_func):
        """Parallel binary reduction that preserves index information and handles non-power-of-two cases."""
        if not data:
            return None
        if len(data) == 1:
            return data[0][1]  # Extract the value from (index, result)
        while len(data) > 1:
            new_data = []
            with ProcessPoolExecutor(max_workers=min(len(data) // 2, self.num_chunks)) as executor:
                futures = {}
                for i in range(0, len(data) - 1, 2):
                    idx1, val1 = data[i]
                    idx2, val2 = data[i + 1]
                    future = executor.submit(reduce_func, val1, val2)
                    futures[future] = (idx1, idx2)  # Track indices
                # Collect results, keeping the index of the first element
                for future in as_completed(futures):
                    idx1, _ = futures[future]
                    new_data.append((idx1, future.result()))
            # If there is an odd number of elements, carry the last (index, value) forward
            if len(data) % 2 == 1:
                new_data.append(data[-1])
            data = new_data  # Continue reduction with the new reduced set
        print(data)
        return data[0][1]  # Final reduced value

    @staticmethod
    def worker_function(task):
        """Worker function that reads either from shared memory or from a plain NumPy slice."""
        op_name, func, args = task["operation"], task["func"], task["args"]
        arr_or_indices, use_shared, shared_mem_name, data_shape, data_dtype = args
        if use_shared:
            # The child process must reopen the shared memory block by name and map the full array
            shared_mem = shm.SharedMemory(name=shared_mem_name)
            shared_array = np.ndarray(data_shape, dtype=data_dtype, buffer=shared_mem.buf)
            arr = shared_array[arr_or_indices]  # Fancy indexing copies out only the required elements
            shared_mem.close()  # Safe to detach: arr is a copy, not a view
        else:
            arr = arr_or_indices  # Directly use the NumPy slice
        return task["index"], op_name, func(arr)  # Return the index for tracking

    def run_executor(self):
        """Run the computation using ProcessPoolExecutor while preserving the task structure."""
        tasks = self._create_tasks()
        results = []
        with ProcessPoolExecutor(max_workers=self.num_chunks) as executor:
            future_to_task = {executor.submit(self.worker_function, task): task for task in tasks}
            for future in as_completed(future_to_task):
                try:
                    index, op_name, res = future.result()
                    results.append((index, op_name, res))  # Store the index for tracking
                except Exception as e:
                    print(f"Task {future_to_task[future]['index']} failed: {e}")
        # Group results by operation
        max_values, min_values, sum_values = [], [], []
        for index, op, res in results:
            if op == "max_min":
                max_values.append((index, res[0]))
                min_values.append((index, res[1]))
            elif op == "sum":
                sum_values.append((index, res))
        # Perform binary reduction
        max_result = self._binary_reduce(max_values, max)
        min_result = self._binary_reduce(min_values, min)
        sum_result = self._binary_reduce(sum_values, np.add)
        return max_result, min_result, sum_result
def compute(self, backend="executor"):
"""
Compute max, min, sum using the specified backend.
:param backend: "executor" for ProcessPoolExecutor, "ray" for Ray.
:return: Final max, min, sum values.
"""
return self.run_executor()

    def cleanup(self):
        """Explicitly clean up shared memory after processing."""
        if self.use_shared_memory and hasattr(self, "shared_mem"):
            self.shared_mem.close()
            self.shared_mem.unlink()  # Ensure the block is removed


# Example Usage
if __name__ == "__main__":
    data = np.random.rand(1_000_000)  # 1 million random floats

    # Run without shared memory; set use_shared_memory=True to exercise the shared-memory path
    processor = ParallelArrayProcessor(data, use_shared_memory=False)
    max_val, min_val, sum_val = processor.compute(backend="executor")
    print(f"ProcessPoolExecutor - Max: {max_val}, Min: {min_val}, Sum: {sum_val}")

    # max_val, min_val, sum_val = processor.compute(backend="ray")
    # print(f"Ray - Max: {max_val}, Min: {min_val}, Sum: {sum_val}")

    processor.cleanup()  # Free shared memory (a no-op when shared memory is not used)
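To exercise the shared-memory path, a minimal usage sketch is shown below. It assumes the code above is saved as parallel_processor.py (the module name is an assumption, not part of the gist) and wraps compute() in try/finally so the shared memory block is always unlinked, even on failure.

    # A minimal sketch of the shared-memory path, assuming the class above is
    # importable from parallel_processor.py (the filename is an assumption).
    import numpy as np
    from parallel_processor import ParallelArrayProcessor

    if __name__ == "__main__":
        data = np.random.rand(1_000_000)
        processor = ParallelArrayProcessor(data, num_chunks=8, use_shared_memory=True)
        try:
            max_val, min_val, sum_val = processor.compute(backend="executor")
            print(f"Shared memory - Max: {max_val}, Min: {min_val}, Sum: {sum_val}")
            # Sanity check against plain NumPy; the sum is compared with a tolerance
            # because parallel accumulation can reorder floating-point additions.
            assert max_val == data.max() and min_val == data.min()
            assert np.isclose(sum_val, data.sum())
        finally:
            processor.cleanup()  # Always unlink the shared memory block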