Count words in Python in parallel

from concurrent.futures import ProcessPoolExecutor
from collections import Counter
import os
# import psutil
import re


def map_word_count(text_chunk):
    """Map function: counts words in a chunk of text."""
    words = re.findall(r'\w+', text_chunk.lower())  # Tokenize into lowercase words
    return Counter(words)  # Per-chunk word frequencies


def reduce_word_counts(counts_list):
    """Reduce function: merges multiple word frequency dictionaries."""
    final_counts = Counter()
    for count in counts_list:
        final_counts.update(count)
    return final_counts


def parallel_reduce(counts_list, num_workers=4):
    """Parallel reduction of word count dictionaries."""
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        chunk_size = len(counts_list) // num_workers
        if chunk_size == 0:
            chunk_size = 2
        chunks = [counts_list[i:i + chunk_size] for i in range(0, len(counts_list), chunk_size)]
        partial_reductions = list(executor.map(reduce_word_counts, chunks))
    return reduce_word_counts(partial_reductions)


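# A minimal single-process alternative to parallel_reduce (an addition, not
# part of the original gist): for a modest number of partial Counters, the
# cost of pickling them to worker processes usually exceeds the merge itself.
def serial_reduce(counts_list):
    """Merge partial Counters in one process, in linear time."""
    import functools
    import operator
    return functools.reduce(operator.iadd, counts_list, Counter())

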
def get_cache_friendly_chunk_size():
    """Estimate a cache-friendly chunk size (in bytes) for MapReduce."""
    cache_size = 8 * 1024 * 1024  # Assume an 8 MB shared L3 cache
    # cache_size = os.sysconf('SC_LEVEL3_CACHE_SIZE') or (8 * 1024 * 1024)  # Linux-only query
    return cache_size // os.cpu_count()  # Per-core share of the cache


# def set_process_affinity():
#     """Set process affinity to bind to the local NUMA node."""
#     p = psutil.Process(os.getpid())
#     numa_cores = list(range(os.cpu_count() // 2))  # Assume 2 NUMA nodes
#     p.cpu_affinity(numa_cores)
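# A sketch of the same idea without psutil, using the Linux-only stdlib call
# os.sched_setaffinity (an addition, not part of the original gist; as above,
# treating the first half of the CPUs as one NUMA node is an assumption).
def set_process_affinity_stdlib():
    """Pin the current process to the first half of the CPUs (Linux only)."""
    numa_cores = set(range(os.cpu_count() // 2))  # Assume 2 NUMA nodes
    os.sched_setaffinity(0, numa_cores)  # pid 0 = the calling process

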
def chunk_text(text, chunk_size):
    """Splits text into chunks of chunk_size whole lines each."""
    lines = text.split("\n")
    return ["\n".join(lines[i:i + chunk_size]) for i in range(0, len(lines), chunk_size)]


def parallel_word_count(text, num_workers=4):
    """Optimized MapReduce with cache-friendly chunking & parallel Reduce."""
    # set_process_affinity()
    chunk_size = get_cache_friendly_chunk_size()  # Per-core cache budget in bytes
    text_chunks = chunk_text(text, chunk_size // 100)  # Bytes to lines, assuming ~100 bytes per line
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        partial_counts = list(executor.map(map_word_count, text_chunks))
    final_word_counts = parallel_reduce(partial_counts, num_workers)
    return final_word_counts


if __name__ == "__main__":
    # The guard is required for ProcessPoolExecutor on platforms that spawn
    # workers (Windows, macOS); without it, each worker re-imports and
    # re-executes this block.

    # Sample large text
    text = "\n".join(["Parallel computing is powerful." for _ in range(10000)])

    # Run the optimized MapReduce
    word_frequencies = parallel_word_count(text)

    # Print the most frequent words
    print(word_frequencies.most_common(5))
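
    # Optional sanity check (an addition, not in the original gist): recompute
    # the counts sequentially and compare. With the sample text above, each of
    # the four distinct words appears exactly 10000 times.
    sequential_counts = Counter(re.findall(r'\w+', text.lower()))
    assert sequential_counts == word_frequencies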