-
-
Save jiacai2050/6f58ea326fef5ec2176f to your computer and use it in GitHub Desktop.
Simple MapReduce example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
def mapper(emit, text):
    """Emit ``(word, 1)`` once for every word occurrence in ``text``.

    The text is lower-cased first, so "Buffalo" and "buffalo" count as the
    same key.  A word is a maximal run of regex word characters (letters,
    digits and underscore); punctuation and whitespace only separate words
    and are never emitted.

    Args:
        emit: callback taking ``(key, value)``, e.g. ``MapReducer.emit``.
        text: one input record.
    """
    # Raw string: the previous non-raw '[^\w]' relied on an invalid string
    # escape, which warns on Python 3.6+ (SyntaxWarning since 3.12).
    # findall of word runs yields exactly the non-empty pieces that splitting
    # on single non-word characters produced, without the empty strings.
    for word in re.findall(r'\w+', text.lower()):
        emit(word, 1)
def reducer(key, values):
    """Collapse a chunk of partial counts into a one-element list holding their sum.

    The answer is wrapped in a list because the MapReducer driver extends its
    queue with whatever a reducer call returns.  ``key`` is accepted for
    interface compatibility but not used.
    """
    acc = 0
    for count in values:
        acc += count
    return [acc]
class MapReducer:
    """Tiny in-memory, single-process MapReduce driver.

    ``mapper(emit, record)`` is called once per record and may call
    ``emit(key, value)`` any number of times; emitted values are grouped into
    one queue per key.  ``reducer(key, values)`` must return a list of values;
    it is applied repeatedly to chunks of at most ``fan_in`` values until each
    queue holds a single value.

    Because reduction is applied as a tree over chunks (and not at all to a
    queue that already holds exactly one value), the reducer should behave
    like an associative combiner: re-reducing partial results must give the
    same answer, and ``reducer(key, [v])`` should equal ``[v]``.
    """

    def __init__(self, records, mapper, reducer, fan_in=3):
        """Store the job inputs.

        Args:
            records: iterable of input records passed one by one to ``mapper``.
            mapper: callable ``mapper(emit, record)``.
            reducer: callable ``reducer(key, values) -> list``.
            fan_in: maximum number of values handed to one reducer call.

        Raises:
            ValueError: if ``fan_in`` is smaller than 2, because chunks of one
                value could never shrink a queue and ``run`` would not finish.
        """
        if fan_in < 2:
            raise ValueError("fan_in must be at least 2, got %r" % (fan_in,))
        self.queues = {}  # key -> list of emitted (later: partially reduced) values
        self.records = records
        self.mapper = mapper
        self.reducer = reducer
        self.fan_in = fan_in
        self.total_emits = 0  # number of emit() calls, used in the progress message

    def emit(self, key, value):
        """Append ``value`` to the queue for ``key``, creating the queue on first use."""
        self.queues.setdefault(key, []).append(value)
        self.total_emits += 1
        print("emit(%s: %s)" % (key, value))

    def run(self):
        """Map every record, reduce each queue to one value, print and return the result.

        Returns:
            dict mapping each emitted key to its single fully-reduced value.

        Raises:
            ValueError: if a reducer call fails to shrink a queue or reduces it
                to nothing (see ``_reduce_queue``).

        Note: queues and ``total_emits`` live on the instance, so calling
        ``run`` again re-maps the records on top of the previous results.
        """
        # Count while iterating so plain iterables (not just sized
        # sequences) can be used as records.
        record_count = 0
        for record in self.records:
            self.mapper(self.emit, record)
            record_count += 1
        print("Finished mapping %d records into %d queues with %d values.\n" % (
            record_count, len(self.queues), self.total_emits))

        for key in self.queues:
            self._reduce_queue(key)

        # Every queue now holds exactly one value; unwrap into a plain dict.
        result = {}
        for key in self.queues:
            result[key] = self.queues[key][0]
            print("%s: %s" % (key, result[key]))
        return result

    def _reduce_queue(self, key):
        """Reduce the queue for ``key`` in chunks of ``fan_in`` until one value remains.

        Raises:
            ValueError: if one pass does not make the queue strictly shorter
                (the loop would never terminate) or empties it entirely
                (there would be no result to return).
        """
        size = self.fan_in
        while len(self.queues[key]) > 1:
            values = self.queues[key]
            queue_size = len(values)
            new_queue = []
            for i in range(0, queue_size, size):
                new_queue.extend(self.reducer(key, values[i:i + size]))
            if not new_queue or len(new_queue) >= queue_size:
                raise ValueError(
                    "reducer must shrink the queue for %r to at least one value: "
                    "%d values became %d" % (key, queue_size, len(new_queue)))
            self.queues[key] = new_queue
            print("Reduced %d values for '%s' into %d values." % (
                queue_size, key, len(new_queue)))
if __name__ == "__main__":
    # Run the demo only when executed as a script, so importing this module to
    # reuse mapper/reducer/MapReducer does not trigger the job and its output.
    # Sample input: two sentences built from the "Buffalo buffalo ..." wordplay,
    # mixing case and punctuation to exercise the mapper's tokenisation.
    records = [
        "The buffalo from Buffalo who are buffaloed by buffalo",
        "Buffalo, buffalo (verb) other buffalo from Buffalo",
    ]
    mr = MapReducer(records, mapper, reducer)
    mr.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment