Skip to content

Instantly share code, notes, and snippets.

@imrenagi
Created April 14, 2019 02:45
Show Gist options
  • Save imrenagi/10b1124de76a7f0f335ff7f8cd897a94 to your computer and use it in GitHub Desktop.
Save imrenagi/10b1124de76a7f0f335ff7f8cd897a94 to your computer and use it in GitHub Desktop.
// doReduce runs a single reduce task. It reads the intermediate file that each
// of the nMap map tasks produced for this reduce task (located via reduceName),
// groups the decoded values by key, calls reduceF once per distinct key, and
// writes the results to outFile as a stream of JSON-encoded KeyValue objects so
// they can be merged later.
//
// doReduce has no error return, so failures are reported on standard error:
//   - an input file that cannot be opened is skipped;
//   - a malformed record stops reading of that input file;
//   - failure to create outFile, or to encode a result, ends the task early.
//
// Keys are emitted in map iteration order, which Go leaves unspecified.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTask int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	intermediary := make(map[string][]string)

	// Decode the intermediate file written by every map task for this reducer.
	for m := 0; m < nMap; m++ {
		name := reduceName(jobName, m, reduceTask)
		fr, err := os.Open(name)
		if err != nil {
			reportReduceError("open "+name, err)
			continue
		}

		decoder := json.NewDecoder(fr)
		// More reports false once the stream is exhausted, so any error
		// returned by Decode inside the loop is a genuine decoding failure
		// rather than a normal end of file.
		for decoder.More() {
			var kv KeyValue
			if err := decoder.Decode(&kv); err != nil {
				reportReduceError("decode "+name, err)
				break
			}
			// append on a missing key starts from a nil slice, so no
			// explicit existence check is needed.
			intermediary[kv.Key] = append(intermediary[kv.Key], kv.Value)
		}

		// Close inside the loop (not via defer) so descriptors are released
		// per input file instead of accumulating until the function returns.
		if err := fr.Close(); err != nil {
			reportReduceError("close "+name, err)
		}
	}

	out, err := os.Create(outFile)
	if err != nil {
		reportReduceError("create "+outFile, err)
		return
	}
	defer func() {
		if err := out.Close(); err != nil {
			reportReduceError("close "+outFile, err)
		}
	}()

	// Run the reduce function once per key and encode each result as JSON so
	// it can be merged by the merger.
	enc := json.NewEncoder(out)
	for key, values := range intermediary {
		if err := enc.Encode(&KeyValue{Key: key, Value: reduceF(key, values)}); err != nil {
			reportReduceError("encode to "+outFile, err)
			return
		}
	}
}

// reportReduceError writes a one-line diagnostic for a failed doReduce step to
// standard error.
func reportReduceError(step string, err error) {
	os.Stderr.WriteString("doReduce: " + step + ": " + err.Error() + "\n")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment