Skip to content

Instantly share code, notes, and snippets.

@ijingo
Created November 19, 2016 09:56
Show Gist options
  • Save ijingo/8bf4ba05b8264f77c7b4a48d6835a724 to your computer and use it in GitHub Desktop.
Save ijingo/8bf4ba05b8264f77c7b4a48d6835a724 to your computer and use it in GitHub Desktop.
// schedule runs every task of the given phase on workers obtained from
// mr.registerChannel and returns once all of them have completed.
//
// In the map phase there is one task per entry of mr.files and each task is
// told there are mr.nReduce outputs; in the reduce phase there are mr.nReduce
// tasks and each is told there are len(mr.files) inputs.
//
// Each task is driven by its own goroutine: it takes a worker from
// mr.registerChannel, sends it a Worker.DoTask RPC and, if the call reports
// failure, waits for another worker and retries. A worker whose call failed
// is not put back on the channel.
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // count passed to each task as the size of the other phase
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}

	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	var wg sync.WaitGroup
	for i := 0; i < ntasks; i++ {
		wg.Add(1)
		go func(taskNumber int) {
			defer wg.Done()

			// Only map tasks are indexed by input file. Reduce task numbers
			// run up to mr.nReduce-1, which may exceed len(mr.files)-1, so
			// indexing mr.files unconditionally can panic in the reduce phase.
			// NOTE(review): presumably Worker.DoTask ignores the file name
			// for reduce tasks — confirm against the worker implementation.
			var file string
			if phase == mapPhase {
				file = mr.files[taskNumber]
			}
			args := DoTaskArgs{mr.jobName, file, phase, taskNumber, nios}

			for {
				worker := <-mr.registerChannel
				if call(worker, "Worker.DoTask", &args, new(struct{})) {
					// Hand the worker back from a separate goroutine so that
					// finishing this task never blocks on the send.
					go func() {
						mr.registerChannel <- worker
					}()
					return
				}
				// The call failed: drop this worker and retry the same task
				// on whichever worker becomes available next.
			}
		}(i)
	}
	wg.Wait()

	fmt.Printf("Schedule: %v phase done\n", phase)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment