diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index cafda57..cd7e9cf 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,33 +1,98 @@ package mr -import "log" -import "net" -import "os" -import "net/rpc" -import "net/http" - +import ( + "errors" + "log" + "net" + "net/http" + "net/rpc" + "os" + "time" +) type Coordinator struct { - // Your definitions here. + MapJobs []Job + ReduceJobs []Job + NReduce int +} +type Job struct { + Filename string + Status bool + Active bool + LastPing time.Time } // Your code here -- RPC handlers for the worker to call. - -// -// an example RPC handler. // // the RPC argument and reply types are defined in rpc.go. -// -func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error { - reply.Y = args.X + 1 +func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error { + if args.Method != "request_task" { + return errors.New("argument .Method not valid for this procedure") + } + // check for any non-completed OR non-active map jobs + for _, mj := range c.MapJobs { + // assign job that not already done OR not actively being run by other workers + if !mj.Status && !mj.Active { + reply.NReduce = c.NReduce + reply.TaskType = "map" + reply.Filename = mj.Filename + // set job status, taken by a worker + mj.Active = true + mj.LastPing = time.Now() + return nil + } + } + // check for reduce jobs + for _, rj := range c.ReduceJobs { + if !rj.Status && !rj.Active { + reply.NReduce = c.NReduce + reply.TaskType = "reduce" + reply.Filename = rj.Filename + // set job status, taken by a worker + rj.Active = true + rj.LastPing = time.Now() + return nil + } + } return nil } +// Set task complete +func (c *Coordinator) SetTaskDone(args *RpcArgument, reply *RpcReply) error { + if args.Method != "set_task_done" { + return errors.New("argument .Method not valid for this procedure") + } + if args.TaskType == "map" { + f := args.Filename + for _, mj := range c.MapJobs { + if mj.Filename == f { + mj.Status = true + } + } + // Create reduce job data struct for each intermediary file returned from worker's map job + for _, ifn := range args.IntermediateFiles { + reduceJob := Job{} + reduceJob.Active = false + reduceJob.Filename = ifn + reduceJob.LastPing = time.Now() + reduceJob.Status = false + c.ReduceJobs = append(c.ReduceJobs, reduceJob) + } + } else if args.TaskType == "reduce" { + f := args.Filename + for _, rj := range c.ReduceJobs { + if rj.Filename == f { + rj.Status = true + } + } + } else { + return errors.New("argument .TaskType not valid for this procedure") + } + return nil +} -// // start a thread that listens for RPCs from worker.go -// func (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() @@ -41,29 +106,36 @@ func (c *Coordinator) server() { go http.Serve(l, nil) } -// // main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. -// func (c *Coordinator) Done() bool { ret := false - - // Your code here. - - + // Check any non-completed jobs + for _, mj := range c.MapJobs { + if !mj.Status { + return ret + } + } + // all mj completed + ret = true return ret } -// // create a Coordinator. // main/mrcoordinator.go calls this function. // nReduce is the number of reduce tasks to use. -// func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} - // Your code here. + c.NReduce = nReduce + for i := range files { + mj := Job{} + mj.Filename = files[i] + mj.Status = false + mj.Active = false + c.MapJobs = append(c.MapJobs, mj) + } c.server() return &c diff --git a/src/mr/rpc.go b/src/mr/rpc.go index 1f15466..f7787d8 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -6,25 +6,31 @@ package mr // remember to capitalize all names. // -import "os" -import "strconv" +import ( + "os" + "strconv" +) // // example to show how to declare the arguments // and reply for an RPC. // -type ExampleArgs struct { - X int +type RpcArgument struct { + Method string + Filename string + TaskType string + IntermediateFiles []string } -type ExampleReply struct { - Y int +type RpcReply struct { + TaskType string + Filename string + NReduce int } // Add your RPC definitions here. - // Cook up a unique-ish UNIX-domain socket name // in /var/tmp, for the coordinator. // Can't use the current directory since diff --git a/src/mr/worker.go b/src/mr/worker.go index aaa8b64..7c6ea64 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -1,77 +1,175 @@ package mr -import "fmt" -import "log" -import "net/rpc" -import "hash/fnv" +import ( + "bytes" + "encoding/json" + "fmt" + "hash/fnv" + "io" + "log" + "net/rpc" + "os" + "path/filepath" + "strings" +) - -// // Map functions return a slice of KeyValue. -// type KeyValue struct { Key string Value string } -// // use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. -// func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32() & 0x7fffffff) } - -// // main/mrworker.go calls this function. -// func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { - // Your worker implementation here. + taskType, inFilename, nReduce := RequestTask() + fmt.Printf("Perform %s task on: %s", taskType, inFilename) + file, err := os.Open(inFilename) + if err != nil { + log.Fatalf("cannot open %v", inFilename) + } + content, err := io.ReadAll(file) + if err != nil { + log.Fatalf("cannot read %v", inFilename) + } + file.Close() + if taskType == "map" { + intermediateFiles := []string{} + kva := mapf(inFilename, string(content)) + //hash the map job id + mapTaskIdx := ihash(inFilename) + for i := range kva { + log.Printf("%s: %s", kva[i].Key, kva[i].Value) + idx := ihash(kva[i].Key) % nReduce + mapOutFn := fmt.Sprintf("mr-%d-%d", mapTaskIdx, idx) + // add to intermediate file list + intermediateFiles = append(intermediateFiles, mapOutFn) + // write intermidiate files + data, err := json.MarshalIndent(kva[i], "", " ") + if err != nil { + log.Fatalf("failed to serialize map to JSON: %w", err) + } + if err = WriteTempAtomic(mapOutFn, data); err != nil { + log.Fatalf("failed to write KV map to file: %w", err) + } + } + SetTaskDone("map", inFilename, intermediateFiles) + return - // uncomment to send the Example RPC to the coordinator. - // CallExample() + } else if taskType == "reduce" { + intermediate := []KeyValue{} + // Prepare output file + oFilename := strings.Split(inFilename, ",")[1] + var buffer bytes.Buffer + i := 0 + for i < len(intermediate) { + j := i + 1 + for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { + j++ + } + values := []string{} + for k := i; k < j; k++ { + values = append(values, intermediate[k].Value) + } + output := reducef(intermediate[i].Key, values) + + // this is the correct format for each line of Reduce output. + fmt.Fprintf(&buffer, "%v %v\n", intermediate[i].Key, output) + + i = j + } + if err = WriteTempAtomic(oFilename, buffer.Bytes()); err != nil { + log.Fatalf("failed to write KV map to file: %w", err) + } + SetTaskDone("reduce", inFilename, []string{}) + return + } } -// -// example function to show how to make an RPC call to the coordinator. -// -// the RPC argument and reply types are defined in rpc.go. -// -func CallExample() { +func WriteTempAtomic(fn string, data []byte) error { + // Create a temporary file in the target directory + currentDir, err := os.Getwd() + tempFile, err := os.CreateTemp(currentDir, fn) + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + tempPath := tempFile.Name() - // declare an argument structure. - args := ExampleArgs{} + if _, err := tempFile.Write(data); err != nil { + tempFile.Close() + os.Remove(tempPath) + return fmt.Errorf("failed to write to temp file: %w", err) + } - // fill in the argument(s). - args.X = 99 + // Ensure the file is fully written to disk + if err := tempFile.Sync(); err != nil { + tempFile.Close() + os.Remove(tempPath) + return fmt.Errorf("failed to sync temp file: %w", err) + } - // declare a reply structure. - reply := ExampleReply{} + if err := tempFile.Close(); err != nil { + os.Remove(tempPath) + return fmt.Errorf("failed to close temp file: %w", err) + } - // send the RPC request, wait for the reply. - // the "Coordinator.Example" tells the - // receiving server that we'd like to call - // the Example() method of struct Coordinator. - ok := call("Coordinator.Example", &args, &reply) + // Compute the target file path + targetPath := filepath.Join(currentDir, fn) + + // Atomically replace the target file with the temporary file + if err := os.Rename(tempPath, targetPath); err != nil { + os.Remove(tempPath) + return fmt.Errorf("failed to rename temp file to target: %w", err) + } + return nil +} + +// RPC call to set the task status to done +func SetTaskDone(taskType string, filename string, intermediateFiles []string) { + args := RpcArgument{} + args.Method = "set_task_done" + args.TaskType = taskType + args.Filename = filename + args.IntermediateFiles = intermediateFiles + reply := RpcReply{} + ok := call("Coordinator.SetTaskDone", &args, &reply) if ok { - // reply.Y should be 100. - fmt.Printf("reply.Y %v\n", reply.Y) + fmt.Printf("ok") } else { fmt.Printf("call failed!\n") } } -// +// RPC call to request task from coordinator +func RequestTask() (string, string, int) { + // declare an arg and reply + args := RpcArgument{} + args.Method = "request_task" + reply := RpcReply{} + + ok := call("Coordinator.GetTask", &args, &reply) + if ok { + fmt.Printf("reply.TaskType %v\n", reply.TaskType) + fmt.Printf("reply.InFilename %v\n", reply.Filename) + } else { + fmt.Printf("call failed!\n") + } + return reply.TaskType, reply.Filename, reply.NReduce +} + // send an RPC request to the coordinator, wait for the response. // usually returns true. // returns false if something goes wrong. -// func call(rpcname string, args interface{}, reply interface{}) bool { // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234") sockname := coordinatorSock()