From 005df49fbea070e539739306c1c14a7a272d5857 Mon Sep 17 00:00:00 2001 From: bapung Date: Sat, 21 Dec 2024 09:31:39 +0700 Subject: [PATCH] cp2 --- src/mr/coordinator.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index b7a8343..bef7339 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -18,7 +18,7 @@ import ( const ( JOB_TIMEOUT_S = 10 * time.Second - DEBUG = true + DEBUG = false ) type Coordinator struct { @@ -27,6 +27,7 @@ type Coordinator struct { NReduce int mu sync.Mutex mapWg sync.WaitGroup + reduceWg sync.WaitGroup } type Job struct { @@ -96,6 +97,7 @@ func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error { return nil } } + c.reduceWg.Wait() reply.TaskType = "done" reply.NReduce = c.NReduce @@ -138,6 +140,8 @@ func (c *Coordinator) SetTaskDone(args *RpcArgument, reply *RpcReply) error { for i, _ := range c.ReduceJobs { if c.ReduceJobs[i].WorkerId == wId { c.ReduceJobs[i].Status = true + c.ReduceJobs[i].Active = false + c.reduceWg.Done() } } } else { @@ -165,11 +169,24 @@ func (c *Coordinator) server() { func (c *Coordinator) CheckTaskValidity() { c.mu.Lock() defer c.mu.Unlock() + for i, mj := range c.MapJobs { + timeNow := time.Now() + if mj.Active && !mj.Status { + duration := timeNow.Sub(mj.LastPing) + if duration >= JOB_TIMEOUT_S { + // unset worker id and set the status + c.MapJobs[i].WorkerId = "" + c.MapJobs[i].Active = false + c.MapJobs[i].Status = false + } + } + } for i, rj := range c.ReduceJobs { timeNow := time.Now() if rj.Active && !rj.Status { duration := timeNow.Sub(rj.LastPing) if duration >= JOB_TIMEOUT_S { + c.ReduceJobs[i].WorkerId = "" c.ReduceJobs[i].Active = false c.ReduceJobs[i].Status = false } @@ -219,6 +236,7 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator { reduceJob.Active = false reduceJob.Filenames = []string{} reduceJob.Status = false + c.reduceWg.Add(1) c.ReduceJobs = append(c.ReduceJobs, reduceJob) }