Skip to content

Instantly share code, notes, and snippets.

@jiacai2050
Forked from elliotchance/mapreduce.py
Created March 6, 2016 01:45
Show Gist options
  • Save jiacai2050/6f58ea326fef5ec2176f to your computer and use it in GitHub Desktop.
Save jiacai2050/6f58ea326fef5ec2176f to your computer and use it in GitHub Desktop.
Simple MapReduce example
import re
def mapper(emit, text):
words = re.split('[^\w]', text.lower())
for word in words:
if word:
emit(word, 1)
def reducer(key, values):
total = 0
for value in values:
total += value
return [total]
class MapReducer:
def __init__(self, records, mapper, reducer):
self.queues = {}
self.records = records
self.mapper = mapper
self.reducer = reducer
self.total_emits = 0
# Create the queue if it does not exist yet, or append to an existing queue.
def emit(self, key, value):
if key not in self.queues:
self.queues[key] = [value]
else:
self.queues[key].append(value)
self.total_emits += 1
print "emit(%s: %s)" % (key, value)
# Start the MapReducer and return the final result.
def run(self):
# Map each of the records.
for record in self.records:
self.mapper(self.emit, record)
print "Finished mapping %d records into %d queues with %d values.\n" % (
len(self.records), len(self.queues), self.total_emits
)
# Reduce until it we have just one value for each queue.
size = 3
for queue in self.queues:
while len(self.queues[queue]) > 1:
queue_size = len(self.queues[queue])
new_queue = []
for i in xrange(0, len(self.queues[queue]), size):
values = self.queues[queue][i:i + size]
new_queue.extend(self.reducer(queue, values))
self.queues[queue] = new_queue
print "Reduced %d values for '%s' into %d values." % (
queue_size, queue, len(self.queues[queue]))
print
# Each of the queues will have on value now. Unwrap it to an associative
# array.
for queue in self.queues:
print "%s: %s" % (queue, self.queues[queue][0])
records = [
"The buffalo from Buffalo who are buffaloed by buffalo",
"Buffalo, buffalo (verb) other buffalo from Buffalo",
]
mr = MapReducer(records, mapper, reducer)
mr.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment