Count words in Python in parallel
from concurrent.futures import ProcessPoolExecutor
from collections import Counter
import os
# import psutil
import re

def map_word_count(text_chunk):
    """Map function: Counts words in a chunk of text."""
    words = re.findall(r'\w+', text_chunk.lower())  # Tokenize words
    return Counter(words)  # Return word frequencies

def reduce_word_counts(counts_list):
    """Reduce function: Merges multiple word frequency dictionaries."""
    final_counts = Counter()
    for count in counts_list:
        final_counts.update(count)
    return final_counts

def parallel_reduce(counts_list, num_workers=4):
    """Parallel reduction of word count dictionaries."""
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Split the partial counts into roughly equal chunks, one per worker
        chunk_size = len(counts_list) // num_workers
        if chunk_size == 0:
            chunk_size = 1  # Avoid a zero slice step when there are fewer counts than workers
        chunks = [counts_list[i:i + chunk_size] for i in range(0, len(counts_list), chunk_size)]
        partial_reductions = list(executor.map(reduce_word_counts, chunks))
    return reduce_word_counts(partial_reductions)

def get_cache_friendly_chunk_size():
    """Estimate a cache-friendly chunk size for MapReduce."""
    cache_size = 8 * 1024 * 1024
    # cache_size = os.sysconf('SC_LEVEL3_CACHE_SIZE') or (8 * 1024 * 1024)  # Assume 8MB if unknown
    return cache_size // os.cpu_count()

# def set_process_affinity():
#     """Set process affinity to bind to local NUMA node."""
#     p = psutil.Process(os.getpid())
#     numa_cores = list(range(os.cpu_count() // 2))  # Assume 2 NUMA nodes
#     p.cpu_affinity(numa_cores)

def chunk_text(text, chunk_size):
    """Splits text into cache-optimized chunks."""
    lines = text.split("\n")
    return ["\n".join(lines[i:i + chunk_size]) for i in range(0, len(lines), chunk_size)]

def parallel_word_count(text, num_workers=4):
    """Optimized MapReduce with cache-friendly chunking & parallel Reduce."""
    # set_process_affinity()
    chunk_size = get_cache_friendly_chunk_size()  # Optimize chunk size
    text_chunks = chunk_text(text, chunk_size // 100)  # Convert bytes to lines approx.
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        partial_counts = list(executor.map(map_word_count, text_chunks))
    final_word_counts = parallel_reduce(partial_counts, num_workers)
    return final_word_counts

if __name__ == "__main__":  # Required so ProcessPoolExecutor workers can safely re-import this module
    # Sample large text
    text = "\n".join(["Parallel computing is powerful." for _ in range(10000)])  # Large dataset

    # Run Optimized MapReduce
    word_frequencies = parallel_word_count(text)

    # Print top words
    print(word_frequencies.most_common(5))
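
As a quick sanity check, the parallel result can be compared against a single-process Counter over the same text. The snippet below is a minimal sketch, assuming the functions and imports above are defined in the same module; it can stand in for the sample driver, confirms that both approaches agree, and reports their wall-clock times.

import time

if __name__ == "__main__":
    sample = "\n".join(["Parallel computing is powerful." for _ in range(200000)])

    # Single-process baseline
    t0 = time.perf_counter()
    serial_counts = Counter(re.findall(r'\w+', sample.lower()))
    t1 = time.perf_counter()

    # Parallel MapReduce version defined above
    parallel_counts = parallel_word_count(sample, num_workers=os.cpu_count())
    t2 = time.perf_counter()

    assert serial_counts == parallel_counts  # Both approaches must produce identical counts
    print(f"Serial:   {t1 - t0:.3f} s")
    print(f"Parallel: {t2 - t1:.3f} s")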