- Queue Types
- ProcessPoolExecutor: Map and Submit
- Matrix Operations
- MapReduce Implementation
- Roofline Model Analysis
- Function Event Tracking
- Ray for Distributed Computing
- Thread Queue (queue.Queue)
  - Thread-safe by default, no explicit locks needed
  - Built-in locking mechanism for threaded environments
  - Example:

import queue
import threading  # queue.Queue is intended for sharing work between threads

q = queue.Queue()
q.put("task")    # Add an item (blocks if the queue is full)
item = q.get()   # Remove an item (blocks if the queue is empty)
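A slightly fuller sketch (not in the original notes) of the usual producer/consumer pattern with queue.Queue, using a None sentinel to tell the consumer to stop:

import queue
import threading

def producer(q):
    for i in range(5):
        q.put(i)          # Blocks if the queue is full
    q.put(None)           # Sentinel: signals the consumer to stop

def consumer(q):
    while True:
        item = q.get()    # Blocks until an item is available
        if item is None:
            break
        print(f"consumed {item}")

q = queue.Queue(maxsize=2)
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))
t1.start(); t2.start()
t1.join(); t2.join()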
- Unmanaged Queue (multiprocessing.Queue)
  - Faster implementation using shared memory + OS pipes
  - Not accessible across machines
  - Example:

from multiprocessing import Process, Queue

q = Queue(maxsize=5)  # Optional size limit
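A minimal end-to-end sketch (the worker function is hypothetical) showing items flowing from the parent to a child process through the queue:

from multiprocessing import Process, Queue

def worker(q):
    while True:
        item = q.get()       # Received over the underlying OS pipe
        if item is None:     # Sentinel ends the worker
            break
        print(f"worker got {item}")

if __name__ == '__main__':
    q = Queue(maxsize=5)
    p = Process(target=worker, args=(q,))
    p.start()
    for i in range(3):
        q.put(i)
    q.put(None)
    p.join()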
- Managed Queue (multiprocessing.Manager().Queue)
  - Works across processes, and can be exposed across machines via a manager server
  - Slower due to serialization and proxy overhead
  - Example:

from multiprocessing import Process, Manager

with Manager() as manager:
    q = manager.Queue()
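A sketch along the same lines (the worker function is again hypothetical): the manager's queue proxy can be handed to several processes, at the cost of every put/get going through the manager server:

from multiprocessing import Process, Manager

def worker(q):
    q.put("done")  # Each put/get is serialized through the manager server

if __name__ == '__main__':
    with Manager() as manager:
        q = manager.Queue()
        procs = [Process(target=worker, args=(q,)) for _ in range(2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print([q.get() for _ in procs])  # ['done', 'done']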
from concurrent.futures import ProcessPoolExecutor

def process_item(x):
    return x * x

with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, range(10)))
executor.map() works with any iterable, and you have full control over how you group or chunk the data before passing it in; a short demonstration follows the list below. The key points about executor.map():
- It works with any iterable input
- You control the granularity by how you prepare your data:
  - Individual elements (like with ravel())
  - Chunks (using array_split)
  - Pairs (using zip)
  - Rows from 2D arrays
  - Custom-sized chunks
- The optimal grouping depends on your use case:
  - Small groups for CPU-intensive operations
  - Larger chunks for data-intensive operations
  - Custom grouping for specific workload patterns
- You can use any method that creates an iterable to group your data:
  - NumPy functions (array_split, reshape)
  - Python built-ins (zip, generators)
  - Custom grouping functions
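For example, a short sketch (not from the original notes) of feeding np.array_split chunks to executor.map; the chunk_sum worker is a hypothetical stand-in for real per-chunk work:

import numpy as np
from concurrent.futures import ProcessPoolExecutor

def chunk_sum(chunk):
    return float(np.sum(chunk))   # Work on one chunk per task

data = np.arange(1_000_000)
chunks = np.array_split(data, 8)  # 8 chunks -> 8 map tasks

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        partial_sums = list(executor.map(chunk_sum, chunks))
    total = sum(partial_sums)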
with ProcessPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_item, x) for x in range(10)]
    results = [f.result() for f in futures]
from concurrent.futures import as_completed

data = list(range(10))  # example input

with ProcessPoolExecutor(max_workers=4) as executor:
    # Map each future back to the index of its input
    futures = {executor.submit(process_item, n): i
               for i, n in enumerate(data)}
    ordered_results = []
    for future in as_completed(futures):
        index = futures[future]
        result = future.result()
        ordered_results.append((index, result))
    ordered_results.sort()  # Sort by original index
import numpy as np

def multiply(a, b):
    return a * b

A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])

# Flatten matrices
A_flat = A.ravel()
B_flat = B.ravel()

with ProcessPoolExecutor(max_workers=4) as executor:
    result_flat = list(executor.map(multiply, A_flat, B_flat))

result = np.array(result_flat).reshape(A.shape)
def multiply_rows(row_a, row_b):
    return row_a * row_b

A_rows = [row for row in A]
B_rows = [row for row in B]

with ProcessPoolExecutor(max_workers=4) as executor:
    result_rows = list(executor.map(multiply_rows, A_rows, B_rows))

result = np.array(result_rows)
from collections import Counter
import re

def map_word_count(text_chunk):
    words = re.findall(r'\w+', text_chunk.lower())
    return Counter(words)

def reduce_word_counts(counts_list):
    final_counts = Counter()
    for count in counts_list:
        final_counts.update(count)
    return final_counts

def parallel_word_count(text, num_workers=4):
    text_chunks = text.split('\n')
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        partial_counts = list(executor.map(map_word_count, text_chunks))
    return reduce_word_counts(partial_counts)
def parallel_reduce(counts_list, num_workers=4):
    # Guard against chunk_size == 0 when there are fewer counts than workers
    chunk_size = max(1, len(counts_list) // num_workers)
    chunks = [counts_list[i:i + chunk_size]
              for i in range(0, len(counts_list), chunk_size)]
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        partial_reductions = list(executor.map(reduce_word_counts, chunks))
    return reduce_word_counts(partial_reductions)
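A quick usage sketch (the sample text is invented for illustration):

sample_text = "the quick brown fox\nthe lazy dog\nthe fox"

if __name__ == '__main__':
    counts = parallel_word_count(sample_text, num_workers=2)
    print(counts.most_common(2))  # [('the', 3), ('fox', 2)]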
def calculate_bounds(matrix_size, memory_bandwidth, peak_flops):
    # Calculate FLOPs
    n = matrix_size
    total_flops = 2 * (n ** 3)  # Matrix multiplication
    # Calculate memory traffic
    bytes_per_element = 8  # Double precision
    total_bytes = 3 * (n ** 2) * bytes_per_element
    # Calculate bounds
    operational_intensity = total_flops / total_bytes
    memory_bound = operational_intensity * memory_bandwidth
    compute_bound = peak_flops
    return min(memory_bound, compute_bound)
def calculate_execution_times(matrix_size, memory_bandwidth, peak_flops):
    n = matrix_size
    total_flops = 2 * (n ** 3)
    total_bytes = 3 * (n ** 2) * 8
    compute_time = total_flops / peak_flops
    memory_time = total_bytes / memory_bandwidth
    return compute_time, memory_time
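A worked example under assumed hardware numbers (100 GFLOP/s peak and 20 GB/s memory bandwidth, both invented for illustration):

# Assumed hardware characteristics (illustrative, not measured)
peak_flops = 100e9        # 100 GFLOP/s
memory_bandwidth = 20e9   # 20 GB/s

n = 1024
attainable = calculate_bounds(n, memory_bandwidth, peak_flops)
compute_time, memory_time = calculate_execution_times(n, memory_bandwidth, peak_flops)

# Operational intensity = 2n^3 / (3 * n^2 * 8) = n / 12 ~ 85 FLOP/byte,
# so at this size the kernel is compute-bound and attainable == peak_flops.
print(attainable, compute_time, memory_time)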
def generate_function_events(samples):
    active_functions = {}
    events = []
    for timestamp, functions in enumerate(samples):
        current_functions = set(functions)
        # New functions
        for func in current_functions - active_functions.keys():
            active_functions[func] = timestamp
        # Ended functions
        for func in list(active_functions.keys()):
            if func not in current_functions:
                events.append((func, active_functions[func], timestamp))
                del active_functions[func]
    # Handle still active functions
    final_timestamp = len(samples)
    for func, start_time in active_functions.items():
        events.append((func, start_time, final_timestamp))
    return events
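A small usage sketch showing the expected input shape (one list of active function names per sample):

samples = [
    ['main'],           # t=0
    ['main', 'load'],   # t=1
    ['main', 'load'],   # t=2
    ['main'],           # t=3
]
events = generate_function_events(samples)
print(events)  # [('load', 1, 3), ('main', 0, 4)]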
def parallel_generate_function_events(samples, num_workers=4, chunk_size=100000):
    # process_chunk produces events for one chunk (offset by its start index)
    # and merge_events stitches events that span chunk boundaries; one
    # possible sketch of both helpers follows below.
    num_samples = len(samples)
    chunks = [samples[i:i + chunk_size]
              for i in range(0, num_samples, chunk_size)]
    chunk_indices = range(0, num_samples, chunk_size)
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = executor.map(process_chunk, chunks, chunk_indices)
        all_events = [event for result in results for event in result]
    return merge_events(all_events)
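process_chunk and merge_events are not defined in these notes; one possible sketch, assuming events are (function, start, end) tuples and that intervals split at a chunk boundary should be stitched back together:

def process_chunk(chunk, offset):
    # Reuse the serial routine, then shift timestamps by the chunk's start index
    return [(func, start + offset, end + offset)
            for func, start, end in generate_function_events(chunk)]

def merge_events(events):
    # Merge events of the same function that were split at a chunk boundary
    events = sorted(events)  # Sort by (func, start, end)
    merged = []
    for func, start, end in events:
        if merged and merged[-1][0] == func and merged[-1][2] == start:
            merged[-1] = (func, merged[-1][1], end)  # Extend the previous interval
        else:
            merged.append((func, start, end))
    return merged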
import ray

ray.init()

@ray.remote
def square(x):
    return x * x

futures = [square.remote(i) for i in range(10)]
results = ray.get(futures)
import numpy as np

@ray.remote
def process_array(arr):
    # An ObjectRef passed as a task argument is dereferenced automatically,
    # so arr is already the NumPy array, served from the shared object store
    return np.sum(arr)

data = np.random.rand(1000, 1000)
data_ref = ray.put(data)  # Store once, share across tasks
results = ray.get([process_array.remote(data_ref) for _ in range(4)])
Ray is a good fit when you need:
- Distributed computing across multiple machines
- GPU acceleration
- Dynamic workload balancing
- Large-scale data processing
- To avoid shared-memory complexity
Ray provides simpler syntax and better scaling for distributed computing compared to ProcessPoolExecutor, but ProcessPoolExecutor is often sufficient for single-machine parallelism.
When working with distributed computing systems, choosing the right storage strategy for shared data is crucial for optimal performance. This guide explores how to make intelligent decisions about data placement based on hardware characteristics like cache size, memory bandwidth, and network efficiency.
Understanding the memory hierarchy is essential for optimal data placement:
- CPU Cache Levels
  - L1 Cache: 32-64 KB per core (fastest access)
  - L2 Cache: 512 KB-2 MB per core (very fast access)
  - L3 Cache: 8-64 MB shared (fast access)
- Main Memory (RAM): High capacity but slower access
- Distributed Storage: Network-based access, highest latency
For data that fits within the CPU cache (typically < 50MB), use direct Inter-Process Communication (IPC):
@ray.remote
def compute(x, y):
    return x + y  # Small data passed directly via IPC

result = ray.get(compute.remote(5, 10))
Key benefits:
- Minimal latency due to cache utilization
- Avoids unnecessary serialization overhead
- Efficient for frequently accessed small objects
For data larger than L3 cache but fitting in RAM, use shared memory:
import numpy as np
import ray

ray.init()

# Store a 40MB array in Ray's shared-memory object store
data = np.random.rand(5_000_000)
data_ref = ray.put(data)

@ray.remote
def compute_partial_sum(data, start, end):
    # The ObjectRef passed at the call site is dereferenced automatically,
    # so data is already the array (read from the local object store)
    return np.sum(data[start:end])
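For example, partial sums over four index ranges could be dispatched like this (the chunking is arbitrary, purely for illustration):

n = len(data)
step = n // 4
futures = [compute_partial_sum.remote(data_ref, i, min(i + step, n))
           for i in range(0, n, step)]
total = sum(ray.get(futures))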
Advantages:
- Zero-copy access across processes
- Efficient memory utilization
- Balanced performance for medium-sized datasets
For datasets exceeding single-node RAM capacity, use distributed storage:
# Store a large array (800MB) in the distributed object store
data = np.random.rand(10**8)
data_ref = ray.put(data)

@ray.remote
def process_chunk(start, end, data):
    # The ObjectRef passed at the call site is dereferenced to the array automatically
    return np.sum(data[start:end])
Benefits:
- Scalable across multiple nodes
- Automatic data locality optimization
- Efficient for large-scale distributed computing
When choosing a storage strategy, consider:
- Available cache size per core
- Total RAM capacity
- Network bandwidth and latency
- Number of compute nodes
- Data access patterns
import psutil

def choose_storage_strategy(data_size):
    mem = psutil.virtual_memory()
    cache_size = 50 * 10**6  # Assume 50MB L3 cache
    available_ram = mem.available
    if data_size < cache_size:
        return "IPC (direct passing)"
    elif data_size < available_ram // 10:
        return "Shared Memory (ray.put())"
    else:
        return "Distributed Object Store"
Utilize available monitoring tools:
- Ray Dashboard for cluster metrics
- Linux numactl for NUMA optimization
- Memory bandwidth monitoring tools
- GPU memory monitoring (if applicable)
| Data Size | Storage Method | Access Pattern | Best For |
|---|---|---|---|
| < 50MB | IPC/Direct | Function arguments | High-frequency, small data |
| 50MB-2GB | Shared Memory | ray.put() | Medium-sized, shared data |
| > 2GB | Distributed Store | Object Store | Large-scale distributed processing |
import numpy as np
from multiprocessing import shared_memory
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
def create_shared_array(shape, dtype):
    """Create a shared memory numpy array."""
    nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    return array, shm

def worker_process(shape, dtype, shm_name, start, end):
    """Worker process that accesses shared memory."""
    # Attach to existing shared memory
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    # Create numpy array using shared memory buffer
    array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    # Process data
    local_sum = np.sum(array[start:end])
    existing_shm.close()
    return local_sum

def parallel_sum_example():
    # Create sample data
    shape = (10_000_000,)  # 10M elements
    dtype = np.float64

    # Create shared memory array
    data, shm = create_shared_array(shape, dtype)
    data[:] = np.random.random(shape)  # Fill with random data

    # Setup process pool executor
    num_processes = mp.cpu_count()
    chunk_size = len(data) // num_processes

    # Create futures for parallel execution
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = []
        for i in range(num_processes):
            start = i * chunk_size
            end = start + chunk_size if i < num_processes - 1 else len(data)
            future = executor.submit(
                worker_process,
                shape,
                dtype,
                shm.name,
                start,
                end
            )
            futures.append(future)

        # Gather results
        results = [future.result() for future in futures]

    # Sum results from all processes
    total_sum = sum(results)

    # Cleanup
    shm.close()
    shm.unlink()
    return total_sum
import numpy as np
from multiprocessing import shared_memory
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

class SharedArray:
    """Wrapper class for shared memory array operations."""

    def __init__(self, shape, dtype=np.float64):
        self.shape = shape
        self.dtype = dtype
        nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
        self.shm = shared_memory.SharedMemory(create=True, size=nbytes)
        self.array = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)

    def __del__(self):
        """Cleanup shared memory on deletion."""
        self.shm.close()
        self.shm.unlink()
def process_chunk(args):
    """Worker function that processes a chunk of shared array."""
    start, end, shm_name, shape, dtype = args
    # Attach to shared memory
    shm = shared_memory.SharedMemory(name=shm_name)
    # Create array view
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    # Process chunk
    result = np.sum(array[start:end])
    # Cleanup
    shm.close()
    return result
def parallel_processing_example():
    # Initialize shared array
    shape = (10_000_000,)
    shared_arr = SharedArray(shape)
    shared_arr.array[:] = np.random.random(shape)

    # Prepare chunks for parallel processing
    num_processes = mp.cpu_count()
    chunk_size = shape[0] // num_processes

    tasks = []
    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else shape[0]
        tasks.append((
            start,
            end,
            shared_arr.shm.name,
            shape,
            shared_arr.dtype
        ))

    # Process in parallel using ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = [executor.submit(process_chunk, task) for task in tasks]
        results = [future.result() for future in futures]

    total_sum = sum(results)
    return total_sum
if __name__ == '__main__':
    result = parallel_processing_example()
    print(f"Total sum: {result}")
import contextlib

@contextlib.contextmanager
def managed_shared_memory(shape, dtype=np.float64):
    """Context manager for safe shared memory handling."""
    try:
        nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
        shm = shared_memory.SharedMemory(create=True, size=nbytes)
        array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        yield array, shm
    finally:
        shm.close()
        shm.unlink()

def safe_parallel_processing():
    shape = (10_000_000,)
    with managed_shared_memory(shape) as (array, shm):
        # Fill array with data
        array[:] = np.random.random(shape)

        # Process in parallel
        num_processes = mp.cpu_count()
        chunk_size = shape[0] // num_processes
        tasks = [
            (i * chunk_size,
             (i + 1) * chunk_size if i < num_processes - 1 else shape[0],
             shm.name, shape, np.float64)
            for i in range(num_processes)
        ]

        with ProcessPoolExecutor(max_workers=num_processes) as executor:
            futures = [executor.submit(process_chunk, task) for task in tasks]
            results = [future.result() for future in futures]

        return sum(results)
- Cache Awareness
  - Keep frequently accessed small data in cache
  - Avoid cache pollution with large sequential reads
  - Use prefetching for predictable access patterns
- Memory Management
  - Monitor memory pressure across nodes
  - Implement proper cleanup for shared resources
  - Use memory pools for frequent allocations
- Network Optimization
  - Minimize data movement across nodes
  - Batch small operations when possible
  - Consider data locality in task scheduling
Choosing the right storage strategy requires balancing multiple factors including data size, access patterns, and hardware capabilities. Regular profiling and monitoring help ensure optimal performance as workloads evolve.