Skip to content

Instantly share code, notes, and snippets.

@rohit-jamuar
Last active August 29, 2015 14:19
Show Gist options
  • Select an option

  • Save rohit-jamuar/b6e2f819c6985e0e480d to your computer and use it in GitHub Desktop.

Select an option

Save rohit-jamuar/b6e2f819c6985e0e480d to your computer and use it in GitHub Desktop.
MapReduce (top k words) using multiproc
#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
Simple Distributed File Indexer v0.1 : 'MapReduce with multiproc'
'''
from multiprocessing import Pool
from collections import defaultdict
from re import findall, compile
from os.path import isfile, isdir
from sys import argv
from glob import glob
from os import sep
from string import punctuation
# Worker pool used by aggregate_tokens(). It stays None until the script
# entry point replaces it with a multiprocessing Pool.
POOL = None
# A token is one or more consecutive characters that are ASCII letters or
# match \d (digits); everything else acts as a separator.
PATTERN = compile(r'[A-Za-z\d]+')
def tokenize_text(fname):
    '''
    Count the tokens contained in the file located at 'fname'.

    Every line has its punctuation characters removed, is split into tokens
    with the module-level regex 'PATTERN', and each token is lower-cased
    before being counted. Returns a {token : count} mapping; the mapping is
    empty when 'fname' is not an existing regular file.
    '''
    counts = defaultdict(int)
    if not isfile(fname):
        return counts
    with open(fname) as handle:
        for raw_line in handle:
            # Punctuation is deleted outright rather than treated as a
            # separator, so "don't" is counted as the single token "dont".
            stripped = ''.join(
                char for char in raw_line if char not in punctuation)
            for word in PATTERN.findall(stripped):
                counts[word.lower()] += 1
    return counts
def aggregate_tokens(dir_path):
    '''
    Build the combined {token : count} mapping for a directory of text files.

    Every '*.txt' file directly inside 'dir_path' is handed to
    'tokenize_text' through the module-level process pool 'POOL', and the
    per-file counts are summed into one dictionary, which is returned. The
    dictionary is empty when 'dir_path' is not an existing directory.
    '''
    totals = defaultdict(int)
    if not isdir(dir_path):
        return totals
    # Drop one trailing separator so the glob pattern below does not end up
    # with a doubled separator.
    if dir_path.endswith(sep):
        dir_path = dir_path[:-len(sep)]
    text_files = glob('{}{}*.txt'.format(dir_path, sep))
    for partial_counts in POOL.map(tokenize_text, text_files):
        for token in partial_counts:
            totals[token] += partial_counts[token]
    return totals
def find_top_k(data, k):
    '''
    Returns the 'k' entries of 'data' that have the largest values.

    'data' is a {key : value} mapping and is left unmodified. The result is
    a list of [key, value] pairs ordered from the largest value to the
    smallest; entries with equal values keep the order in which 'data'
    iterates over them. Fewer than 'k' pairs are returned when 'data' holds
    fewer than 'k' entries, and an empty list is returned when 'data' is
    empty or 'k' is not positive.
    '''
    # Imported locally so this function does not rely on a file-level import.
    from heapq import nlargest

    if not data or k <= 0:
        return []
    # nlargest selects the top entries in one pass without popping anything
    # out of the caller's dictionary.
    top_items = nlargest(k, data.items(), key=lambda item: item[1])
    return [[key, value] for key, value in top_items]
if __name__ == '__main__':
    # Command-line entry point, invoked as:
    #     <script> WORKER_COUNT K BASE_DIR
    # It prints the K most frequent tokens found in the '*.txt' files of
    # BASE_DIR and exits with status 0. On bad arguments it prints a usage
    # message and exits with status 1.
    import sys  # only 'argv' is imported at file level

    if len(argv) == 4:
        try:
            # The 'try' covers argument parsing only. A ValueError raised
            # later (e.g. a UnicodeDecodeError while a worker reads a file)
            # must not be misreported as a bad command-line argument.
            WORKER_COUNT = int(argv[1])
            MAX_COUNT = int(argv[2])
            if WORKER_COUNT <= 0 or MAX_COUNT <= 0:
                raise ValueError
        except ValueError:
            # Falls through to the usage message and exit status 1 below.
            print("Worker-count / 'k' should be a positive integer (> 0).")
        else:
            BASE_DIR = argv[3]
            # aggregate_tokens() reads this module-level pool.
            POOL = Pool(processes=WORKER_COUNT)
            try:
                TOP_WORDS = find_top_k(aggregate_tokens(BASE_DIR), MAX_COUNT)
            finally:
                # Shut the workers down even if processing failed.
                POOL.close()
                POOL.join()
            # The header is printed only when there is something to list.
            if TOP_WORDS:
                print('\n{:*^20} -- {:*^20}\n'.format('word', 'count'))
            for word, count in TOP_WORDS:
                print('{:^20} -- {:^20}'.format(word, count))
            sys.exit(0)
    print("Invalid arguments : need to pass number of processes, 'k' and "
          "absolute path of directory (containing all text files), in order.")
    sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment