
Complete Guide to Parallel Processing in Python

Table of Contents

  1. Queue Types
  2. ProcessPoolExecutor: Map and Submit
  3. Matrix Operations
  4. MapReduce Implementation
  5. Roofline Model Analysis
  6. Function Event Tracking
  7. Ray for Distributed Computing

Queue Types {#queue-types}

Standard Queue (queue.Queue)

  • Thread-safe by default, no explicit locks needed
  • Built-in locking mechanism for threaded environments
  • Example:
import queue
import threading

q = queue.Queue()
q.put("task")   # Add an item (blocks if the queue is full)
item = q.get()  # Remove an item (blocks if the queue is empty)
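
A threaded producer/consumer sketch showing the built-in locking in action; the worker function, sentinel value, and item count are illustrative choices:

import queue
import threading

def worker(q):
    while True:
        item = q.get()       # Blocks until an item is available
        if item is None:     # Sentinel tells the worker to stop
            q.task_done()
            break
        print(f"processing {item}")
        q.task_done()

q = queue.Queue()
t = threading.Thread(target=worker, args=(q,))
t.start()
for i in range(5):
    q.put(i)
q.put(None)
q.join()   # Wait until every queued item has been marked done
t.join()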

Multiprocessing Queues

  1. Unmanaged Queue (multiprocessing.Queue)

    • Faster implementation using shared memory + OS pipes
    • Not accessible across machines
    • Example (a fuller producer/consumer sketch follows this list):
    from multiprocessing import Process, Queue
    q = Queue(maxsize=5)  # Optional size limit
  2. Managed Queue (multiprocessing.Manager().Queue)

    • Works across distributed systems
    • Slower due to serialization overhead
    • Example:
    from multiprocessing import Process, Manager
    with Manager() as manager:
        q = manager.Queue()
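
A fuller producer/consumer sketch using the unmanaged multiprocessing.Queue; the queue size, item count, and sentinel are illustrative:

from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(i)          # Blocks once maxsize items are waiting
    q.put(None)           # Sentinel signals the consumer to stop

def consumer(q):
    while True:
        item = q.get()    # Blocks if the queue is empty
        if item is None:
            break
        print(f"consumed {item}")

if __name__ == "__main__":
    q = Queue(maxsize=5)
    p = Process(target=producer, args=(q,))
    c = Process(target=consumer, args=(q,))
    p.start(); c.start()
    p.join(); c.join()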

ProcessPoolExecutor: Map and Submit {#processpool-executor}

Map Operation (Preserves Order)

from concurrent.futures import ProcessPoolExecutor

def process_item(x):
    return x * x

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, range(10)))

executor.map() works with any iterable, and you control how data is grouped or chunked before it is passed to map; the sketch after the list below demonstrates the options.

The key points about executor.map():

  1. It works with any iterable input

  2. You control the granularity by how you prepare your data:

    • Individual elements (like with ravel())
    • Chunks (using array_split)
    • Pairs (using zip)
    • Rows from 2D arrays
    • Custom-sized chunks
  3. The optimal grouping depends on your use case:

    • Small groups for CPU-intensive operations
    • Larger chunks for data-intensive operations
    • Custom grouping for specific workload patterns
  4. You can use any method that creates an iterable to group your data:

    • NumPy functions (array_split, reshape)
    • Python built-ins (zip, generators)
    • Custom grouping functions
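
A short sketch of these grouping options; the array contents and the square/add/sum helpers are illustrative, not part of the original guide:

import numpy as np
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

def add_pair(a, b):
    return a + b

def chunk_sum(chunk):
    return np.sum(chunk)

if __name__ == "__main__":
    data = np.arange(16)
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Individual elements (flattened with ravel)
        per_element = list(executor.map(square, data.ravel()))
        # A fixed number of chunks (np.array_split)
        per_chunk = list(executor.map(chunk_sum, np.array_split(data, 4)))
        # Pairs: map accepts multiple iterables, zip-style
        per_pair = list(executor.map(add_pair, data, data[::-1]))
        # Rows of a 2D array
        per_row = list(executor.map(chunk_sum, data.reshape(4, 4)))
    print(per_element, per_chunk, per_pair, per_row)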

Submit Operation (Async, No Order)

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_item, x) for x in range(10)]
    results = [f.result() for f in futures]

Maintaining Order with Submit

from concurrent.futures import as_completed

data = range(10)

with ProcessPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(process_item, n): i 
               for i, n in enumerate(data)}
    
    ordered_results = []
    for future in as_completed(futures):  # Results arrive in completion order
        index = futures[future]
        result = future.result()
        ordered_results.append((index, result))
    
    ordered_results.sort()  # Restore original order by index

Matrix Operations {#matrix-operations}

Element-wise Operations

import numpy as np

def multiply(a, b):
    return a * b

A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])

# Flatten matrices
A_flat = A.ravel()
B_flat = B.ravel()

with ProcessPoolExecutor(max_workers=4) as executor:
    result_flat = list(executor.map(multiply, A_flat, B_flat))
    result = np.array(result_flat).reshape(A.shape)

Row-wise Operations

def multiply_rows(row_a, row_b):
    return row_a * row_b

A_rows = [row for row in A]
B_rows = [row for row in B]

with ProcessPoolExecutor(max_workers=4) as executor:
    result_rows = list(executor.map(multiply_rows, A_rows, B_rows))
    result = np.array(result_rows)

MapReduce Implementation {#mapreduce}

Basic Implementation

from collections import Counter
import re

def map_word_count(text_chunk):
    words = re.findall(r'\w+', text_chunk.lower())
    return Counter(words)

def reduce_word_counts(counts_list):
    final_counts = Counter()
    for count in counts_list:
        final_counts.update(count)
    return final_counts

def parallel_word_count(text, num_workers=4):
    text_chunks = text.split('\n')
    
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        partial_counts = list(executor.map(map_word_count, text_chunks))
    
    return reduce_word_counts(partial_counts)

Optimized Implementation

def parallel_reduce(counts_list, num_workers=4):
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        chunk_size = max(1, len(counts_list) // num_workers)  # Avoid zero-size chunks
        chunks = [counts_list[i:i + chunk_size] 
                 for i in range(0, len(counts_list), chunk_size)]
        
        partial_reductions = list(executor.map(reduce_word_counts, chunks))
    
    return reduce_word_counts(partial_reductions)
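
A quick end-to-end usage example; the sample text and worker count are illustrative:

if __name__ == "__main__":
    sample_text = "the quick brown fox\njumps over the lazy dog\nthe dog barks"
    counts = parallel_word_count(sample_text, num_workers=2)
    print(counts.most_common(3))  # e.g. [('the', 3), ('dog', 2), ('quick', 1)]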

Roofline Model Analysis {#roofline}

Performance Bounds Calculation

def calculate_bounds(matrix_size, memory_bandwidth, peak_flops):
    # Calculate FLOPs
    n = matrix_size
    total_flops = 2 * (n ** 3)  # Matrix multiplication
    
    # Calculate memory traffic
    bytes_per_element = 8  # Double precision
    total_bytes = 3 * (n ** 2) * bytes_per_element
    
    # Calculate bounds
    operational_intensity = total_flops / total_bytes
    memory_bound = operational_intensity * memory_bandwidth
    compute_bound = peak_flops
    
    return min(memory_bound, compute_bound)

Execution Time Calculation

def calculate_execution_times(matrix_size, memory_bandwidth, peak_flops):
    n = matrix_size
    total_flops = 2 * (n ** 3)
    total_bytes = 3 * (n ** 2) * 8
    
    compute_time = total_flops / peak_flops
    memory_time = total_bytes / memory_bandwidth
    
    return compute_time, memory_time
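
A worked example with assumed hardware numbers (200 GB/s memory bandwidth, 1 TFLOP/s double-precision peak; these are illustrative, not measurements):

if __name__ == "__main__":
    n = 4096
    bandwidth = 200e9   # bytes/s, assumed
    peak = 1e12         # FLOP/s, assumed
    attainable = calculate_bounds(n, bandwidth, peak)
    compute_time, memory_time = calculate_execution_times(n, bandwidth, peak)
    # Operational intensity = 2*n^3 / (3*n^2*8) = n/12 ≈ 341 FLOP/byte,
    # so at this size the kernel is compute-bound and min() returns peak_flops.
    print(f"Attainable performance: {attainable / 1e9:.1f} GFLOP/s")
    print(f"Compute time: {compute_time:.3f} s, memory time: {memory_time:.6f} s")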

Function Event Tracking {#event-tracking}

Basic Implementation

def generate_function_events(samples):
    active_functions = {}
    events = []
    
    for timestamp, functions in enumerate(samples):
        current_functions = set(functions)
        
        # New functions
        for func in current_functions - active_functions.keys():
            active_functions[func] = timestamp
        
        # Ended functions
        for func in list(active_functions.keys()):
            if func not in current_functions:
                events.append((func, active_functions[func], timestamp))
                del active_functions[func]
    
    # Handle still active functions
    final_timestamp = len(samples)
    for func, start_time in active_functions.items():
        events.append((func, start_time, final_timestamp))
    
    return events
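
A small usage example; each sample lists the functions observed at that timestamp, and the trace below is illustrative:

samples = [
    ["main"],          # t=0: only main is running
    ["main", "foo"],   # t=1: foo starts
    ["main", "foo"],   # t=2
    ["main"],          # t=3: foo has ended
]
events = generate_function_events(samples)
print(events)  # [('foo', 1, 3), ('main', 0, 4)]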

Parallel Implementation

def parallel_generate_function_events(samples, num_workers=4, chunk_size=100000):
    num_samples = len(samples)
    chunks = [samples[i:i + chunk_size] 
             for i in range(0, num_samples, chunk_size)]
    chunk_indices = range(0, num_samples, chunk_size)
    
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = executor.map(process_chunk, chunks, chunk_indices)
    
    all_events = [event for result in results for event in result]
    return merge_events(all_events)
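
process_chunk and merge_events are referenced above but not defined in the guide. A minimal sketch of what they might look like, assuming chunk-local timestamps are offset to global positions and events split at chunk boundaries are stitched back together:

def process_chunk(chunk, offset):
    # Run the sequential algorithm on one chunk and shift its timestamps
    # by the chunk's starting index in the full trace
    return [(func, start + offset, end + offset)
            for func, start, end in generate_function_events(chunk)]

def merge_events(events):
    # Merge events for the same function that were split at a chunk boundary
    # (one interval ends exactly where the next begins)
    merged = []
    for func, start, end in sorted(events, key=lambda e: (e[0], e[1])):
        if merged and merged[-1][0] == func and merged[-1][2] == start:
            merged[-1] = (func, merged[-1][1], end)
        else:
            merged.append((func, start, end))
    return merged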

Ray for Distributed Computing {#ray}

Basic Ray Example

import ray

ray.init()

@ray.remote
def square(x):
    return x * x

futures = [square.remote(i) for i in range(10)]
results = ray.get(futures)

Ray with NumPy Arrays

import numpy as np

@ray.remote
def process_array(arr):
    # Ray resolves the ObjectRef argument automatically, so `arr` is already
    # the NumPy array (shared zero-copy through the object store)
    return np.sum(arr)

data = np.random.rand(1000, 1000)
data_ref = ray.put(data)  # Store once, share across tasks
results = ray.get([process_array.remote(data_ref) for _ in range(4)])

When to Use Ray

  • Distributed computing across multiple machines
  • GPU acceleration requirements
  • Dynamic workload balancing
  • Large-scale data processing
  • Avoiding shared memory complexity

Ray provides simpler syntax and better scaling for distributed computing compared to ProcessPoolExecutor, but ProcessPoolExecutor is often sufficient for single-machine parallelism.

Optimizing Data Storage Strategies in Distributed Computing

When working with distributed computing systems, choosing the right storage strategy for shared data is crucial for optimal performance. This guide explores how to make intelligent decisions about data placement based on hardware characteristics like cache size, memory bandwidth, and network efficiency.

Memory Hierarchy Overview

Understanding the memory hierarchy is essential for optimal data placement:

  1. CPU Cache Levels
    • L1 Cache: 32-64 KB per core (fastest access)
    • L2 Cache: 512 KB-2 MB per core (very fast access)
    • L3 Cache: 8-64 MB shared (fast access)
  2. Main Memory (RAM): High capacity but slower access
  3. Distributed Storage: Network-based access, highest latency

Data Placement Strategies

Small Data (Fits in Cache)

For data that fits within the CPU cache (typically < 50MB), use direct Inter-Process Communication (IPC):

@ray.remote
def compute(x, y):
    return x + y  # Small data passed directly via IPC

result = ray.get(compute.remote(5, 10))

Key benefits:

  • Minimal latency due to cache utilization
  • Avoid unnecessary serialization overhead
  • Efficient for frequently accessed small objects

Medium Data (100KB - 100MB)

For data larger than L3 cache but fitting in RAM, use shared memory:

import numpy as np
import ray

ray.init()

# Store 40MB array in shared memory
data = np.random.rand(5000000)  
data_ref = ray.put(data)

@ray.remote
def compute_partial_sum(arr, start, end):
    # Ray resolves the ObjectRef passed as an argument; `arr` is the shared array
    return np.sum(arr[start:end])
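
A usage sketch that splits the array into four ranges and gathers the partial sums; the chunk count is illustrative:

n = len(data)
chunk = n // 4
refs = [compute_partial_sum.remote(data_ref, i * chunk,
                                   (i + 1) * chunk if i < 3 else n)
        for i in range(4)]
total = sum(ray.get(refs))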

Advantages:

  • Zero-copy access across processes
  • Efficient memory utilization
  • Balanced performance for medium-sized datasets

Large Data (100MB - Multiple GB)

For large datasets that approach or exceed a single node's RAM, or that must be shared across nodes, use the distributed object store:

# Store large array (800MB) in distributed object store
data = np.random.rand(10**8)
data_ref = ray.put(data)

@ray.remote
def process_chunk(arr, start, end):
    # As above, Ray resolves the ObjectRef before the task runs
    return np.sum(arr[start:end])

Benefits:

  • Scalable across multiple nodes
  • Automatic data locality optimization
  • Efficient for large-scale distributed computing

Decision Framework

Hardware Considerations

When choosing a storage strategy, consider:

  • Available cache size per core
  • Total RAM capacity
  • Network bandwidth and latency
  • Number of compute nodes
  • Data access patterns

Implementation Guide

1. Profile Hardware Resources

import psutil

def choose_storage_strategy(data_size):
    mem = psutil.virtual_memory()
    cache_size = 50 * 10**6  # Assume 50MB L3 cache
    available_ram = mem.available

    if data_size < cache_size:
        return "IPC (direct passing)"
    elif data_size < available_ram // 10:
        return "Shared Memory (ray.put())"
    else:
        return "Distributed Object Store"

2. Monitor Performance

Utilize available monitoring tools:

  • Ray Dashboard for cluster metrics
  • Linux numactl for NUMA optimization
  • Memory bandwidth monitoring tools
  • GPU memory monitoring (if applicable)

Quick Reference Guide

Data Size    | Storage Method    | Access Pattern     | Best For
< 50 MB      | IPC / direct      | Function arguments | High-frequency, small data
50 MB-2 GB   | Shared memory     | ray.put()          | Medium-sized, shared data
> 2 GB       | Distributed store | Object store       | Large-scale distributed processing

Multiprocessing Shared Memory Implementation

Basic Shared Memory Example

import numpy as np
from multiprocessing import shared_memory
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

def create_shared_array(shape, dtype):
    """Create a shared memory numpy array."""
    nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    return array, shm

def worker_process(shape, dtype, shm_name, start, end):
    """Worker process that accesses shared memory."""
    # Attach to existing shared memory
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    # Create numpy array using shared memory buffer
    array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    # Process data
    local_sum = np.sum(array[start:end])
    existing_shm.close()
    return local_sum

def parallel_sum_example():
    # Create sample data
    shape = (10_000_000,)  # 10M elements
    dtype = np.float64
    
    # Create shared memory array
    data, shm = create_shared_array(shape, dtype)
    data[:] = np.random.random(shape)  # Fill with random data
    
    # Setup process pool executor
    num_processes = mp.cpu_count()
    chunk_size = len(data) // num_processes
    
    # Create futures for parallel execution
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = []
        for i in range(num_processes):
            start = i * chunk_size
            end = start + chunk_size if i < num_processes - 1 else len(data)
            future = executor.submit(
                worker_process, 
                shape, 
                dtype, 
                shm.name, 
                start, 
                end
            )
            futures.append(future)
        
        # Gather results
        results = [future.result() for future in futures]
    
    # Sum results from all processes
    total_sum = sum(results)
    
    # Cleanup
    shm.close()
    shm.unlink()
    
    return total_sum

Advanced Implementation with a SharedArray Wrapper Class

import numpy as np
from multiprocessing import shared_memory
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

class SharedArray:
    """Wrapper class for shared memory array operations."""
    def __init__(self, shape, dtype=np.float64):
        self.shape = shape
        self.dtype = dtype
        nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
        self.shm = shared_memory.SharedMemory(create=True, size=nbytes)
        self.array = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)
    
    def __del__(self):
        """Cleanup shared memory on deletion."""
        self.shm.close()
        self.shm.unlink()

def process_chunk(args):
    """Worker function that processes a chunk of shared array."""
    start, end, shm_name, shape, dtype = args
    # Attach to shared memory
    shm = shared_memory.SharedMemory(name=shm_name)
    # Create array view
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    # Process chunk
    result = np.sum(array[start:end])
    # Cleanup
    shm.close()
    return result

def parallel_processing_example():
    # Initialize shared array
    shape = (10_000_000,)
    shared_arr = SharedArray(shape)
    shared_arr.array[:] = np.random.random(shape)
    
    # Prepare chunks for parallel processing
    num_processes = mp.cpu_count()
    chunk_size = shape[0] // num_processes
    tasks = []
    
    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else shape[0]
        tasks.append((
            start, 
            end, 
            shared_arr.shm.name, 
            shape, 
            shared_arr.dtype
        ))
    
    # Process in parallel using ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = [executor.submit(process_chunk, task) for task in tasks]
        results = [future.result() for future in futures]
    
    total_sum = sum(results)
    return total_sum

if __name__ == '__main__':
    result = parallel_processing_example()
    print(f"Total sum: {result}")

Error Handling and Best Practices

import contextlib

@contextlib.contextmanager
def managed_shared_memory(shape, dtype=np.float64):
    """Context manager for safe shared memory handling."""
    nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    try:
        array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        yield array, shm
    finally:
        shm.close()
        shm.unlink()

def safe_parallel_processing():
    shape = (10_000_000,)
    
    with managed_shared_memory(shape) as (array, shm):
        # Fill array with data
        array[:] = np.random.random(shape)
        
        # Process in parallel
        num_processes = mp.cpu_count()
        chunk_size = shape[0] // num_processes
        tasks = [
            (i * chunk_size, 
             (i + 1) * chunk_size if i < num_processes - 1 else shape[0],
             shm.name, shape, np.float64)
            for i in range(num_processes)
        ]
        
        with ProcessPoolExecutor(max_workers=num_processes) as executor:
            futures = [executor.submit(process_chunk, task) for task in tasks]
            results = [future.result() for future in futures]
        
        return sum(results)

Best Practices

  1. Cache Awareness

    • Keep frequently accessed small data in cache
    • Avoid cache pollution with large sequential reads
    • Use prefetching for predictable access patterns
  2. Memory Management

    • Monitor memory pressure across nodes
    • Implement proper cleanup for shared resources
    • Use memory pools for frequent allocations
  3. Network Optimization

    • Minimize data movement across nodes
    • Batch small operations when possible
    • Consider data locality in task scheduling

Conclusion

Choosing the right storage strategy requires balancing multiple factors including data size, access patterns, and hardware capabilities. Regular profiling and monitoring help ensure optimal performance as workloads evolve.
