17 Sep 2014
MapReduce is a powerful programming model for processing large sets of data in a distributed, parallel manner. It has proven very popular for many data processing tasks, particularly with the open source Hadoop implementation.
The most basic idea powering MapReduce is to break large data sets into smaller chunks, which are then processed separately (in parallel). The results of the chunk processing are then collected.
[Figure illustrating the MapReduce process. Copyright (c) 2013 Roy Keyes. License: Creative Commons Attribution-Share Alike 4.0]
These two phases are called map and reduce, after the map and reduce functions from functional programming. During the map phase, a desired function is applied to each of the small chunks. The reduce phase takes the collected intermediate results and reduces them to a smaller set of outputs.
# Find the product of each list element modulo 3, using the map and reduce functions
from functools import reduce  # required in Python 3; reduce is a builtin in Python 2

alist = [1, 2, 3, 4, 5]
modulo_product = reduce(lambda x, y: x * y, map(lambda x: x % 3, alist))
# The equivalent using a loop
modulo_product = 1  # start at 1, not 0, or the product is always 0
for n in alist:
    modulo_product *= n % 3
A very basic task accomplished well with MapReduce is counting the number of word occurrences in a set of documents. The steps are as follows:
- Break the documents up into smaller chunks.
- For each word in each chunk, return a key-value pair where the key is the word and the value is simply 1: ("python", 1).
- Group all values with the same key (i.e. the same word) together as an array in a new key-value pair: ("python", [1,1,1,1,1]).
- For each word, sum the number of occurrences and return a tuple of the word and the word count: ("python",5).
In this example, step 2 is the map phase and step 4 is the reduce phase. Steps 1 and 3 are equally important, but happen "behind the scenes" in a consistent way. Step 3 is known as "shuffle", where key-value pairs are grouped by key.
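As a rough illustration, the shuffle step amounts to a group-by-key. Here is a minimal sketch in plain Python; the shuffle function and its input format are illustrative, not part of any framework's API:

# Group (key, value) pairs by key, as the shuffle step does.
from collections import defaultdict

def shuffle(pairs):
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())

# shuffle([("python", 1), ("java", 1), ("python", 1)])
# returns [("python", [1, 1]), ("java", [1])]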
MapReduce is flexible, but still quite constrained in its model. This leads to a number of patterns and tricks to handle common data processing scenarios.
The word count example falls under the pattern of summarizing data. The basic pattern is:
- Map: Find all instances of the data of interest, possibly filtering by some criteria, and return them.
- Reduce: Count, average, or perform some other calculation on the returned data.
# Word count
def mapper(document):
    # Assume document is a string of text; split it into words.
    words = document.split()
    for word in words:
        emit(word, 1)

def reducer(key, values):
    # key is a word, values is the list of 1s emitted for that word.
    emit(key, sum(values))
emit() is MapReduce jargon for the function's output: each call emits one key-value pair.
Beyond word counting, this pattern is useful for counting social network connections per node (person), as sketched below, or counting any type of value in an input file (e.g. words of a certain length).
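Here is a rough sketch of the connection count, in the same emit-style pseudocode as the word count above, and assuming (purely for illustration) that the input is a list of (person_a, person_b) friendship edges:

# Count connections per person from a list of friendship edges.
def mapper(edges):
    for person_a, person_b in edges:
        # Each edge contributes one connection to both endpoints.
        emit(person_a, 1)
        emit(person_b, 1)

def reducer(key, values):
    # key is a person, values is the list of 1s for that person.
    emit(key, sum(values))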
Collating is, in a sense, the inverse of word counting and can be used to build inverted indices or to perform ETL-type tasks.
The basic pattern:
- Map: For each value in the data, return the value along with the file/document it came from.
- Reduce: Simply pass the key-value list through. It might be necessary to eliminate duplicate values.
# Building an inverted index
def mapper(filename, document):
    # Assume document is a string of text; split it into words.
    words = document.split()
    for word in words:
        emit(word, filename)

def reducer(key, values):
    # Remove duplicate filenames.
    emit(key, list(set(values)))
The inverted index maps each word to the list of documents that contain it.
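To make the whole flow concrete, here is a minimal local simulation of the map-shuffle-reduce pipeline for this example. The run_inverted_index function and its dict-based input format are illustrative stand-ins for the plumbing a real framework like Hadoop provides:

# Local simulation of map -> shuffle -> reduce for the inverted index.
from collections import defaultdict

def run_inverted_index(documents):
    # documents: dict mapping filename -> document text (illustrative format).
    mapped = []
    for filename, document in documents.items():
        for word in document.split():           # the map phase
            mapped.append((word, filename))
    grouped = defaultdict(list)                 # the shuffle phase
    for word, filename in mapped:
        grouped[word].append(filename)
    return {word: sorted(set(filenames))        # the reduce phase
            for word, filenames in grouped.items()}

index = run_inverted_index({"a.txt": "python is fun",
                            "b.txt": "python is fast"})
# index["python"] == ["a.txt", "b.txt"]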
A common task is transforming data, such as format conversions. In this case the map phase does all of the work.
- Map: Transform the data and return a key-value pair, where the key is the (new) file name and the value is the transformed data.
- Reduce: Simply pass the key-value pair through.
# FLAC to OGG audio file conversion (flac2ogg is a placeholder for the actual converter).
def mapper(flac_file_name):
    ogg_file_name = flac_file_name.split('.')[0] + '.ogg'
    emit(ogg_file_name, flac2ogg(flac_file_name))

def reducer(key, value):
    # Identity reducer: pass the converted data through unchanged.
    emit(key, value)
Filtering data by some criterion is a very common and basic task.
- Map: Return data that meets the criterion as a key-value pair, where the key is null and the value is the data.
- Reduce: Return the list of values.
# Top 10 filter
def mapper(alist):
    # Sort the list alphabetically and return the first 10 items.
    emit(None, sorted(alist)[:10])

def reducer(key, values):
    # All lists are combined, as every key is None.
    # Sort the combined list and return the top 10.
    combined = []
    for value in values:
        combined.extend(value)
    emit(None, sorted(combined)[:10])
It is often not possible to carry out a data processing task in a single MapReduce pass. In these cases the solution is often to chain multiple MapReduce passes together.
An example of chaining is computing the per-document TF-IDF (term frequency-inverse document frequency) for a set of documents. One version of the TF-IDF data flow is as follows:
- Per-document word count:
  - Map: Return key-value pairs, where the key is a word-document name tuple and the value is 1: ((word,doc),1)
  - Reduce: Return key-value pairs, where the key is the word-document name tuple and the value is the total count for that word-document combination: ((word,doc),n)
- Compute term frequency per document:
  - Map: Return key-value pairs, where the key is the document name and the value is a word-count tuple: (doc,(word,n))
  - Reduce: Return key-value pairs, where the key is a word-document name tuple and the value is n/N, where n is the word count and N is the total word count for the document: ((word,doc),n/N)
- Compute TF-IDF:
  - Map: Return key-value pairs, where the key is the word and the value is a tuple of the document name and n/N: (word,(doc,n/N))
  - Reduce: Return key-value pairs, where the key is the word-document name tuple and the value is TF-IDF = n/N * log(D/d), where D is the total document count (computed externally) and d is the count of documents containing the given word: ((word,doc),TF-IDF)
In the TF-IDF algorithm above, the input of each MapReduce pass is the output of the previous one.
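Here is a sketch of the three passes in the same emit-style pseudocode as the earlier examples. The function names are illustrative, and D, the total document count, is assumed to be computed externally, as noted above:

import math

# Pass 1: per-document word count.
def mapper1(doc_name, document):
    for word in document.split():
        emit((word, doc_name), 1)

def reducer1(key, counts):
    emit(key, sum(counts))                   # ((word, doc), n)

# Pass 2: term frequency per document.
def mapper2(key, n):
    word, doc_name = key
    emit(doc_name, (word, n))                # (doc, (word, n))

def reducer2(doc_name, word_counts):
    N = sum(n for word, n in word_counts)    # total word count for the document
    for word, n in word_counts:
        emit((word, doc_name), n / N)        # ((word, doc), n/N)

# Pass 3: TF-IDF. D is the total document count, computed externally.
def mapper3(key, tf):
    word, doc_name = key
    emit(word, (doc_name, tf))               # (word, (doc, n/N))

def reducer3(word, doc_tfs):
    d = len(doc_tfs)                         # count of documents containing the word
    for doc_name, tf in doc_tfs:
        emit((word, doc_name), tf * math.log(D / d))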