-
-
Save alexcrownus/d4168a76f6a61501bd6749b18a7e58e6 to your computer and use it in GitHub Desktop.
Cheap MapReduce in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"encoding/csv" | |
"encoding/json" | |
"fmt" | |
"io" | |
"os" | |
"path/filepath" | |
"runtime" | |
"strconv" | |
"strings" | |
) | |
const ( | |
MaxWorkers = 10 | |
) | |
type Telemetry struct { | |
Request struct { | |
Sender string `json:"Sender,omitempty"` | |
Trigger string `json:"Trigger,omitempty"` | |
} `json:"Request,omitempty"` | |
App struct { | |
Program string `json:"Program,omitempty"` | |
Build string `json:"Build,omitempty"` | |
License string `json:"License,omitempty"` | |
Version string `json:"Version,omitempty"` | |
} `json:"App,omitempty"` | |
Connection struct { | |
Type string `json:"Type,omitempty"` | |
} `json:"Connection,omitempty"` | |
Region struct { | |
Continent string `json:"Continent,omitempty"` | |
Country string `json:"Country,omitempty"` | |
} `json:"Region,omitempty"` | |
Client struct { | |
OsVersion string `json:"OsVersion,omitempty"` | |
Language string `json:"Language,omitempty"` | |
Architecture string `json:"Architecture,omitempty"` | |
} `json:"Client,omitempty"` | |
} | |
func enumerateFiles(dirname string) chan interface{} { | |
output := make(chan interface{}) | |
go func() { | |
filepath.Walk(dirname, func(path string, f os.FileInfo, err error) error { | |
if !f.IsDir() { | |
output <- path | |
} | |
return nil | |
}) | |
close(output) | |
}() | |
return output | |
} | |
func enumerateJSON(filename string) chan string { | |
output := make(chan string) | |
go func() { | |
file, err := os.Open(filename) | |
if err != nil { | |
return | |
} | |
defer file.Close() | |
reader := bufio.NewReader(file) | |
for { | |
line, err := reader.ReadString('\n') | |
if err == io.EOF { | |
break | |
} | |
// ignore any meta comments on top of JSON file | |
if strings.HasPrefix(line, "#") == true { | |
continue | |
} | |
// add each json line to our enumeration channel | |
output <- line | |
} | |
close(output) | |
}() | |
return output | |
} | |
// MapperCollector is a channel that collects the output from mapper tasks | |
type MapperCollector chan chan interface{} | |
// MapperFunc is a function that performs the mapping part of the MapReduce job | |
type MapperFunc func(interface{}, chan interface{}) | |
// ReducerFunc is a function that performs the reduce part of the MapReduce job | |
type ReducerFunc func(chan interface{}, chan interface{}) | |
func mapperDispatcher(mapper MapperFunc, input chan interface{}, collector MapperCollector) { | |
for item := range input { | |
taskOutput := make(chan interface{}) | |
go mapper(item, taskOutput) | |
collector <- taskOutput | |
} | |
close(collector) | |
} | |
func reducerDispatcher(collector MapperCollector, reducerInput chan interface{}) { | |
for output := range collector { | |
reducerInput <- <-output | |
} | |
close(reducerInput) | |
} | |
func mapper(filename interface{}, output chan interface{}) { | |
results := map[Telemetry]int{} | |
// start the enumeration of each JSON lines in the file | |
for line := range enumerateJSON(filename.(string)) { | |
// decode the telemetry JSON line | |
dec := json.NewDecoder(strings.NewReader(line)) | |
var telemetry Telemetry | |
// if line cannot be JSON decoded then skip to next one | |
if err := dec.Decode(&telemetry); err == io.EOF { | |
continue | |
} else if err != nil { | |
continue | |
} | |
// stores Telemetry structure in the mapper results dictionary | |
previousCount, exists := results[telemetry] | |
if !exists { | |
results[telemetry] = 1 | |
} else { | |
results[telemetry] = previousCount + 1 | |
} | |
} | |
output <- results | |
} | |
func reducer(input chan interface{}, output chan interface{}) { | |
results := map[Telemetry]int{} | |
for matches := range input { | |
for key, value := range matches.(map[Telemetry]int) { | |
_, exists := results[key] | |
if !exists { | |
results[key] = value | |
} else { | |
results[key] = results[key] + value | |
} | |
} | |
} | |
output <- results | |
} | |
func mapReduce(mapper MapperFunc, reducer ReducerFunc, input chan interface{}) interface{} { | |
reducerInput := make(chan interface{}) | |
reducerOutput := make(chan interface{}) | |
mapperCollector := make(MapperCollector, MaxWorkers) | |
go reducer(reducerInput, reducerOutput) | |
go reducerDispatcher(mapperCollector, reducerInput) | |
go mapperDispatcher(mapper, input, mapperCollector) | |
return <-reducerOutput | |
} | |
func main() { | |
runtime.GOMAXPROCS(runtime.NumCPU()) | |
fmt.Println("Processing. Please wait....") | |
// start the enumeration of files to be processed into a channel | |
input := enumerateFiles(".") | |
// this will start the map reduce work | |
results := mapReduce(mapper, reducer, input) | |
// open output file | |
f, err := os.Create("telemetry.csv") | |
if err != nil { | |
panic(err) | |
} | |
defer f.Close() | |
// make a write buffer | |
writer := csv.NewWriter(f) | |
for telemetry, value := range results.(map[Telemetry]int) { | |
var record []string | |
record = append(record, telemetry.Request.Sender) | |
record = append(record, telemetry.Request.Trigger) | |
record = append(record, telemetry.App.Program) | |
record = append(record, telemetry.App.Build) | |
record = append(record, telemetry.App.License) | |
record = append(record, telemetry.App.Version) | |
record = append(record, telemetry.Connection.Type) | |
record = append(record, telemetry.Region.Continent) | |
record = append(record, telemetry.Region.Country) | |
record = append(record, telemetry.Client.OsVersion) | |
record = append(record, telemetry.Client.Language) | |
record = append(record, telemetry.Client.Architecture) | |
// The last field of the CSV line is the aggregate count for each occurrence | |
record = append(record, strconv.Itoa(value)) | |
writer.Write(record) | |
} | |
writer.Flush() | |
fmt.Println("Done!") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment