
Parallel Computation: Task Execution, ProcessPoolExecutor, and Ray for Matrix Multiplication

Introduction

Parallel execution is essential for optimizing computational workloads such as matrix multiplication, large-scale data processing, and distributed computing. In this document, we explore:

  1. Creating a Generalized Task Execution Framework
  2. Using ProcessPoolExecutor and Ray for 1D and 2D Task Execution
  3. Parallel Matrix Multiplication (Stripe-Based vs. Block-Based Approaches)

1. Generalized Task Execution Framework

1.1 Standardized Task Descriptor

For both 1D and 2D cases, we use a dictionary-based task descriptor:

tasks = [
    {"index": i, "func": some_function, "args": (constant_value, input_slice)}
    for i, input_slice in enumerate(input_slices)
]

This structure works unchanged with both ProcessPoolExecutor and Ray.
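For instance, a concrete task list that sums scaled slices might look like the sketch below (scaled_sum and the sample slices are illustrative placeholders, not part of any framework). Defining scaled_sum at module level matters later, when tasks are shipped to worker processes.

def scaled_sum(factor, values):
    """Multiply each value by a constant factor and sum the results."""
    return sum(factor * v for v in values)

input_slices = [[1, 2], [3, 4], [5, 6]]  # three independent slices of work
tasks = [
    {"index": i, "func": scaled_sum, "args": (10, input_slice)}
    for i, input_slice in enumerate(input_slices)
]
# tasks[0] == {"index": 0, "func": scaled_sum, "args": (10, [1, 2])}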


2. Parallel Execution Using ProcessPoolExecutor & Ray

2.1 ProcessPoolExecutor (map vs. submit)

2.1.1 Using .map() (Preserves Order Automatically)

from concurrent.futures import ProcessPoolExecutor

def worker_function(index, func, *args):
    # Runs in a worker process; returns the index so results can be re-ordered.
    return index, func(*args)

def run_task(task):
    # Must be a module-level function: ProcessPoolExecutor pickles the callable
    # it maps across worker processes, and lambdas cannot be pickled.
    return worker_function(task["index"], task["func"], *task["args"])

with ProcessPoolExecutor() as executor:
    results = list(executor.map(run_task, tasks))

Note that passing a lambda to executor.map would raise a PicklingError: callables are sent to worker processes via pickling, and the same restriction applies to each task's func, which is why the task functions themselves must also live at module level.

2.1.2 Using .submit() (Handles Long-Running Tasks Efficiently)

from concurrent.futures import as_completed

def execute_with_submit(tasks):
    results = {}
    with ProcessPoolExecutor() as executor:
        # Map each future back to its task index so order can be restored.
        futures = {executor.submit(worker_function, t["index"], t["func"], *t["args"]): t["index"] for t in tasks}
        for future in as_completed(futures):
            # as_completed yields futures as they finish, regardless of submission order.
            results[futures[future]] = future.result()
    # Reassemble the (index, result) pairs in index order.
    return [results[i] for i in sorted(results)]

Use .map() for simple, uniform batches; use .submit() with as_completed() when tasks vary in duration and you want to handle results as they finish.
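A quick usage sketch, reusing the illustrative scaled_sum tasks from Section 1.1 (the __main__ guard is needed on platforms that start worker processes via spawn, such as Windows and macOS):

if __name__ == "__main__":
    results = execute_with_submit(tasks)
    print(results)  # [(0, 30), (1, 70), (2, 110)] with the sample tasks above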

2.2 Parallel Execution with Ray

2.2.1 Ray Implementation

import ray
ray.init(ignore_reinit_error=True)

@ray.remote
def ray_worker(index, func, *args):
    return index, func(*args)

def execute_with_ray(tasks):
    futures = [ray_worker.remote(t["index"], t["func"], *t["args"]) for t in tasks]
    # ray.get blocks until every remote task has finished.
    indexed_results = ray.get(futures)
    # Sort by index explicitly so the result payloads are never compared.
    return [result for _, result in sorted(indexed_results, key=lambda pair: pair[0])]

Ray enables distributed execution without manual process management.
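Usage mirrors the executor version; with the same illustrative tasks:

print(execute_with_ray(tasks))  # [30, 70, 110] with the sample tasks above
ray.shutdown()  # release cluster resources once all work is done

Note that this helper returns bare results rather than (index, result) pairs, since ordering has already been restored.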


3. Parallel Matrix Multiplication

3.1 Stripe-Based Multiplication

  • Matrix A is partitioned into row stripes of configurable height.
  • Matrix B is partitioned into column stripes of configurable width.
  • Each worker fetches one row block of A and one column block of B to compute C[i:i+block_size, j:j+block_size].

3.1.1 Storing Blocked Matrices in Ray Object Store

def partition_blocks(matrix, block_size, axis):
    """Partitions a matrix into stripes of configurable size and stores each
    stripe in the Ray object store, keyed by its starting row or column."""
    if axis == 0:  # Row blocks: stripes of shape (block_size, n)
        return {i: ray.put(matrix[i:i+block_size, :]) for i in range(0, matrix.shape[0], block_size)}
    elif axis == 1:  # Column blocks: stripes of shape (n, block_size)
        return {j: ray.put(matrix[:, j:j+block_size]) for j in range(0, matrix.shape[1], block_size)}
    raise ValueError("axis must be 0 (rows) or 1 (columns)")
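For intuition, partitioning a hypothetical 4×4 matrix with block_size=2 produces two stripes keyed by their starting offsets:

import numpy as np

A = np.arange(16).reshape(4, 4)
A_blocks = partition_blocks(A, block_size=2, axis=0)
print(sorted(A_blocks))            # [0, 2] -> row stripes starting at rows 0 and 2
print(ray.get(A_blocks[0]).shape)  # (2, 4): rows 0-1 of A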

3.1.2 Worker Fetching & Computation

@ray.remote
def multiply_block(row_idx, col_idx, a_block, b_block):
    # Ray automatically dereferences ObjectRef arguments passed to a remote
    # task, so the worker receives the NumPy blocks themselves; calling
    # ray.get on them here would raise an error.
    return row_idx, col_idx, a_block @ b_block

3.1.3 Assigning Tasks & Constructing C

import numpy as np

# A is (m, n) and B is (n, k), so C = A @ B has shape (m, k); one remote task per tile.
C_tiles = {(i, j): multiply_block.remote(i, j, A_blocks[i], B_blocks[j])
           for i in range(0, m, block_size)
           for j in range(0, k, block_size)}
C_final = np.zeros((m, k))

for (i, j), c_ref in C_tiles.items():
    row, col, result = ray.get(c_ref)
    C_final[row:row+block_size, col:col+block_size] = result

Fetching every tile with one batched call, ray.get(list(C_tiles.values())), also works and saves a round trip per tile.

3.1.4 Why C_final[row:row+block_size, col:col+block_size] Works

Instead of computing single elements, each worker computes a submatrix (tile) of C. This works because:

  1. Each worker receives a block of rows from A and a block of columns from B.
  2. Multiplying a (block_size × n) row stripe by an (n × block_size) column stripe is a full matrix product, yielding one (block_size × block_size) tile of C.
  3. Each tile can be written into C[i:i+block_size, j:j+block_size] directly, with no additional reduction step; the sketch below puts the pieces together.
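A minimal end-to-end sketch tying the pieces above together, assuming Ray and NumPy are installed and that the dimensions divide evenly by block_size (the sizes below are chosen purely for illustration):

import numpy as np
import ray

ray.init(ignore_reinit_error=True)

m, n, k, block_size = 8, 6, 8, 4
A = np.random.rand(m, n)
B = np.random.rand(n, k)

A_blocks = partition_blocks(A, block_size, axis=0)  # row stripes of A
B_blocks = partition_blocks(B, block_size, axis=1)  # column stripes of B

C_tiles = {(i, j): multiply_block.remote(i, j, A_blocks[i], B_blocks[j])
           for i in range(0, m, block_size)
           for j in range(0, k, block_size)}

C_final = np.zeros((m, k))
for row, col, tile in ray.get(list(C_tiles.values())):  # batched fetch
    C_final[row:row+block_size, col:col+block_size] = tile

assert np.allclose(C_final, A @ B)  # tile assembly matches the direct product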

4. Conclusion

  • Configurable block sizes improve cache locality and memory efficiency.
  • Stripe-Based Multiplication is ideal for distributed computing.
  • Block-Based Multiplication is best for single-node GPU/CPU acceleration.
  • Hybrid approaches (Stripe + Block) provide optimal tradeoffs.

In short: stripe-based partitioning suits large-scale distributed execution, while block-based tiling is the better fit for single-node high-performance computing.


