Skip to content

Instantly share code, notes, and snippets.

View ijingo's full-sized avatar
🐽

WANG Ji 王冀 ijingo

🐽
View GitHub Profile
func doMap(
jobName string,
mapTaskNumber int,
inFile string,
nReduce int,
mapF func(file string, contents string) []KeyValue,
) {
buff, err := ioutil.ReadFile(inFile)
if err != nil {
func (mr *Master) schedule(phase jobPhase) {
var ntasks int
var nios int
switch phase {
case mapPhase:
ntasks = len(mr.files)
nios = mr.nReduce
case reducePhase:
ntasks = mr.nReduce
nios = len(mr.files)
// Worker holds the state of a single worker: its name, the Map and Reduce
// functions it runs, two integer counters and a network listener. The
// embedded sync.Mutex gives Worker its own Lock/Unlock methods.
type Worker struct {
sync.Mutex // NOTE(review): which fields this guards is not visible here — confirm
name string // identifier printed by DoTask when a task arrives
Map func(string, string) []KeyValue // handed to doMap as mapF(file, contents)
Reduce func(string, []string) string // handed to doReduce; presumably (key, values) -> result — TODO confirm
nRPC int // NOTE(review): presumably an RPC count or budget — confirm against the RPC loop
nTasks int // NOTE(review): presumably the number of tasks handled — confirm
l net.Listener // network listener; presumably accepts incoming RPC connections — TODO confirm
}
type Master struct {
sync.Mutex
address string
registerChannel chan string
doneChannel chan bool
workers []string // protected by the mutex
// per-task info
jobName string
// Register records a newly announced worker on the master. While holding
// the master's mutex it logs the worker name and appends it to mr.workers.
// The name is then published on mr.registerChannel from a separate
// goroutine, so Register never blocks on that send. It always returns nil.
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
	mr.Lock()
	defer mr.Unlock()

	name := args.Worker
	debug("Register: worker %s\n", name)
	mr.workers = append(mr.workers, name)

	// Hand the name off asynchronously: the send may have to wait for a
	// receiver, and this method still holds the lock until it returns.
	go func(w string) {
		mr.registerChannel <- w
	}(name)

	return nil
}
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
fmt.Printf("%s: given %v task #%d on file %s (nois: %d)\n",
wk.name, arg.Phase, arg.TaskNumber, arg.File, arg.NumOtherPhase)
switch arg.Phase {
case mapPhase:
doMap(arg.JobName, arg.TaskNumber, arg.File, arg.NumOtherPhase, wk.Map)
case reducePhase:
doReduce(arg.JobName, arg.TaskNumber, arg.NumOtherPhase, wk.Reduce)
}
package main
import (
"fmt"
"log"
"net"
"net/rpc"
"shared"
)
package main
import (
"errors"
"log"
"net"
"net/rpc"
"shared"
)
package main
import (
"fmt"
"log"
"net/rpc"
"shared"
)
package main
import (
"errors"
"log"
"net"
"net/http"
"net/rpc"
"shared"