From 549f538bbf8a4826868238b949e54594e4f14e7c Mon Sep 17 00:00:00 2001 From: bapung Date: Sun, 15 Dec 2024 12:08:02 +0700 Subject: [PATCH] wc pass --- src/main/test-mr.sh | 1 + src/mr/coordinator.go | 156 +++++++++++++++++++++++++++------- src/mr/rpc.go | 9 +- src/mr/worker.go | 190 +++++++++++++++++++++++++++--------------- 4 files changed, 254 insertions(+), 102 deletions(-) diff --git a/src/main/test-mr.sh b/src/main/test-mr.sh index 210019f..898c8e2 100644 --- a/src/main/test-mr.sh +++ b/src/main/test-mr.sh @@ -113,6 +113,7 @@ fi # wait for remaining workers and coordinator to exit. wait +exit 0 ######################################################### # now indexer rm -f mr-* diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index cd7e9cf..b7a8343 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,60 +1,105 @@ package mr import ( + "crypto/rand" "errors" + "fmt" "log" + "math/big" "net" "net/http" "net/rpc" "os" + "strconv" + "strings" + "sync" "time" ) +const ( + JOB_TIMEOUT_S = 10 * time.Second + DEBUG = true +) + type Coordinator struct { MapJobs []Job ReduceJobs []Job NReduce int + mu sync.Mutex + mapWg sync.WaitGroup } type Job struct { - Filename string - Status bool - Active bool - LastPing time.Time + WorkerId string + Filenames []string + Status bool + Active bool + LastPing time.Time } // Your code here -- RPC handlers for the worker to call. // // the RPC argument and reply types are defined in rpc.go. func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error { + c.mu.Lock() + defer c.mu.Unlock() + if args.Method != "request_task" { return errors.New("argument .Method not valid for this procedure") } // check for any non-completed OR non-active map jobs - for _, mj := range c.MapJobs { + for i, mj := range c.MapJobs { // assign job that not already done OR not actively being run by other workers + // status = false AND active = false if !mj.Status && !mj.Active { + wId, err := genWorkerId() + if err != nil { + fmt.Println(err) + } reply.NReduce = c.NReduce reply.TaskType = "map" - reply.Filename = mj.Filename - // set job status, taken by a worker - mj.Active = true - mj.LastPing = time.Now() + reply.Filenames = mj.Filenames + reply.WorkerId = wId + // set job status, taken by a worke + c.MapJobs[i].WorkerId = wId + c.MapJobs[i].Active = true + c.MapJobs[i].LastPing = time.Now() + if DEBUG { + fmt.Printf("[Map Job Assigned] %s | file: %s | status: %t | active: %t |\n", + c.MapJobs[i].WorkerId, + c.MapJobs[i].Filenames[0], + c.MapJobs[i].Status, c.MapJobs[i].Active) + } return nil } } + c.mapWg.Wait() // check for reduce jobs - for _, rj := range c.ReduceJobs { + for i, rj := range c.ReduceJobs { if !rj.Status && !rj.Active { + wId, err := genWorkerId() + if err != nil { + fmt.Println(err) + } + reply.WorkerId = wId reply.NReduce = c.NReduce reply.TaskType = "reduce" - reply.Filename = rj.Filename + reply.Filenames = rj.Filenames // set job status, taken by a worker - rj.Active = true - rj.LastPing = time.Now() + c.ReduceJobs[i].WorkerId = wId + c.ReduceJobs[i].Active = true + c.ReduceJobs[i].LastPing = time.Now() + if DEBUG { + fmt.Printf("[Reduce Job Assigned] %s | status: %t | active: %t |\n", + c.ReduceJobs[i].WorkerId, c.ReduceJobs[i].Status, c.ReduceJobs[i].Active) + } return nil } } + + reply.TaskType = "done" + reply.NReduce = c.NReduce + reply.Filenames = []string{} return nil } @@ -64,26 +109,35 @@ func (c *Coordinator) SetTaskDone(args *RpcArgument, reply *RpcReply) error { return errors.New("argument .Method not valid for this procedure") } if args.TaskType == "map" { - f := args.Filename - for _, mj := range c.MapJobs { - if mj.Filename == f { - mj.Status = true + wId := args.WorkerId + for i, _ := range c.MapJobs { + if c.MapJobs[i].WorkerId == wId { + c.MapJobs[i].Status = true + c.MapJobs[i].Active = false + c.mapWg.Done() + if DEBUG { + fmt.Printf("[DONE] %s | file: %s | status: %t | active: %t\n", + c.MapJobs[i].WorkerId, + c.MapJobs[i].Filenames[0], + c.MapJobs[i].Status, c.MapJobs[i].Active) + } } } - // Create reduce job data struct for each intermediary file returned from worker's map job + // Check again for _, ifn := range args.IntermediateFiles { - reduceJob := Job{} - reduceJob.Active = false - reduceJob.Filename = ifn - reduceJob.LastPing = time.Now() - reduceJob.Status = false - c.ReduceJobs = append(c.ReduceJobs, reduceJob) + reduceJobIdx := strings.Split(ifn, "-")[2] + idx, err := strconv.Atoi(reduceJobIdx) + if err != nil { + fmt.Printf("Error converting string to integer: %v\n", err) + } + c.ReduceJobs[idx].Filenames = append(c.ReduceJobs[idx].Filenames, ifn) } + } else if args.TaskType == "reduce" { - f := args.Filename - for _, rj := range c.ReduceJobs { - if rj.Filename == f { - rj.Status = true + wId := args.WorkerId + for i, _ := range c.ReduceJobs { + if c.ReduceJobs[i].WorkerId == wId { + c.ReduceJobs[i].Status = true } } } else { @@ -106,6 +160,23 @@ func (c *Coordinator) server() { go http.Serve(l, nil) } +// check scheduled tasks; if their age > JOB_TIMEOUT_S +// reset the job status, making it available for scheduling +func (c *Coordinator) CheckTaskValidity() { + c.mu.Lock() + defer c.mu.Unlock() + for i, rj := range c.ReduceJobs { + timeNow := time.Now() + if rj.Active && !rj.Status { + duration := timeNow.Sub(rj.LastPing) + if duration >= JOB_TIMEOUT_S { + c.ReduceJobs[i].Active = false + c.ReduceJobs[i].Status = false + } + } + } +} + // main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. func (c *Coordinator) Done() bool { @@ -116,6 +187,11 @@ func (c *Coordinator) Done() bool { return ret } } + for _, mj := range c.ReduceJobs { + if !mj.Status { + return ret + } + } // all mj completed ret = true return ret @@ -131,12 +207,34 @@ func MakeCoordinator(files []string, nReduce int) *Coordinator { for i := range files { mj := Job{} - mj.Filename = files[i] + mj.Filenames = []string{files[i]} mj.Status = false mj.Active = false + c.mapWg.Add(1) c.MapJobs = append(c.MapJobs, mj) } + for j := 0; j < nReduce; j++ { + reduceJob := Job{} + reduceJob.Active = false + reduceJob.Filenames = []string{} + reduceJob.Status = false + c.ReduceJobs = append(c.ReduceJobs, reduceJob) + } + c.server() return &c } + +func genWorkerId() (string, error) { + const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + result := make([]byte, 8) + for i := range result { + index, err := rand.Int(rand.Reader, big.NewInt(int64(len(charset)))) + if err != nil { + return "", fmt.Errorf("error generating random number: %v", err) + } + result[i] = charset[index.Int64()] + } + return string(result), nil +} diff --git a/src/mr/rpc.go b/src/mr/rpc.go index f7787d8..7eb2071 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -17,16 +17,17 @@ import ( // type RpcArgument struct { + WorkerId string Method string - Filename string TaskType string IntermediateFiles []string } type RpcReply struct { - TaskType string - Filename string - NReduce int + WorkerId string + TaskType string + Filenames []string + NReduce int } // Add your RPC definitions here. diff --git a/src/mr/worker.go b/src/mr/worker.go index 7c6ea64..fbfe208 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -10,6 +10,7 @@ import ( "net/rpc" "os" "path/filepath" + "sort" "strings" ) @@ -19,6 +20,15 @@ type KeyValue struct { Value string } +// Sorting +// for sorting by key. +type ByKey []KeyValue + +// for sorting by key. +func (a ByKey) Len() int { return len(a) } +func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key } + // use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. func ihash(key string) int { @@ -31,69 +41,109 @@ func ihash(key string) int { func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. - taskType, inFilename, nReduce := RequestTask() - fmt.Printf("Perform %s task on: %s", taskType, inFilename) - file, err := os.Open(inFilename) - if err != nil { - log.Fatalf("cannot open %v", inFilename) - } - content, err := io.ReadAll(file) - if err != nil { - log.Fatalf("cannot read %v", inFilename) - } - file.Close() - if taskType == "map" { - intermediateFiles := []string{} - kva := mapf(inFilename, string(content)) - //hash the map job id - mapTaskIdx := ihash(inFilename) - for i := range kva { - log.Printf("%s: %s", kva[i].Key, kva[i].Value) - idx := ihash(kva[i].Key) % nReduce - mapOutFn := fmt.Sprintf("mr-%d-%d", mapTaskIdx, idx) - // add to intermediate file list - intermediateFiles = append(intermediateFiles, mapOutFn) - // write intermidiate files - data, err := json.MarshalIndent(kva[i], "", " ") + taskType := "init" + for taskType != "done" { + workerId, taskType, inFilenames, nReduce := RequestTask() + if taskType == "done" { + return + } + if taskType == "map" { + ifn := inFilenames[0] + file, err := os.Open(ifn) if err != nil { - log.Fatalf("failed to serialize map to JSON: %w", err) + log.Fatalf("cannot open %v", ifn) } - if err = WriteTempAtomic(mapOutFn, data); err != nil { + content, err := io.ReadAll(file) + if err != nil { + log.Fatalf("cannot read %v", ifn) + } + file.Close() + kva := mapf(ifn, string(content)) + //hash the map job id + mapTaskIdx := ihash(ifn) + + IntermediaryFileMaps := make(map[int][]KeyValue) + + for i := 0; i < nReduce; i++ { + IntermediaryFileMaps[i] = []KeyValue{} + } + + for i := range kva { + //log.Printf("%s: %s", kva[i].Key, kva[i].Value) + idx := ihash(kva[i].Key) % nReduce + IntermediaryFileMaps[idx] = append(IntermediaryFileMaps[idx], kva[i]) + } + + intermediateFiles := []string{} + for idx := range IntermediaryFileMaps { + outFile := fmt.Sprintf("mr-%d-%d", mapTaskIdx, idx) + intermediateFiles = append(intermediateFiles, outFile) + jsonData, err := json.Marshal(IntermediaryFileMaps[idx]) + if err != nil { + log.Fatal("failed to marshal json") + } + if err = WriteTempAtomic(outFile, jsonData); err != nil { + log.Fatalf("failed to write KV map to file") + } + } + SetTaskDone(workerId, "map", intermediateFiles) + + } else if taskType == "reduce" { + var intermediate []KeyValue + + for _, f := range inFilenames { + // Open the file + file, err := os.Open(f) + if err != nil { + log.Fatalf("Failed to open file: %v", err) + } + defer file.Close() + + // Read the file contents + data, err := io.ReadAll(file) + if err != nil { + log.Fatalf("Failed to read file: %v", err) + } + + // Parse the JSON data + var intermidateShard []KeyValue + if err := json.Unmarshal(data, &intermidateShard); err != nil { + log.Fatalf("Failed to parse JSON: %v", err) + } + intermediate = append(intermediate, intermidateShard...) + } + + sort.Sort(ByKey(intermediate)) + + // Prepare output file + reduceJobNum := strings.Split(inFilenames[0], "-")[2] + oFilename := fmt.Sprintf("mr-out-%s", reduceJobNum) + + var buffer bytes.Buffer + i := 0 + for i < len(intermediate) { + j := i + 1 + for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { + j++ + } + values := []string{} + for k := i; k < j; k++ { + values = append(values, intermediate[k].Value) + } + output := reducef(intermediate[i].Key, values) + + // this is the correct format for each line of Reduce output. + fmt.Fprintf(&buffer, "%v %v\n", intermediate[i].Key, output) + //fmt.Println(buffer.String()) + i = j + } + if err := WriteTempAtomic(oFilename, buffer.Bytes()); err != nil { log.Fatalf("failed to write KV map to file: %w", err) } + SetTaskDone(workerId, "reduce", []string{}) } - SetTaskDone("map", inFilename, intermediateFiles) - return - - } else if taskType == "reduce" { - intermediate := []KeyValue{} - - // Prepare output file - oFilename := strings.Split(inFilename, ",")[1] - var buffer bytes.Buffer - i := 0 - for i < len(intermediate) { - j := i + 1 - for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { - j++ - } - values := []string{} - for k := i; k < j; k++ { - values = append(values, intermediate[k].Value) - } - output := reducef(intermediate[i].Key, values) - - // this is the correct format for each line of Reduce output. - fmt.Fprintf(&buffer, "%v %v\n", intermediate[i].Key, output) - - i = j - } - if err = WriteTempAtomic(oFilename, buffer.Bytes()); err != nil { - log.Fatalf("failed to write KV map to file: %w", err) - } - SetTaskDone("reduce", inFilename, []string{}) - return } + } func WriteTempAtomic(fn string, data []byte) error { @@ -135,36 +185,38 @@ func WriteTempAtomic(fn string, data []byte) error { } // RPC call to set the task status to done -func SetTaskDone(taskType string, filename string, intermediateFiles []string) { +func SetTaskDone(workerId string, taskType string, intermediateFiles []string) { args := RpcArgument{} + args.WorkerId = workerId args.Method = "set_task_done" args.TaskType = taskType - args.Filename = filename args.IntermediateFiles = intermediateFiles reply := RpcReply{} ok := call("Coordinator.SetTaskDone", &args, &reply) - if ok { - fmt.Printf("ok") - } else { + if !ok { fmt.Printf("call failed!\n") } + /* + if ok { + fmt.Printf("ok\n") + } else { + fmt.Printf("call failed!\n") + } + */ } // RPC call to request task from coordinator -func RequestTask() (string, string, int) { +func RequestTask() (string, string, []string, int) { // declare an arg and reply args := RpcArgument{} args.Method = "request_task" reply := RpcReply{} - ok := call("Coordinator.GetTask", &args, &reply) - if ok { - fmt.Printf("reply.TaskType %v\n", reply.TaskType) - fmt.Printf("reply.InFilename %v\n", reply.Filename) - } else { + ok := call("Coordinator.GetTasks", &args, &reply) + if !ok { fmt.Printf("call failed!\n") } - return reply.TaskType, reply.Filename, reply.NReduce + return reply.WorkerId, reply.TaskType, reply.Filenames, reply.NReduce } // send an RPC request to the coordinator, wait for the response.