Last active
August 29, 2015 14:19
-
-
Save rohit-jamuar/b6e2f819c6985e0e480d to your computer and use it in GitHub Desktop.
MapReduce (top-k words) using Python's multiprocessing module
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python | |
| # -*- coding: utf-8 -*- | |
| ''' | |
| Simple Distributed File Indexer v0.1 : 'MapReduce with multiproc' | |
| ''' | |
import io
import sys
from collections import defaultdict
from glob import glob
from heapq import nlargest
from multiprocessing import Pool
from operator import itemgetter
from os import sep
from os.path import isfile, isdir, join
from re import findall, compile
from string import punctuation
from sys import argv
# Worker pool shared with aggregate_tokens(). The __main__ section assigns it;
# it stays None when the module is merely imported.
POOL = None
# A token is a run of ASCII letters/digits. '0-9' is used instead of '\d'
# because on Python 3 '\d' also matches non-ASCII digits (e.g. Arabic-Indic),
# which would be inconsistent with the ASCII-only letter ranges.
PATTERN = compile(r'[A-Za-z0-9]+')
def tokenize_text(fname):
    '''
    Count the tokens in the text file at path 'fname'.

    On every line, each ASCII punctuation character is deleted first, so
    "don't" becomes the single token "dont". The line is then split into
    tokens with 'PATTERN', and each token is lower-cased before counting.

    The file is decoded as UTF-8 with undecodable bytes replaced. A single
    file in another encoding therefore cannot abort the whole run. Replacement
    characters never match 'PATTERN', so they simply separate tokens.

    Returns a defaultdict(int) mapping token -> count. The mapping is empty
    when 'fname' is not an existing regular file.
    '''
    result = defaultdict(int)
    if not isfile(fname):
        return result
    # Deletion table built once per file: one C-level translate() per line
    # instead of one replace() pass per punctuation character.
    strip_punctuation = dict.fromkeys(map(ord, punctuation))
    with io.open(fname, encoding='utf-8', errors='replace') as file_input:
        for line in file_input:
            for token in PATTERN.findall(line.translate(strip_punctuation)):
                result[token.lower()] += 1
    return result
def aggregate_tokens(dir_path):
    '''
    Count tokens across every '*.txt' file directly inside 'dir_path'.

    Files are tokenized in parallel through the module-level 'POOL' when it
    has been created. Otherwise they are processed sequentially in this
    process, so the function also works when the module is imported instead
    of run as a script.

    Per-file counts from tokenize_text() are summed into one defaultdict(int)
    mapping token -> total count. The result is empty when 'dir_path' is not
    an existing directory or contains no '*.txt' files.
    '''
    aggregate = defaultdict(int)
    if not isdir(dir_path):
        return aggregate
    # join() handles a trailing separator on dir_path by itself.
    file_names = glob(join(dir_path, '*.txt'))
    # Fall back to the builtin map so callers are not forced to build a pool.
    mapper = map if POOL is None else POOL.map
    for intermediate in mapper(tokenize_text, file_names):
        for key, val in intermediate.items():
            aggregate[key] += val
    return aggregate
def find_top_k(data, k):
    '''
    Return the 'k' entries of 'data' with the largest values.

    The result is a list of [key, value] pairs in descending value order.
    Keys with equal values keep their iteration order from 'data'. Fewer
    than 'k' pairs are returned when 'data' is smaller, and none when
    'k' <= 0. 'data' itself is not modified.
    '''
    # nlargest is O(n log k) and non-destructive. The previous approach of
    # repeatedly taking max() and popping it was O(n*k) and emptied the
    # caller's dict.
    return [[key, value]
            for key, value in nlargest(k, data.items(), key=itemgetter(1))]
if __name__ == '__main__':
    # Usage: <script> WORKER_COUNT K DIRECTORY
    # Prints the K most frequent tokens found in DIRECTORY/*.txt.
    # Single-argument print(...) calls behave identically on Python 2 and 3.
    USAGE = ("Invalid arguments : need to pass number of processes, 'k' and "
             "absolute path of directory (containing all text files), in order.")
    if len(argv) != 4:
        print(USAGE)
        sys.exit(1)
    # Keep the try minimal: a ValueError raised later (e.g. while reading
    # files) must not be misreported as a bad command-line argument.
    try:
        WORKER_COUNT, MAX_COUNT = int(argv[1]), int(argv[2])
    except ValueError:
        WORKER_COUNT = MAX_COUNT = 0  # non-numeric: rejected just below
    if WORKER_COUNT <= 0 or MAX_COUNT <= 0:
        print("Worker-count / 'k' should be a positive integer (> 0).")
        print(USAGE)
        sys.exit(1)
    BASE_DIR = argv[3]
    POOL = Pool(processes=WORKER_COUNT)
    try:
        TOP_WORDS = find_top_k(aggregate_tokens(BASE_DIR), MAX_COUNT)
    finally:
        # Release the worker processes even on error or Ctrl-C. This is the
        # same cleanup Pool.__exit__ does on Python 3, spelled out because
        # Pool is not a context manager on Python 2.
        POOL.terminate()
        POOL.join()
    # The header is printed only when there is at least one row to show.
    if TOP_WORDS:
        print('\n{:*^20} -- {:*^20}\n'.format('word', 'count'))
    for word, count in TOP_WORDS:
        print('{:^20} -- {:^20}'.format(word, count))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment