1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| func (mr *MapReduce) RunMaster() *list.List { // Your code here idleWorder := make(chan string) mapChan := make(chan int) reduceChan := make(chan int) for i := 0; i < mr.nMap; i++ { go func(mapN int) { for { var worker string success := false select { case worker = <-mr.registerChannel: mr.Workers[worker] = &WorkerInfo{address: worker} jobArgs := DoJobArgs{File: mr.file, Operation: Map, JobNumber: mapN, NumOtherPhase: mr.nReduce} var reply DoJobReply success = call(worker, "Worker.DoJob", jobArgs, &reply) case worker = <-idleWorder: jobArgs := DoJobArgs{File: mr.file, Operation: Map, JobNumber: mapN, NumOtherPhase: mr.nReduce} var reply DoJobReply success = call(worker, "Worker.DoJob", jobArgs, &reply) } if success { mapChan <- mapN idleWorder <- worker return } else { delete(mr.Workers, worker) } } }(i) } for i := 0; i < mr.nMap; i++ { <-mapChan } for i := 0; i < mr.nReduce; i++ { go func(reduceN int) { for { var worker string success := false select { case worker = <-mr.registerChannel: mr.Workers[worker] = &WorkerInfo{address: worker} jobArgs := DoJobArgs{File: mr.file, Operation: Reduce, JobNumber: reduceN, NumOtherPhase: mr.nMap} var reply DoJobReply success = call(worker, "Worker.DoJob", jobArgs, &reply) case worker = <-idleWorder: jobArgs := DoJobArgs{File: mr.file, Operation: Reduce, JobNumber: reduceN, NumOtherPhase: mr.nMap} var reply DoJobReply success = call(worker, "Worker.DoJob", jobArgs, &reply) } if success { reduceChan <- reduceN idleWorder <- worker return } else { delete(mr.Workers, worker) } } }(i) } fmt.Println("waiting for reduce done!") for i := 0; i < mr.nReduce; i++ { <-reduceChan } fmt.Println("reduce done with living worker", len(mr.Workers)) // consume idle workers... for i := 0; i < len(mr.Workers); i++ { fmt.Println(<-idleWorder) } return mr.KillWorkers() }
|