Last active
July 20, 2019 20:43
-
-
Save ggirtsou/df2b521cbee1d3077a174d9346c50059 to your computer and use it in GitHub Desktop.
The map-reduce programming model in Go. This is for big-data scenarios where all the data cannot be loaded at once, so the computation is distributed across many nodes. In this example, goroutines stand in for nodes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"strings" | |
"sync" | |
"github.com/davecgh/go-spew/spew" | |
) | |
func main() { | |
documents := []string{ | |
"apple cat banana apple car dog cat cat", | |
"desk chair banana screen house house", | |
} | |
// Split stage: concurrently tokenize documents | |
var wg sync.WaitGroup | |
wg.Add(len(documents)) | |
var tokenizedDocs [][]string | |
for _, document := range documents { | |
go func(document string) { | |
defer wg.Done() | |
tokenizedDocs = append(tokenizedDocs, Split([]byte(document))) | |
}(document) | |
} | |
wg.Wait() | |
mappedResults := make([]map[string]int, len(documents)) | |
// Map stage - concurrently map words to frequency count | |
wg.Add(len(tokenizedDocs)) | |
for _, document := range tokenizedDocs { | |
go func(document []string) { | |
mappedResults = append(mappedResults, Map(document)) | |
defer wg.Done() | |
}(document) | |
} | |
wg.Wait() | |
// Shuffling stage - group data in relevant buckets | |
partitionedData := Shuffle(mappedResults) | |
// Reduce stage - aggregate counts and print them | |
spew.Dump(Reduce(partitionedData)) | |
} | |
// Split tokenizes a document into words.
//
// It splits on any run of whitespace (spaces, tabs, newlines), so
// consecutive separators never yield empty tokens — unlike splitting on a
// single literal space, which produced "" entries for doubled spaces and
// would have polluted the frequency counts downstream.
func Split(document []byte) []string {
	return strings.Fields(string(document))
}
// Map turns the tokenized words of one document into a word→frequency
// table, counting how many times each word occurs.
func Map(wordsFromDoc []string) map[string]int {
	// Pre-size for the worst case of all-distinct words.
	frequencies := make(map[string]int, len(wordsFromDoc))
	for i := range wordsFromDoc {
		frequencies[wordsFromDoc[i]]++
	}
	return frequencies
}
// WordFreq represents a tuple that holds the word and how many times it appears in a document.
type WordFreq struct {
	Word  string // the token itself
	Count int    // occurrences of Word within a single document
}
// Shuffle takes Map stage result as input, and groups them in relevant buckets | |
func Shuffle(documents []map[string]int) map[string][]WordFreq { | |
results := make(map[string][]WordFreq) | |
for _, document := range documents { | |
for w, c := range document { | |
results[w] = append(results[w], WordFreq{w, c}) | |
} | |
} | |
return results | |
} | |
// Reduce takes Shuffle stage result as input and aggregates counts from all buckets. | |
func Reduce(input map[string][]WordFreq) map[string]int { | |
results := make(map[string]int) | |
for word, freq := range input { | |
count := 0 | |
for _, f := range freq { | |
count += f.Count | |
} | |
results[word] += count | |
} | |
return results | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment