lab1 passed all tests

This commit is contained in:
2024-12-29 19:24:37 +07:00
parent c1ad7ba1aa
commit 00bfc7b4ef
2 changed files with 65 additions and 7 deletions

View File

@@ -17,8 +17,9 @@ import (
) )
const ( const (
JOB_TIMEOUT_S = 10 * time.Second JOB_TIMEOUT_S = 10 * time.Second
DEBUG = false GET_TASK_TIMEOUT = 2 * time.Second
DEBUG = false
) )
type Coordinator struct { type Coordinator struct {
@@ -45,6 +46,8 @@ func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
done := make(chan struct{})
if args.Method != "request_task" { if args.Method != "request_task" {
return errors.New("argument .Method not valid for this procedure") return errors.New("argument .Method not valid for this procedure")
} }
@@ -74,7 +77,22 @@ func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error {
return nil return nil
} }
} }
c.mapWg.Wait() go func() {
c.mapWg.Wait()
close(done)
}()
select {
case <-done:
if DEBUG {
fmt.Println("Wait done. Continuing...")
}
case <-time.After(GET_TASK_TIMEOUT):
return nil
}
done = make(chan struct{})
// check for reduce jobs // check for reduce jobs
for i, rj := range c.ReduceJobs { for i, rj := range c.ReduceJobs {
if !rj.Status && !rj.Active { if !rj.Status && !rj.Active {
@@ -97,7 +115,19 @@ func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error {
return nil return nil
} }
} }
c.reduceWg.Wait() go func() {
c.reduceWg.Wait()
close(done)
}()
select {
case <-done:
if DEBUG {
fmt.Println("Wait done. Continuing...")
}
case <-time.After(GET_TASK_TIMEOUT):
return nil
}
reply.TaskType = "done" reply.TaskType = "done"
reply.NReduce = c.NReduce reply.NReduce = c.NReduce
@@ -171,10 +201,16 @@ func (c *Coordinator) CheckTaskValidity() {
defer c.mu.Unlock() defer c.mu.Unlock()
for i, mj := range c.MapJobs { for i, mj := range c.MapJobs {
timeNow := time.Now() timeNow := time.Now()
duration := timeNow.Sub(mj.LastPing)
if DEBUG {
fmt.Printf("[Worker Status] id: %s | time: %s | active: %t | status: %t\n", mj.WorkerId, duration.String(), mj.Active, mj.Status)
}
if mj.Active && !mj.Status { if mj.Active && !mj.Status {
duration := timeNow.Sub(mj.LastPing)
if duration >= JOB_TIMEOUT_S { if duration >= JOB_TIMEOUT_S {
// unset worker id and set the status // unset worker id and set the status
if DEBUG {
fmt.Printf("[Terminating Worker] %s", mj.WorkerId)
}
c.MapJobs[i].WorkerId = "" c.MapJobs[i].WorkerId = ""
c.MapJobs[i].Active = false c.MapJobs[i].Active = false
c.MapJobs[i].Status = false c.MapJobs[i].Status = false
@@ -186,6 +222,9 @@ func (c *Coordinator) CheckTaskValidity() {
if rj.Active && !rj.Status { if rj.Active && !rj.Status {
duration := timeNow.Sub(rj.LastPing) duration := timeNow.Sub(rj.LastPing)
if duration >= JOB_TIMEOUT_S { if duration >= JOB_TIMEOUT_S {
if DEBUG {
fmt.Printf("[Terminating Worker] %s", rj.WorkerId)
}
c.ReduceJobs[i].WorkerId = "" c.ReduceJobs[i].WorkerId = ""
c.ReduceJobs[i].Active = false c.ReduceJobs[i].Active = false
c.ReduceJobs[i].Status = false c.ReduceJobs[i].Status = false
@@ -198,6 +237,7 @@ func (c *Coordinator) CheckTaskValidity() {
// if the entire job has finished. // if the entire job has finished.
func (c *Coordinator) Done() bool { func (c *Coordinator) Done() bool {
ret := false ret := false
go c.CheckTaskValidity()
// Check any non-completed jobs // Check any non-completed jobs
for _, mj := range c.MapJobs { for _, mj := range c.MapJobs {
if !mj.Status { if !mj.Status {

View File

@@ -12,6 +12,7 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
"strings" "strings"
"time"
) )
// Map functions return a slice of KeyValue. // Map functions return a slice of KeyValue.
@@ -41,9 +42,26 @@ func ihash(key string) int {
func Worker(mapf func(string, string) []KeyValue, func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) { reducef func(string, []string) string) {
// Your worker implementation here. // Your worker implementation here.
taskType := "init" var workerId, taskType string
var inFilenames []string
var nReduce int
taskType = "init"
for taskType != "done" { for taskType != "done" {
workerId, taskType, inFilenames, nReduce := RequestTask() //for taskType != "done" && taskType != "init" && taskType != "map" && taskType != "reduce" {
//workerId, taskType, inFilenames, nReduce = RequestTask()
//}
for {
workerId, taskType, inFilenames, nReduce = RequestTask()
// Check if taskType is valid
if taskType == "done" || taskType == "init" || taskType == "map" || taskType == "reduce" {
break // Exit the loop if taskType is valid
}
time.Sleep(1 * time.Second)
//fmt.Println("Invalid taskType, retrying...")
}
if taskType == "done" { if taskType == "done" {
return return
} }