Skip to content

Instantly share code, notes, and snippets.

@ggirtsou
Last active July 20, 2019 20:43
Show Gist options
  • Save ggirtsou/df2b521cbee1d3077a174d9346c50059 to your computer and use it in GitHub Desktop.
The mapreduce programming model in Go. This is for big data scenarios where all data cannot be loaded at once and we distribute the computations in many nodes. In this example the goroutines represent nodes.
package main
import (
"strings"
"sync"
"github.com/davecgh/go-spew/spew"
)
// main runs a word-count pipeline through the four mapreduce stages:
// split, map, shuffle, reduce. Goroutines stand in for worker nodes.
func main() {
	documents := []string{
		"apple cat banana apple car dog cat cat",
		"desk chair banana screen house house",
	}

	// Split stage: concurrently tokenize documents.
	// Each goroutine writes only to its own index of the pre-sized
	// slice, so no mutex is needed and there is no data race
	// (the original appended to a shared slice from every goroutine,
	// which is racy and can drop results).
	var wg sync.WaitGroup
	tokenizedDocs := make([][]string, len(documents))
	wg.Add(len(documents))
	for i, document := range documents {
		// Pass i and document as arguments so each goroutine gets its
		// own copy (required before Go 1.22's per-iteration variables).
		go func(i int, document string) {
			defer wg.Done()
			tokenizedDocs[i] = Split([]byte(document))
		}(i, document)
	}
	wg.Wait()

	// Map stage: concurrently map each document's words to frequency
	// counts. Indexed writes again avoid the race; they also fix the
	// original bug where make(..., len) followed by append left nil
	// maps at the front of the slice.
	mappedResults := make([]map[string]int, len(tokenizedDocs))
	wg.Add(len(tokenizedDocs))
	for i, document := range tokenizedDocs {
		go func(i int, document []string) {
			defer wg.Done()
			mappedResults[i] = Map(document)
		}(i, document)
	}
	wg.Wait()

	// Shuffling stage: group each word's counts into one bucket.
	partitionedData := Shuffle(mappedResults)

	// Reduce stage: aggregate counts per word and print them.
	spew.Dump(Reduce(partitionedData))
}
// Split tokenizes a document into words, splitting on any run of
// whitespace. strings.Fields (rather than splitting on a single space)
// avoids emitting empty tokens for consecutive spaces, tabs, newlines,
// or leading/trailing whitespace — empty tokens would otherwise be
// counted as words by the Map stage.
func Split(document []byte) []string {
	return strings.Fields(string(document))
}
// Map counts occurrences of each tokenized word, producing the
// word -> frequency table for a single document.
func Map(wordsFromDoc []string) map[string]int {
	frequencies := make(map[string]int, len(wordsFromDoc))
	for _, word := range wordsFromDoc {
		frequencies[word]++
	}
	return frequencies
}
// WordFreq pairs a word with the number of times it appears in one
// document. Shuffle stores one WordFreq per (document, word) pair in
// each word's bucket; Reduce sums the Count fields.
type WordFreq struct {
	Word  string // the word itself (duplicated as the bucket key in Shuffle's result)
	Count int    // occurrences of Word within a single document
}
// Shuffle regroups the per-document frequency tables from the Map stage
// into per-word buckets: all counts for a given word, across every
// document, land in the same slice keyed by that word.
func Shuffle(documents []map[string]int) map[string][]WordFreq {
	buckets := make(map[string][]WordFreq)
	for _, counts := range documents {
		for word, count := range counts {
			entry := WordFreq{Word: word, Count: count}
			buckets[word] = append(buckets[word], entry)
		}
	}
	return buckets
}
// Reduce collapses each word's bucket of per-document counts into a
// single total, yielding the final word-frequency table.
func Reduce(input map[string][]WordFreq) map[string]int {
	totals := make(map[string]int, len(input))
	for word, bucket := range input {
		sum := 0
		for _, wf := range bucket {
			sum += wf.Count
		}
		// Each word appears exactly once in the range, so plain
		// assignment is equivalent to the accumulate-into-zero form.
		totals[word] = sum
	}
	return totals
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment