Skip to content

Instantly share code, notes, and snippets.

@imrenagi
Created April 20, 2019 15:35
Show Gist options
  • Save imrenagi/8cf210ef646ec2238eb2df58c892b121 to your computer and use it in GitHub Desktop.
Save imrenagi/8cf210ef646ec2238eb2df58c892b121 to your computer and use it in GitHub Desktop.
package mapreduce
import (
"fmt"
"sync"
)
// schedule runs every task of the given phase on the workers announced via
// registerChan and returns once all tasks have completed successfully.
//
// Parameters:
//   - jobName:      name of the job, forwarded to each worker in DoTaskArgs.
//   - mapFiles:     input files; there is one map task per file.
//   - nReduce:      number of reduce tasks.
//   - phase:        mapPhase or reducePhase; selects how many tasks are run.
//   - registerChan: yields the RPC address of each worker that becomes
//     available. Workers may show up at any time while schedule is running.
//
// A task whose RPC reports failure is retried on the next available worker
// until it succeeds. schedule blocks for as long as any task is unfinished,
// so it never returns if no worker ever registers.
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var nOther int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mapFiles)
		nOther = nReduce
	case reducePhase:
		ntasks = nReduce
		nOther = len(mapFiles)
	}

	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)

	// Pool of idle workers. It is buffered so that handing a worker back
	// normally does not block the task goroutine that just finished with it.
	availableWorkers := make(chan string, 1000)

	// done is closed when schedule returns; it tells the helper goroutines
	// below to stop instead of lingering forever.
	done := make(chan struct{})
	defer close(done)

	// Forward newly registered workers into the pool. The goroutine exits
	// when schedule returns or when registerChan is closed; without these
	// exits it would keep draining registerChan after this phase finished
	// (or spin on a closed channel).
	go func() {
		for {
			select {
			case worker, isOpen := <-registerChan:
				if !isOpen {
					return
				}
				select {
				case availableWorkers <- worker:
				case <-done:
					return
				}
			case <-done:
				return
			}
		}
	}()

	var wg sync.WaitGroup
	for numTask := 0; numTask < ntasks; numTask++ {
		// In the reduce phase ntasks is nReduce, which may exceed
		// len(mapFiles); guard the index so such a task does not panic.
		var file string
		if numTask < len(mapFiles) {
			file = mapFiles[numTask]
		}
		args := DoTaskArgs{
			JobName:       jobName,
			File:          file,
			Phase:         phase,
			TaskNumber:    numTask,
			NumOtherPhase: nOther,
		}

		wg.Add(1)
		go func(args DoTaskArgs) {
			for {
				workerRPCAddr := <-availableWorkers
				ok := call(workerRPCAddr, "Worker.DoTask", args, nil)
				if ok {
					// Report completion before handing the worker back so
					// that a full pool can never delay wg.Wait.
					wg.Done()
				}
				// The worker goes back into the pool whether or not the
				// call succeeded, so it can be tried again later.
				select {
				case availableWorkers <- workerRPCAddr:
				case <-done:
					return
				}
				if ok {
					return
				}
			}
		}(args)
	}
	wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment