Last active
August 29, 2015 14:04
-
-
Save metafeather/018914fb16b5174e2e42 to your computer and use it in GitHub Desktop.
MapReduce in Go from JSON via stdin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// MAPPER
// ref: http://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/
package main | |
import ( | |
"bufio" | |
"encoding/json" | |
"fmt" | |
"log" | |
"os" | |
) | |
func main() { | |
var line []byte | |
var input logRecord | |
var output mapperOutput | |
var outputJSON []byte | |
var err error | |
// loop through each line of stdin | |
ls := bufio.NewScanner(os.Stdin) | |
for ls.Scan() { | |
line = ls.Bytes() | |
// parse the incoming json | |
if err = json.Unmarshal(line, &input); err != nil { | |
log.Print("unable to read log: ", err) | |
continue | |
} | |
// initialize output structure | |
output = mapperOutput{ | |
// grab an identifier | |
input.Data.Key, | |
// and any other useful information from input json | |
input.Data.AnotherKey, | |
input.Data.Metric, | |
input.Data.AnotherMetric, | |
} | |
// generate json output | |
if outputJSON, err = json.Marshal(output); err != nil { | |
log.Print("unable to write mapper output: ", err) | |
continue | |
} | |
// write the key and json to stdout | |
fmt.Fprintf(os.Stdout, "%s\t%s\n", output.Key, outputJSON) | |
} | |
if ls.Err() != nil { | |
log.Print("error reading from stdin: ", ls.Err()) | |
os.Exit(1) | |
} | |
} | |
// logRecord is the shape of one incoming JSON log line as read by the mapper.
// Only the top-level "data" object is decoded; within it, "key" and
// "another-key" are strings and "metric" and "metric-2" are integers.
type logRecord struct {
	Data struct {
		Key           string `json:"key"`
		AnotherKey    string `json:"another-key"`
		Metric        int64  `json:"metric"`
		AnotherMetric int64  `json:"metric-2"`
	} `json:"data"`
}
// mapperOutput is the record the mapper serializes to JSON for each input
// line: a primary key, a secondary key, and two integer metrics.
type mapperOutput struct {
	Key          string `json:"key"`
	SecondaryKey string `json:"secondary-key"`
	FirstMetric  int64  `json:"first-metric"`
	SecondMetric int64  `json:"second-metric"`
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// REDUCER
// ref: http://open.blogs.nytimes.com/2014/07/10/emr-streaming-in-go/
package main | |
import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sort"
)
// tab is the separator between the key and the JSON value on each reducer
// input line.
var tab = []byte("\t")
func main() { | |
var rawInput []string | |
var input mapperOutput | |
var outputJSON []byte | |
var err error | |
// initiate output map | |
output := make(map[string]mapperOutput) | |
// loop through each line for stdin | |
ls := bufio.NewScanner(os.Stdin) | |
for ls.Scan() { | |
// split line to separate key and value | |
rawInput = bytes.Split(ls.Bytes(), tab) | |
// parse the incoming json | |
if err = json.Unmarshal(rawInput[1], &input); err != nil { | |
log.Print("unable to parse reducer input: ", err) | |
continue | |
} | |
// check if key already exists | |
if ongoingCount, exists := output[input.Key]; exists { | |
// grab ongoing count from output and increment metrics | |
ongoingCount.FirstMetric += input.FirstMetric | |
ongoingCount.SecondMetric += input.SecondMetric | |
// place updated values back into output | |
output[input.Key] = ongoingCount | |
} else { | |
// if this is the first time we've seen this key, add it to output | |
output[input.Key] = input | |
} | |
} | |
if ls.Err() != nil { | |
log.Print("error reading from stdin: ", ls.Err()) | |
os.Exit(1) | |
} | |
// once we've read all lines, emit output | |
for key, value := range output { | |
// generate json output | |
if outputJSON, err = json.Marshal(value); err != nil { | |
log.Print("unable to marshal reducer json: ", err) | |
continue | |
} | |
// write the key and json to stdout | |
fmt.Fprintf(os.Stdout, "%s\t%s", key, outputJSON) | |
} | |
} | |
// mapperOutput is the record the reducer decodes from the JSON half of each
// input line and re-encodes on output: a primary key, a secondary key, and
// two integer metrics that the reducer sums per key.
type mapperOutput struct {
	Key          string `json:"key"`
	SecondaryKey string `json:"secondary-key"`
	FirstMetric  int64  `json:"first-metric"`
	SecondMetric int64  `json:"second-metric"`
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment