From 00bfc7b4efc3a4b02308984f7af5b9e353d19e4b Mon Sep 17 00:00:00 2001 From: bapung Date: Sun, 29 Dec 2024 19:24:37 +0700 Subject: [PATCH] lab1 passed all tests --- src/mr/coordinator.go | 50 ++++++++++++++++++++++++++++++++++++++----- src/mr/worker.go | 22 +++++++++++++++++-- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index bef7339..8e3506c 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -17,8 +17,9 @@ import ( ) const ( - JOB_TIMEOUT_S = 10 * time.Second - DEBUG = false + JOB_TIMEOUT_S = 10 * time.Second + GET_TASK_TIMEOUT = 2 * time.Second + DEBUG = false ) type Coordinator struct { @@ -45,6 +46,8 @@ func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error { c.mu.Lock() defer c.mu.Unlock() + done := make(chan struct{}) + if args.Method != "request_task" { return errors.New("argument .Method not valid for this procedure") } @@ -74,7 +77,22 @@ func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error { return nil } } - c.mapWg.Wait() + go func() { + c.mapWg.Wait() + close(done) + }() + + select { + case <-done: + if DEBUG { + fmt.Println("Wait done. Continuing...") + } + case <-time.After(GET_TASK_TIMEOUT): + return nil + } + + done = make(chan struct{}) + // check for reduce jobs for i, rj := range c.ReduceJobs { if !rj.Status && !rj.Active { @@ -97,7 +115,19 @@ func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error { return nil } } - c.reduceWg.Wait() + go func() { + c.reduceWg.Wait() + close(done) + }() + + select { + case <-done: + if DEBUG { + fmt.Println("Wait done. Continuing...") + } + case <-time.After(GET_TASK_TIMEOUT): + return nil + } reply.TaskType = "done" reply.NReduce = c.NReduce @@ -171,10 +201,16 @@ func (c *Coordinator) CheckTaskValidity() { defer c.mu.Unlock() for i, mj := range c.MapJobs { timeNow := time.Now() + duration := timeNow.Sub(mj.LastPing) + if DEBUG { + fmt.Printf("[Worker Status] id: %s | time: %s | active: %t | status: %t\n", mj.WorkerId, duration.String(), mj.Active, mj.Status) + } if mj.Active && !mj.Status { - duration := timeNow.Sub(mj.LastPing) if duration >= JOB_TIMEOUT_S { // unset worker id and set the status + if DEBUG { + fmt.Printf("[Terminating Worker] %s", mj.WorkerId) + } c.MapJobs[i].WorkerId = "" c.MapJobs[i].Active = false c.MapJobs[i].Status = false @@ -186,6 +222,9 @@ func (c *Coordinator) CheckTaskValidity() { if rj.Active && !rj.Status { duration := timeNow.Sub(rj.LastPing) if duration >= JOB_TIMEOUT_S { + if DEBUG { + fmt.Printf("[Terminating Worker] %s", rj.WorkerId) + } c.ReduceJobs[i].WorkerId = "" c.ReduceJobs[i].Active = false c.ReduceJobs[i].Status = false @@ -198,6 +237,7 @@ func (c *Coordinator) CheckTaskValidity() { // if the entire job has finished. func (c *Coordinator) Done() bool { ret := false + go c.CheckTaskValidity() // Check any non-completed jobs for _, mj := range c.MapJobs { if !mj.Status { diff --git a/src/mr/worker.go b/src/mr/worker.go index fbfe208..21fb5f3 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -12,6 +12,7 @@ import ( "path/filepath" "sort" "strings" + "time" ) // Map functions return a slice of KeyValue. @@ -41,9 +42,26 @@ func ihash(key string) int { func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. - taskType := "init" + var workerId, taskType string + var inFilenames []string + var nReduce int + + taskType = "init" for taskType != "done" { - workerId, taskType, inFilenames, nReduce := RequestTask() + //for taskType != "done" && taskType != "init" && taskType != "map" && taskType != "reduce" { + //workerId, taskType, inFilenames, nReduce = RequestTask() + //} + for { + workerId, taskType, inFilenames, nReduce = RequestTask() + + // Check if taskType is valid + if taskType == "done" || taskType == "init" || taskType == "map" || taskType == "reduce" { + break // Exit the loop if taskType is valid + } + time.Sleep(1 * time.Second) + //fmt.Println("Invalid taskType, retrying...") + } + if taskType == "done" { return }