Parallel Computation: Task Execution, ProcessPoolExecutor, and Ray for Matrix Multiplication
Parallel execution is essential for optimizing computational workloads such as matrix multiplication, large-scale data processing, and distributed computing. In this document, we explore:
- Creating a Generalized Task Execution Framework
- Using ProcessPoolExecutor and Ray for 1D and 2D Task Execution
- Parallel Matrix Multiplication (Stripe-Based vs. Block-Based Approaches)
For both 1D and 2D cases, we use a dictionary-based task descriptor:
tasks = [
    {"index": i, "func": some_function, "args": (constant_value, input_slice)}
    for i, input_slice in enumerate(input_slices)
]
✅ This structure works with both ProcessPoolExecutor & Ray
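For concreteness, here is a minimal sketch of building such a task list. The workload is hypothetical: chunk_sum, the scale factor, and the 100,000-element slice width are illustrative stand-ins for whatever per-slice work you need. Note that the function must be defined at module level so it can be pickled for ProcessPoolExecutor.

import numpy as np

def chunk_sum(scale, chunk):
    # Illustrative workload: scale a slice and reduce it to one number.
    return scale * chunk.sum()

data = np.arange(1_000_000)
input_slices = [data[s:s + 100_000] for s in range(0, len(data), 100_000)]

tasks = [
    {"index": i, "func": chunk_sum, "args": (2.0, input_slice)}
    for i, input_slice in enumerate(input_slices)
]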
from concurrent.futures import ProcessPoolExecutor

def worker_function(index, func, *args):
    return index, func(*args)

def run_task(task):
    # Module-level helper: executor.map cannot pickle a lambda.
    return worker_function(task["index"], task["func"], *task["args"])

# On spawn-based platforms (Windows, macOS), run pool code under `if __name__ == "__main__":`.
with ProcessPoolExecutor() as executor:
    results = list(executor.map(run_task, tasks))
from concurrent.futures import as_completed

def execute_with_submit(tasks):
    results = {}
    with ProcessPoolExecutor() as executor:
        futures = {
            executor.submit(worker_function, t["index"], t["func"], *t["args"]): t["index"]
            for t in tasks
        }
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return [results[i] for i in sorted(results)]
✅ Use .map() for simple, uniform batches; use .submit() with as_completed() when you need per-task control or want results as they finish.
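As a quick end-to-end check, the sketch below runs the sample tasks through both paths. It assumes the tasks list, chunk_sum, and run_task from the illustrative sketch above, plus worker_function and execute_with_submit as defined here.

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        map_results = list(executor.map(run_task, tasks))
    submit_results = execute_with_submit(tasks)
    # Both paths return (index, value) pairs in index order.
    assert map_results == submit_results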
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def ray_worker(index, func, *args):
    return index, func(*args)

def execute_with_ray(tasks):
    futures = [ray_worker.remote(t["index"], t["func"], *t["args"]) for t in tasks]
    indexed_results = ray.get(futures)
    return [result for _, result in sorted(indexed_results)]
✅ Ray enables distributed execution without manual process management.
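Reusing the same illustrative tasks list from the 1D example, execution is a one-liner:

results = execute_with_ray(tasks)
# One value per input slice, ordered by task index.
print(results[:3])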
For the 2D case, consider C = A @ B, where A has shape (m, n) and B has shape (n, k):
- Matrix A is partitioned into row stripes of configurable height.
- Matrix B is partitioned into column stripes of configurable width.
- Each worker fetches one row stripe of A and one column stripe of B to compute the tile C[i:i+block_size, j:j+block_size].
import numpy as np

def partition_blocks(matrix, block_size, axis):
    """Partition a matrix into row blocks (axis=0) or column blocks (axis=1),
    storing each block in the Ray object store."""
    if axis == 0:  # Row blocks
        return {i: ray.put(matrix[i:i+block_size, :]) for i in range(0, matrix.shape[0], block_size)}
    elif axis == 1:  # Column blocks
        return {j: ray.put(matrix[:, j:j+block_size]) for j in range(0, matrix.shape[1], block_size)}
    raise ValueError("axis must be 0 or 1")
@ray.remote
def multiply_block(row_idx, col_idx, a_block, b_block):
    # Ray resolves ObjectRef arguments to their values before the task runs,
    # so the blocks arrive here as plain NumPy arrays (no ray.get needed).
    return row_idx, col_idx, a_block @ b_block
m, n = A.shape
_, k = B.shape
A_blocks = partition_blocks(A, block_size, axis=0)  # Row stripes of A
B_blocks = partition_blocks(B, block_size, axis=1)  # Column stripes of B

C_tiles = {(i, j): multiply_block.remote(i, j, A_blocks[i], B_blocks[j])
           for i in range(0, m, block_size)
           for j in range(0, k, block_size)}

C_final = np.zeros((m, k))
for (i, j), c_ref in C_tiles.items():
    row, col, result = ray.get(c_ref)
    C_final[row:row+block_size, col:col+block_size] = result
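Putting the pieces together, here is a minimal sketch of the full tiled multiply with a correctness check. block_matmul is an illustrative wrapper around partition_blocks and multiply_block from above; the matrix shapes and the block size of 128 are arbitrary choices for the demo.

def block_matmul(A, B, block_size):
    # Tiled A @ B using the helpers defined above.
    m, k = A.shape[0], B.shape[1]
    A_blocks = partition_blocks(A, block_size, axis=0)
    B_blocks = partition_blocks(B, block_size, axis=1)
    tiles = [multiply_block.remote(i, j, A_blocks[i], B_blocks[j])
             for i in range(0, m, block_size)
             for j in range(0, k, block_size)]
    C = np.zeros((m, k))
    for row, col, result in ray.get(tiles):
        C[row:row+block_size, col:col+block_size] = result
    return C

A = np.random.rand(512, 384)
B = np.random.rand(384, 512)
assert np.allclose(block_matmul(A, B, 128), A @ B)  # Sanity check against plain NumPy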
Instead of computing single elements, each worker computes a submatrix (tile) of C. This is because:
- Each worker receives a block of rows from A and a block of columns from B.
- The resulting computation is a full matrix product, yielding a tile (submatrix) of C.
- We can store C[i:i+block_size, j:j+block_size] directly without additional reduction, since each worker holds the full inner dimension and there are no partial sums to combine.
- Configurable block sizes improve cache locality and memory efficiency.
- Stripe-Based Multiplication is ideal for large-scale distributed execution: each worker needs only one stripe of each matrix, so data transfer per task stays bounded.
- Block-Based Multiplication is best for single-node GPU/CPU acceleration, where tiles sized to fit the cache maximize throughput.
- Hybrid approaches (stripes across nodes, blocks within a node) balance communication overhead against per-task compute.
A natural next step is benchmarking different block sizes to find the sweet spot between scheduling overhead and per-task compute. 🚀