Last active
June 23, 2023 18:33
-
-
Save mcastilho/e051898d129b44e2f502 to your computer and use it in GitHub Desktop.
Cheap MapReduce in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"encoding/csv" | |
"encoding/json" | |
"fmt" | |
"io" | |
"os" | |
"path/filepath" | |
"runtime" | |
"strconv" | |
"strings" | |
) | |
// MaxWorkers is the buffer capacity of the MapperCollector channel created by
// mapReduce, i.e. how many mapper output channels can be queued before the
// mapper dispatcher blocks.
const MaxWorkers = 10
// Telemetry is the Go representation of one telemetry JSON line.
//
// Every leaf field is a string, which makes the whole struct comparable; the
// mapper and reducer in this file rely on that to use Telemetry values
// directly as map keys (map[Telemetry]int).
type Telemetry struct {
	// Request holds the "Request" JSON object.
	Request struct {
		Sender  string `json:"Sender,omitempty"`
		Trigger string `json:"Trigger,omitempty"`
	} `json:"Request,omitempty"`

	// App holds the "App" JSON object.
	App struct {
		Program string `json:"Program,omitempty"`
		Build   string `json:"Build,omitempty"`
		License string `json:"License,omitempty"`
		Version string `json:"Version,omitempty"`
	} `json:"App,omitempty"`

	// Connection holds the "Connection" JSON object.
	Connection struct {
		Type string `json:"Type,omitempty"`
	} `json:"Connection,omitempty"`

	// Region holds the "Region" JSON object.
	Region struct {
		Continent string `json:"Continent,omitempty"`
		Country   string `json:"Country,omitempty"`
	} `json:"Region,omitempty"`

	// Client holds the "Client" JSON object.
	Client struct {
		OsVersion    string `json:"OsVersion,omitempty"`
		Language     string `json:"Language,omitempty"`
		Architecture string `json:"Architecture,omitempty"`
	} `json:"Client,omitempty"`
}
// enumerateFiles walks the directory tree rooted at dirname in a background
// goroutine and sends the path of every non-directory entry on the returned
// channel. The channel is closed once the walk has finished.
//
// Entries that the walk cannot stat or read are skipped rather than aborting
// the enumeration; a dirname that does not exist therefore yields an empty,
// closed channel.
func enumerateFiles(dirname string) chan interface{} {
	output := make(chan interface{})
	go func() {
		// Close on every exit path so that consumers ranging over the channel
		// always terminate.
		defer close(output)

		// The callback never returns an error, so Walk's own return value
		// carries no information and is intentionally discarded.
		_ = filepath.Walk(dirname, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				// When Walk reports an error for path, info may be nil;
				// dereferencing it would panic. Skip the entry and keep going.
				return nil
			}
			if !info.IsDir() {
				output <- path
			}
			return nil
		})
	}()
	return output
}
// enumerateJSON reads filename line by line in a background goroutine and
// sends each line on the returned channel, including its trailing newline
// when one is present. Lines starting with "#" are treated as meta comments
// and are not sent.
//
// The channel is always closed when reading stops: at end of file, on a read
// error, or immediately if the file cannot be opened. In the last case no
// lines are produced.
func enumerateJSON(filename string) chan string {
	output := make(chan string)
	go func() {
		// Close on every exit path; otherwise a consumer ranging over the
		// channel would block forever when the file cannot be opened.
		defer close(output)

		file, err := os.Open(filename)
		if err != nil {
			return
		}
		defer file.Close()

		reader := bufio.NewReader(file)
		for {
			line, err := reader.ReadString('\n')
			// ReadString can return data together with an error (typically a
			// final line without a trailing newline plus io.EOF), so the data
			// is handled before the error is inspected.
			if line != "" && !strings.HasPrefix(line, "#") {
				output <- line
			}
			if err != nil {
				// io.EOF is the normal end of input; any other read error
				// also ends the enumeration instead of looping forever.
				return
			}
		}
	}()
	return output
}
// Channel and function types that connect the stages of the MapReduce job.
type (
	// MapperCollector carries, for every dispatched mapper task, the channel
	// on which that task will deliver its output.
	MapperCollector chan chan interface{}

	// MapperFunc performs the map step: it receives one input item and sends
	// its result on the given channel.
	MapperFunc func(interface{}, chan interface{})

	// ReducerFunc performs the reduce step: it consumes mapper results from
	// the first channel and sends the final result on the second.
	ReducerFunc func(chan interface{}, chan interface{})
)
func mapperDispatcher(mapper MapperFunc, input chan interface{}, collector MapperCollector) { | |
for item := range input { | |
taskOutput := make(chan interface{}) | |
go mapper(item, taskOutput) | |
collector <- taskOutput | |
} | |
close(collector) | |
} | |
func reducerDispatcher(collector MapperCollector, reducerInput chan interface{}) { | |
for output := range collector { | |
reducerInput <- <-output | |
} | |
close(reducerInput) | |
} | |
func mapper(filename interface{}, output chan interface{}) { | |
results := map[Telemetry]int{} | |
// start the enumeration of each JSON lines in the file | |
for line := range enumerateJSON(filename.(string)) { | |
// decode the telemetry JSON line | |
dec := json.NewDecoder(strings.NewReader(line)) | |
var telemetry Telemetry | |
// if line cannot be JSON decoded then skip to next one | |
if err := dec.Decode(&telemetry); err == io.EOF { | |
continue | |
} else if err != nil { | |
continue | |
} | |
// stores Telemetry structure in the mapper results dictionary | |
previousCount, exists := results[telemetry] | |
if !exists { | |
results[telemetry] = 1 | |
} else { | |
results[telemetry] = previousCount + 1 | |
} | |
} | |
output <- results | |
} | |
func reducer(input chan interface{}, output chan interface{}) { | |
results := map[Telemetry]int{} | |
for matches := range input { | |
for key, value := range matches.(map[Telemetry]int) { | |
_, exists := results[key] | |
if !exists { | |
results[key] = value | |
} else { | |
results[key] = results[key] + value | |
} | |
} | |
} | |
output <- results | |
} | |
func mapReduce(mapper MapperFunc, reducer ReducerFunc, input chan interface{}) interface{} { | |
reducerInput := make(chan interface{}) | |
reducerOutput := make(chan interface{}) | |
mapperCollector := make(MapperCollector, MaxWorkers) | |
go reducer(reducerInput, reducerOutput) | |
go reducerDispatcher(mapperCollector, reducerInput) | |
go mapperDispatcher(mapper, input, mapperCollector) | |
return <-reducerOutput | |
} | |
func main() { | |
runtime.GOMAXPROCS(runtime.NumCPU()) | |
fmt.Println("Processing. Please wait....") | |
// start the enumeration of files to be processed into a channel | |
input := enumerateFiles(".") | |
// this will start the map reduce work | |
results := mapReduce(mapper, reducer, input) | |
// open output file | |
f, err := os.Create("telemetry.csv") | |
if err != nil { | |
panic(err) | |
} | |
defer f.Close() | |
// make a write buffer | |
writer := csv.NewWriter(f) | |
for telemetry, value := range results.(map[Telemetry]int) { | |
var record []string | |
record = append(record, telemetry.Request.Sender) | |
record = append(record, telemetry.Request.Trigger) | |
record = append(record, telemetry.App.Program) | |
record = append(record, telemetry.App.Build) | |
record = append(record, telemetry.App.License) | |
record = append(record, telemetry.App.Version) | |
record = append(record, telemetry.Connection.Type) | |
record = append(record, telemetry.Region.Continent) | |
record = append(record, telemetry.Region.Country) | |
record = append(record, telemetry.Client.OsVersion) | |
record = append(record, telemetry.Client.Language) | |
record = append(record, telemetry.Client.Architecture) | |
// The last field of the CSV line is the aggregate count for each occurrence | |
record = append(record, strconv.Itoa(value)) | |
writer.Write(record) | |
} | |
writer.Flush() | |
fmt.Println("Done!") | |
} |
You are welcome to try github.com/chrislusf/glow. You can just use the standalone version, and I believe the code would be much simpler.
@stgleb nope, the value type is int.
`results := map[Telemetry]int{}`
is equivalent to
`results := make(map[Telemetry]int)`
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@mcastilho I'm trying to understand something about `mapperDispatcher` -- it seems as if the dispatcher will block on `collector <- taskOutput` within that loop, making the execution in `mapper` effectively sequential rather than parallel. Am I misunderstanding? I would have thought that we'd want that step to be fully parallelized as well.