Skip to content

Instantly share code, notes, and snippets.

@imrenagi
Created April 14, 2019 02:09
Show Gist options
  • Save imrenagi/67c1ea74e04f736f68786957b8cf7ccd to your computer and use it in GitHub Desktop.
Save imrenagi/67c1ea74e04f736f68786957b8cf7ccd to your computer and use it in GitHub Desktop.
// doMap runs one map task: it reads inFile, passes its contents to mapF, and
// writes every returned KeyValue as a JSON value into one of nReduce
// intermediate files. The file for partition r is the one named by
// reduceName(jobName, mapTask, r), and a pair lands in partition
// ihash(Key) % nReduce.
//
// doMap has no return value, so failures are reported on standard output and
// the task stops at the first one instead of silently producing incomplete
// intermediate files.
func doMap(
	jobName string, // the name of the MapReduce job
	mapTask int, // which map task this is
	inFile string, // path of the input file handed to mapF
	nReduce int, // the number of reduce task that will be run ("R" in the paper)
	mapF func(filename string, contents string) []KeyValue,
) {
	// A non-positive partition count would make the modulo below panic
	// (nReduce == 0) or leave every pair without a destination file.
	if nReduce <= 0 {
		fmt.Printf("doMap: invalid nReduce %d\n", nReduce)
		return
	}

	byteContent, err := ioutil.ReadFile(inFile)
	if err != nil {
		fmt.Printf("doMap: reading %s: %v\n", inFile, err)
		return
	}

	kvs := mapF(inFile, string(byteContent))

	// One encoder per partition, indexed by partition number. Every file is
	// created up front so each partition has a file even if no key hashes to it.
	encoders := make([]*json.Encoder, nReduce)
	for i := 0; i < nReduce; i++ {
		name := reduceName(jobName, mapTask, i)
		f, err := os.Create(name)
		if err != nil {
			fmt.Printf("doMap: creating %s: %v\n", name, err)
			return
		}
		// The files must stay open until all pairs are written, so closing is
		// deferred to function exit; a failed Close can mean lost data, so it
		// is reported rather than dropped.
		defer func() {
			if err := f.Close(); err != nil {
				fmt.Printf("doMap: closing %s: %v\n", name, err)
			}
		}()
		encoders[i] = json.NewEncoder(f)
	}

	// Shuffle: route every intermediate pair to the file of its partition.
	for i := range kvs {
		partition := ihash(kvs[i].Key) % nReduce
		if err := encoders[partition].Encode(&kvs[i]); err != nil {
			fmt.Printf("doMap: encoding key %q: %v\n", kvs[i].Key, err)
			return
		}
	}
}
// ihash returns the 32-bit FNV-1a hash of s with the most significant bit
// cleared, so the result is always a non-negative int.
func ihash(s string) int {
	// Bit 31 of the 32-bit sum; clearing it keeps the value within 31 bits.
	const topBit = 1 << 31

	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(s))
	sum := hasher.Sum32()
	return int(sum &^ topBit)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment