This commit is contained in:
2024-12-21 09:31:39 +07:00
parent 549f538bbf
commit 005df49fbe

View File

@@ -18,7 +18,7 @@ import (
const ( const (
JOB_TIMEOUT_S = 10 * time.Second JOB_TIMEOUT_S = 10 * time.Second
DEBUG = true DEBUG = false
) )
type Coordinator struct { type Coordinator struct {
@@ -27,6 +27,7 @@ type Coordinator struct {
NReduce int NReduce int
mu sync.Mutex mu sync.Mutex
mapWg sync.WaitGroup mapWg sync.WaitGroup
reduceWg sync.WaitGroup
} }
type Job struct { type Job struct {
@@ -96,6 +97,7 @@ func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error {
return nil return nil
} }
} }
c.reduceWg.Wait()
reply.TaskType = "done" reply.TaskType = "done"
reply.NReduce = c.NReduce reply.NReduce = c.NReduce
@@ -138,6 +140,8 @@ func (c *Coordinator) SetTaskDone(args *RpcArgument, reply *RpcReply) error {
for i, _ := range c.ReduceJobs { for i, _ := range c.ReduceJobs {
if c.ReduceJobs[i].WorkerId == wId { if c.ReduceJobs[i].WorkerId == wId {
c.ReduceJobs[i].Status = true c.ReduceJobs[i].Status = true
c.ReduceJobs[i].Active = false
c.reduceWg.Done()
} }
} }
} else { } else {
@@ -165,11 +169,24 @@ func (c *Coordinator) server() {
func (c *Coordinator) CheckTaskValidity() { func (c *Coordinator) CheckTaskValidity() {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
for i, mj := range c.MapJobs {
timeNow := time.Now()
if mj.Active && !mj.Status {
duration := timeNow.Sub(mj.LastPing)
if duration >= JOB_TIMEOUT_S {
// unset worker id and set the status
c.MapJobs[i].WorkerId = ""
c.MapJobs[i].Active = false
c.MapJobs[i].Status = false
}
}
}
for i, rj := range c.ReduceJobs { for i, rj := range c.ReduceJobs {
timeNow := time.Now() timeNow := time.Now()
if rj.Active && !rj.Status { if rj.Active && !rj.Status {
duration := timeNow.Sub(rj.LastPing) duration := timeNow.Sub(rj.LastPing)
if duration >= JOB_TIMEOUT_S { if duration >= JOB_TIMEOUT_S {
c.ReduceJobs[i].WorkerId = ""
c.ReduceJobs[i].Active = false c.ReduceJobs[i].Active = false
c.ReduceJobs[i].Status = false c.ReduceJobs[i].Status = false
} }
@@ -219,6 +236,7 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator {
reduceJob.Active = false reduceJob.Active = false
reduceJob.Filenames = []string{} reduceJob.Filenames = []string{}
reduceJob.Status = false reduceJob.Status = false
c.reduceWg.Add(1)
c.ReduceJobs = append(c.ReduceJobs, reduceJob) c.ReduceJobs = append(c.ReduceJobs, reduceJob)
} }