Created
July 17, 2012 04:44
-
-
Save irskep/3127205 to your computer and use it in GitHub Desktop.
MapReduce word frequency count without mrjob.job
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python | |
| """Counts the frequencies of words in a document, and doubles the count just | |
| for kicks. | |
| Usage: | |
| python -m mrjob.launch wfc.py -r local <input files> | |
| """ | |
| import itertools | |
| import json | |
| import optparse | |
| import re | |
| import sys | |
# Matches maximal runs of word characters plus apostrophes, so contractions
# like "don't" count as a single word.
WORD_RE = re.compile(r"[\w']+")
# Tasks are defined as functions that take an input file object and an output
# file object. In most cases, each line in the input file is of the form:
#   key\tvalue\n
# This behavior may be different depending on things you set up in Hadoop and
# mrjob, but it is the default, so don't worry about it.
# **probable exception: first mapper gets raw lines from input file. I could
# be wrong about this though, haven't tested with actual Hadoop yet.**
# Output lines should be written in the same way.
| def _write(stdout, key, value): | |
| stdout.write('%s\t%s\n' % (key, value)) | |
| def _group_by_key(in_file, sep='\t'): | |
| """Turn this: | |
| ['x\ta', 'x\tb', 'y\tc'] | |
| into this: | |
| [('x', ['a', 'b']), ('y', ['c'])] | |
| """ | |
| group_key = lambda line: line.split(sep, 1)[0] | |
| return itertools.groupby(in_file, key=group_key) | |
def lines_to_word_occurrences(in_file, stdout):
    """Mapper, step 0: emit a (word, 1) record for every word on every line."""
    for text in in_file:
        words = WORD_RE.findall(text)
        for token in words:
            _write(stdout, token, 1)
def sum_word_occurrences(in_file, stdout):
    """Reducer, step 0: collapse each run of (word, count) lines into one
    (word, total) line."""
    for key, grouped_lines in _group_by_key(in_file):
        total = 0
        for record in grouped_lines:
            total += int(record.split('\t', 1)[1])
        _write(stdout, key, total)
def multiply_value_by_2(in_file, stdout):
    """Mapper, step 1: pass each (key, value) record through with the
    integer value doubled."""
    for record in in_file:
        key, raw = record.split('\t', 1)
        doubled = int(raw) * 2
        _write(stdout, key, doubled)
| def _run_task(task, paths, stdin, stdout): | |
| """Run *task* for each file in *paths*. Use stdin if '-' is an arg or there | |
| are no args. | |
| """ | |
| for path in (paths or ['-']): | |
| if path == '-': | |
| task(stdin, stdout) | |
| else: | |
| with open(path, 'r') as f: | |
| task(f, stdout) | |
def main(argv, stdin, stdout, stderr):
    """Run one piece of the job, selected by the command-line flags.

    argv   -- argument list for optparse (None means use sys.argv[1:])
    stdin  -- file object used when input is piped ('-' or no path args)
    stdout -- file object all output is written to
    stderr -- file object for diagnostics
    Returns a process exit status: 0 on success, 1 on usage errors.

    Fixes vs. the original: diagnostics/output now go to the *stdout* /
    *stderr* parameters instead of the process-global streams (the params
    existed for testability but `print` ignored them), and the
    mapper/reducer usage message was accidentally wrapped in {} -- a set
    literal -- so a set repr was printed instead of the message text.
    """
    p = optparse.OptionParser()
    p.add_option('--steps', default=False, action='store_true')
    p.add_option('--mapper', default=False, action='store_true')
    p.add_option('--reducer', default=False, action='store_true')
    p.add_option('--step-num', default=None, type='int')
    opts, args = p.parse_args(argv)

    # --steps behavior. This job has 2 steps, the first with a mapper and
    # reducer and the second with only a mapper. They are all 'script' steps,
    # meaning that they are run by invoking this file with --step-num=X and
    # [--mapper|--reducer].
    # The output of --steps tells mrjob what steps the job has.
    if opts.steps:
        # 'is not None' so an explicit --step-num=0 also triggers the
        # warning (0 is falsy, so the old any(...) check missed it).
        if opts.mapper or opts.reducer or opts.step_num is not None:
            stderr.write(
                '--steps is mutually exclusive with all other options.\n')
        # Warn but still emit the step description, preserving the
        # original best-effort behavior.
        stdout.write(json.dumps([
            {'type': 'streaming',
             'mapper': {'type': 'script'},
             'reducer': {'type': 'script'}},
            {'type': 'streaming',
             'mapper': {'type': 'script'}}]) + '\n')
        return 0

    # --step-num is required if --steps not present
    if opts.step_num is None:
        stderr.write('You must specify --step-num if not using --steps.\n')
        return 1

    # likewise exactly one of [--mapper|--reducer]; both booleans, so
    # equality means "both set" or "neither set".
    if opts.mapper == opts.reducer:
        stderr.write(
            'You must specify exactly one of either --mapper or --reducer'
            ' if not using --steps.\n')
        return 1

    # decide which mapper to run based on --step-num
    if opts.mapper:
        if opts.step_num == 0:
            _run_task(lines_to_word_occurrences, args, stdin, stdout)
            return 0
        elif opts.step_num == 1:
            _run_task(multiply_value_by_2, args, stdin, stdout)
            return 0
        else:
            stderr.write('There is no step %d mapper!\n' % opts.step_num)
            return 1

    # run reducer if --step-num is correct
    if opts.reducer:
        if opts.step_num == 0:
            _run_task(sum_word_occurrences, args, stdin, stdout)
            return 0
        else:
            stderr.write('There is no step %d reducer!\n' % opts.step_num)
            return 1

    raise Exception("How did we get here???")
if __name__ == '__main__':
    # invoke with sys.argv, etc. Test cases might use different values.
    # argv=None makes optparse fall back to sys.argv[1:]; exit status is
    # whatever main() returns.
    sys.exit(main(None, sys.stdin, sys.stdout, sys.stderr))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment