Skip to content

Instantly share code, notes, and snippets.

@irskep
Created July 17, 2012 04:44
Show Gist options
  • Save irskep/3127205 to your computer and use it in GitHub Desktop.
Save irskep/3127205 to your computer and use it in GitHub Desktop.
MapReduce word frequency count without mrjob.job
#!/usr/bin/python
"""Counts the frequencies of words in a document, and doubles the count just
for kicks.
Usage:
python -m mrjob.launch wfc.py -r local <input files>
"""
import itertools
import json
import optparse
import re
import sys
# A "word" is any maximal run of word characters and apostrophes, so that
# contractions such as "don't" are counted as a single word.
WORD_RE = re.compile(r"[\w']+")
# Tasks are functions that take an input file object and an output file
# object. In most cases, each input line has the form:
#     key\tvalue\n
# This can differ depending on how Hadoop and mrjob are configured, but it is
# the default.
# Probable exception: the first mapper receives the raw lines of the input
# files. (Not yet verified against a real Hadoop cluster.)
# Output lines should be written in the same key\tvalue\n form.
def _write(stdout, key, value):
    """Write a single newline-terminated ``key<TAB>value`` record to *stdout*.

    Both *key* and *value* are rendered with ``%s`` formatting.
    """
    record = '%s\t%s\n' % (key, value)
    stdout.write(record)
def _group_by_key(in_file, sep='\t'):
    """Group consecutive input lines that share the same key.

    A line's key is the text before its first *sep*. A line that contains no
    *sep* is all key; its line terminator is not treated as part of the key.
    Because ``itertools.groupby`` only merges *adjacent* lines, *in_file* must
    already be sorted by key (presumably guaranteed by Hadoop's sort before
    the reducer runs -- confirm for other runners).

    Turns this::

        ['x\\ta\\n', 'x\\tb\\n', 'y\\tc\\n']

    into an iterator equivalent to::

        [('x', <iterator over 'x\\ta\\n', 'x\\tb\\n'>),
         ('y', <iterator over 'y\\tc\\n'>)]

    Note that each group yields the *whole* original lines, not just their
    values; callers split the value off themselves.

    :param in_file: iterable of text lines (e.g. an open file object).
    :param sep: separator between key and value; defaults to a tab.
    :return: the ``itertools.groupby`` iterator of ``(key, lines)`` pairs.
    """
    def key_of(line):
        # Drop the line terminator first so that a key-only line such as
        # 'x\n' gets key 'x' (and groups with 'x\t...' lines) instead of
        # carrying the newline into the key.
        return line.rstrip('\r\n').split(sep, 1)[0]

    return itertools.groupby(in_file, key=key_of)
def lines_to_word_occurrences(in_file, stdout):
    """Mapper: write a ``word<TAB>1`` record for every WORD_RE match.

    Every line of *in_file* is scanned in order, and its matches are written
    in the order they appear.
    """
    words = (word for text in in_file for word in WORD_RE.findall(text))
    for word in words:
        _write(stdout, word, 1)
def sum_word_occurrences(in_file, stdout):
    """Reducer: write ``key<TAB>total`` for each run of same-key input lines.

    *total* is the sum of the integer values (the text after the first tab)
    of all lines in the run.
    """
    for word, lines in _group_by_key(in_file):
        total = 0
        for line in lines:
            total += int(line.split('\t', 1)[1])
        _write(stdout, word, total)
def multiply_value_by_2(in_file, stdout):
    """Mapper: rewrite each ``key<TAB>value`` line as ``key<TAB>2*value``.

    The value is parsed with ``int()``, which ignores the surrounding
    whitespace, including the trailing newline.
    """
    for record in in_file:
        fields = record.split('\t', 1)
        key, raw_value = fields
        doubled = int(raw_value) * 2
        _write(stdout, key, doubled)
def _run_task(task, paths, stdin, stdout):
    """Call ``task(input, stdout)`` once for each input named in *paths*.

    The name ``'-'`` stands for *stdin*. An empty (or ``None``) *paths* is
    treated as ``['-']``. Named files are opened for reading and closed when
    *task* finishes with them.
    """
    sources = paths if paths else ['-']
    for source in sources:
        if source != '-':
            with open(source, 'r') as handle:
                task(handle, stdout)
            continue
        task(stdin, stdout)
def main(argv, stdin, stdout, stderr):
    """Command-line entry point for the two-step word-frequency job.

    The script is invoked in one of two ways:

    * ``--steps``: write a JSON description of the job's steps to *stdout*.
      This job has 2 steps, the first with a mapper and a reducer and the
      second with only a mapper. All of them are 'script' steps, i.e. they
      are run by invoking this file with ``--step-num=N`` and exactly one of
      ``--mapper``/``--reducer``.
    * ``--step-num=N`` plus exactly one of ``--mapper``/``--reducer``: run that
      task over the files named as positional arguments (or over *stdin* when
      there are none, or for the name ``-``).

    :param argv: argument list to parse, without the program name; ``None``
        makes optparse fall back to ``sys.argv[1:]``.
    :param stdin: stream read for ``-`` or when no files are given.
    :param stdout: stream that task output and the ``--steps`` JSON go to.
    :param stderr: stream for usage error messages.
    :return: exit status -- 0 on success, 1 on a usage error.
    """
    p = optparse.OptionParser()
    p.add_option('--steps', default=False, action='store_true')
    p.add_option('--mapper', default=False, action='store_true')
    p.add_option('--reducer', default=False, action='store_true')
    p.add_option('--step-num', default=None, type='int')
    opts, args = p.parse_args(argv)

    def fail(message):
        # Same text as Python 2's ``print >> stderr, message``, but valid on
        # both Python 2 and 3.
        stderr.write(message + '\n')
        return 1

    if opts.steps:
        # Compare against None: --step-num=0 is valid but falsy, so a plain
        # truthiness test would let "--steps --step-num=0" through.
        if opts.mapper or opts.reducer or opts.step_num is not None:
            return fail(
                '--steps is mutually exclusive with all other options.')
        steps = [
            {'type': 'streaming',
             'mapper': {'type': 'script'},
             'reducer': {'type': 'script'}},
            {'type': 'streaming',
             'mapper': {'type': 'script'}}]
        # Write to the *stdout* argument (not the global sys.stdout) so that
        # callers passing their own stream, e.g. tests, capture the output.
        stdout.write(json.dumps(steps) + '\n')
        return 0

    # --step-num is required if --steps is not present
    if opts.step_num is None:
        return fail('You must specify --step-num if not using --steps.')

    # likewise exactly one of --mapper / --reducer (both or neither is an error)
    if opts.mapper == opts.reducer:
        return fail(
            'You must specify exactly one of either --mapper or --reducer'
            ' if not using --steps.')

    # decide which mapper to run based on --step-num
    if opts.mapper:
        if opts.step_num == 0:
            _run_task(lines_to_word_occurrences, args, stdin, stdout)
        elif opts.step_num == 1:
            _run_task(multiply_value_by_2, args, stdin, stdout)
        else:
            return fail('There is no step %d mapper!' % opts.step_num)
        return 0

    # Only --reducer can be set at this point; only step 0 has a reducer.
    if opts.step_num == 0:
        _run_task(sum_word_occurrences, args, stdin, stdout)
        return 0
    return fail('There is no step %d reducer!' % opts.step_num)
if __name__ == '__main__':
    # Script entry point: bind main() to the real process arguments and
    # streams. Tests can call main() directly with their own argv and file
    # objects instead.
    exit_status = main(None, sys.stdin, sys.stdout, sys.stderr)
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment