Skip to content

Instantly share code, notes, and snippets.

@metafeather
Last active August 29, 2015 14:04
Show Gist options
  • Save metafeather/018914fb16b5174e2e42 to your computer and use it in GitHub Desktop.
Save metafeather/018914fb16b5174e2e42 to your computer and use it in GitHub Desktop.
MapReduce in Go from JSON via stdin
// MAPPER
// ref: http://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
)
// main is the mapper entry point. It reads newline-delimited JSON log records
// from stdin and, for every record that parses, writes one line of the form
// "<key>\t<json>\n" to stdout, where <json> is the mapperOutput encoding of
// the fields picked out of the record.
//
// Lines that fail to parse or to encode are logged and skipped. A read error
// on stdin or a write error on stdout is logged and ends the process with
// exit status 1.
func main() {
	// loop through each line of stdin
	ls := bufio.NewScanner(os.Stdin)
	for ls.Scan() {
		// Decode into a fresh record on every iteration: json.Unmarshal
		// leaves struct fields that are absent from the JSON untouched, so a
		// single reused record would leak values from the previous line into
		// a line that omits a field.
		var input logRecord
		if err := json.Unmarshal(ls.Bytes(), &input); err != nil {
			log.Print("unable to read log: ", err)
			continue
		}

		// Keyed fields keep this correct even if mapperOutput's field order
		// ever changes.
		output := mapperOutput{
			// grab an identifier
			Key: input.Data.Key,
			// and any other useful information from input json
			SecondaryKey: input.Data.AnotherKey,
			FirstMetric:  input.Data.Metric,
			SecondMetric: input.Data.AnotherMetric,
		}

		// generate json output
		outputJSON, err := json.Marshal(output)
		if err != nil {
			log.Print("unable to write mapper output: ", err)
			continue
		}

		// write the key and json to stdout; a failed write means the
		// consumer is gone, so stop instead of silently dropping records
		if _, err := fmt.Fprintf(os.Stdout, "%s\t%s\n", output.Key, outputJSON); err != nil {
			log.Print("unable to write to stdout: ", err)
			os.Exit(1)
		}
	}
	if err := ls.Err(); err != nil {
		log.Print("error reading from stdin: ", err)
		os.Exit(1)
	}
}
// logRecord is the shape of one incoming JSON log line. Only the members of
// the top-level "data" object listed here are decoded.
type logRecord struct {
	// Data holds the fields read from the "data" object.
	Data struct {
		// Key is decoded from "key".
		Key string `json:"key"`
		// AnotherKey is decoded from "another-key".
		AnotherKey string `json:"another-key"`
		// Metric is decoded from "metric".
		Metric int64 `json:"metric"`
		// AnotherMetric is decoded from "metric-2".
		AnotherMetric int64 `json:"metric-2"`
	} `json:"data"`
}
// mapperOutput is the JSON value written for each record. The field order is
// significant to any code that builds this struct with an unkeyed literal.
type mapperOutput struct {
	// Key is encoded as "key".
	Key string `json:"key"`
	// SecondaryKey is encoded as "secondary-key".
	SecondaryKey string `json:"secondary-key"`
	// FirstMetric is encoded as "first-metric".
	FirstMetric int64 `json:"first-metric"`
	// SecondMetric is encoded as "second-metric".
	SecondMetric int64 `json:"second-metric"`
}
// REDUCER
// ref: http://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"log"
"os"
)
// tab is the single-byte separator between the key and the JSON value on
// each input line.
var tab = []byte{'\t'}
// main is the reducer entry point. It reads lines of the form
// "<key>\t<json>" from stdin, where <json> decodes into a mapperOutput, and
// sums FirstMetric and SecondMetric across all records sharing the same Key
// (the Key inside the JSON, not the text before the tab). The first record
// seen for a key supplies the remaining fields.
//
// After all input is consumed it writes one "<key>\t<json>\n" line per key to
// stdout; the order of those lines follows map iteration and is unspecified.
//
// Lines without a tab or with undecodable JSON are logged and skipped. A read
// error on stdin is logged and ends the process with exit status 1.
func main() {
	// initiate output map
	output := make(map[string]mapperOutput)

	// loop through each line for stdin
	ls := bufio.NewScanner(os.Stdin)
	for ls.Scan() {
		// Split on the first tab only, and verify both halves exist before
		// indexing so a malformed line cannot cause an out-of-range panic.
		rawInput := bytes.SplitN(ls.Bytes(), tab, 2)
		if len(rawInput) != 2 {
			log.Print("unable to parse reducer input: missing tab separator")
			continue
		}

		// Decode into a fresh value on every iteration: json.Unmarshal
		// leaves fields absent from the JSON untouched, so a reused value
		// would carry metrics over from the previous line.
		var input mapperOutput
		if err := json.Unmarshal(rawInput[1], &input); err != nil {
			log.Print("unable to parse reducer input: ", err)
			continue
		}

		ongoingCount, exists := output[input.Key]
		if !exists {
			// if this is the first time we've seen this key, add it to output
			output[input.Key] = input
			continue
		}
		// grab ongoing count from output and increment metrics
		ongoingCount.FirstMetric += input.FirstMetric
		ongoingCount.SecondMetric += input.SecondMetric
		// map values are copies, so place updated values back into output
		output[input.Key] = ongoingCount
	}
	if err := ls.Err(); err != nil {
		log.Print("error reading from stdin: ", err)
		os.Exit(1)
	}

	// once we've read all lines, emit output
	for key, value := range output {
		// generate json output
		outputJSON, err := json.Marshal(value)
		if err != nil {
			log.Print("unable to marshal reducer json: ", err)
			continue
		}
		// write the key and json to stdout, one record per line
		fmt.Fprintf(os.Stdout, "%s\t%s\n", key, outputJSON)
	}
}
// mapperOutput is the JSON value carried after the tab on each input line,
// and also the per-key record written back out once the metrics are summed.
type mapperOutput struct {
	// Key is read from and written as "key"; records are grouped by it.
	Key string `json:"key"`
	// SecondaryKey is read from and written as "secondary-key".
	SecondaryKey string `json:"secondary-key"`
	// FirstMetric is read from and written as "first-metric".
	FirstMetric int64 `json:"first-metric"`
	// SecondMetric is read from and written as "second-metric".
	SecondMetric int64 `json:"second-metric"`
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment