Skip to content

Instantly share code, notes, and snippets.

@ijingo
Created November 19, 2016 10:04
Show Gist options
  • Save ijingo/85989514557cfd0cdbff9ed8d9ba6b41 to your computer and use it in GitHub Desktop.
Save ijingo/85989514557cfd0cdbff9ed8d9ba6b41 to your computer and use it in GitHub Desktop.
func doMap(
jobName string,
mapTaskNumber int,
inFile string,
nReduce int,
mapF func(file string, contents string) []KeyValue,
) {
buff, err := ioutil.ReadFile(inFile)
if err != nil {
log.Fatal("Mapper read fail ", err)
}
res := mapF(inFile, string(buff))
for i := 0; i < nReduce; i++ {
outFileName := reduceName(jobName, mapTaskNumber, i)
outFile, err := os.Create(outFileName)
defer outFile.Close()
if err != nil {
log.Fatal("Mapper create itermediate file fail ", err)
}
enc := json.NewEncoder(outFile)
for _, kv := range res {
if ihash(kv.Key)%uint32(nReduce) == uint32(i) {
err := enc.Encode(&kv)
if err != nil {
log.Fatal("Encode error ", err)
}
}
}
}
}
func doReduce(
jobName string,
reduceTaskNumber int,
nMap int,
reduceF func(key string, values []string) string,
) {
kvs := make(map[string][]string)
for i := 0; i < nMap; i++ {
inFileName := reduceName(jobName, i, reduceTaskNumber)
inFile, err := os.Open(inFileName)
defer inFile.Close()
if err != nil {
log.Fatal("Reducer open file fail ", err)
}
dec := json.NewDecoder(inFile)
for {
var kv KeyValue
err := dec.Decode(&kv)
if err != nil {
break
}
if _, ok := kvs[kv.Key]; !ok {
kvs[kv.Key] = make([]string, 0)
}
kvs[kv.Key] = append(kvs[kv.Key], kv.Value)
}
}
var keys []string
for k, _ := range kvs {
keys = append(keys, k)
}
sort.Strings(keys)
outFileName := mergeName(jobName, reduceTaskNumber)
outFile, err := os.Create(outFileName)
defer outFile.Close()
if err != nil {
log.Fatal("Reducer create file fail ", err)
}
enc := json.NewEncoder(outFile)
for _, k := range keys {
res := reduceF(k, kvs[k])
enc.Encode(&KeyValue{k, res})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment