initial commit
This commit is contained in:
@@ -1,33 +1,98 @@
|
|||||||
package mr
|
package mr
|
||||||
|
|
||||||
import "log"
|
import (
|
||||||
import "net"
|
"errors"
|
||||||
import "os"
|
"log"
|
||||||
import "net/rpc"
|
"net"
|
||||||
import "net/http"
|
"net/http"
|
||||||
|
"net/rpc"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
type Coordinator struct {
|
type Coordinator struct {
|
||||||
// Your definitions here.
|
MapJobs []Job
|
||||||
|
ReduceJobs []Job
|
||||||
|
NReduce int
|
||||||
|
}
|
||||||
|
|
||||||
|
type Job struct {
|
||||||
|
Filename string
|
||||||
|
Status bool
|
||||||
|
Active bool
|
||||||
|
LastPing time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Your code here -- RPC handlers for the worker to call.
|
// Your code here -- RPC handlers for the worker to call.
|
||||||
|
|
||||||
//
|
|
||||||
// an example RPC handler.
|
|
||||||
//
|
//
|
||||||
// the RPC argument and reply types are defined in rpc.go.
|
// the RPC argument and reply types are defined in rpc.go.
|
||||||
//
|
func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error {
|
||||||
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
|
if args.Method != "request_task" {
|
||||||
reply.Y = args.X + 1
|
return errors.New("argument .Method not valid for this procedure")
|
||||||
|
}
|
||||||
|
// check for any non-completed OR non-active map jobs
|
||||||
|
for _, mj := range c.MapJobs {
|
||||||
|
// assign job that not already done OR not actively being run by other workers
|
||||||
|
if !mj.Status && !mj.Active {
|
||||||
|
reply.NReduce = c.NReduce
|
||||||
|
reply.TaskType = "map"
|
||||||
|
reply.Filename = mj.Filename
|
||||||
|
// set job status, taken by a worker
|
||||||
|
mj.Active = true
|
||||||
|
mj.LastPing = time.Now()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// check for reduce jobs
|
||||||
|
for _, rj := range c.ReduceJobs {
|
||||||
|
if !rj.Status && !rj.Active {
|
||||||
|
reply.NReduce = c.NReduce
|
||||||
|
reply.TaskType = "reduce"
|
||||||
|
reply.Filename = rj.Filename
|
||||||
|
// set job status, taken by a worker
|
||||||
|
rj.Active = true
|
||||||
|
rj.LastPing = time.Now()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set task complete
|
||||||
|
func (c *Coordinator) SetTaskDone(args *RpcArgument, reply *RpcReply) error {
|
||||||
|
if args.Method != "set_task_done" {
|
||||||
|
return errors.New("argument .Method not valid for this procedure")
|
||||||
|
}
|
||||||
|
if args.TaskType == "map" {
|
||||||
|
f := args.Filename
|
||||||
|
for _, mj := range c.MapJobs {
|
||||||
|
if mj.Filename == f {
|
||||||
|
mj.Status = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Create reduce job data struct for each intermediary file returned from worker's map job
|
||||||
|
for _, ifn := range args.IntermediateFiles {
|
||||||
|
reduceJob := Job{}
|
||||||
|
reduceJob.Active = false
|
||||||
|
reduceJob.Filename = ifn
|
||||||
|
reduceJob.LastPing = time.Now()
|
||||||
|
reduceJob.Status = false
|
||||||
|
c.ReduceJobs = append(c.ReduceJobs, reduceJob)
|
||||||
|
}
|
||||||
|
} else if args.TaskType == "reduce" {
|
||||||
|
f := args.Filename
|
||||||
|
for _, rj := range c.ReduceJobs {
|
||||||
|
if rj.Filename == f {
|
||||||
|
rj.Status = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return errors.New("argument .TaskType not valid for this procedure")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// start a thread that listens for RPCs from worker.go
|
// start a thread that listens for RPCs from worker.go
|
||||||
//
|
|
||||||
func (c *Coordinator) server() {
|
func (c *Coordinator) server() {
|
||||||
rpc.Register(c)
|
rpc.Register(c)
|
||||||
rpc.HandleHTTP()
|
rpc.HandleHTTP()
|
||||||
@@ -41,29 +106,36 @@ func (c *Coordinator) server() {
|
|||||||
go http.Serve(l, nil)
|
go http.Serve(l, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// main/mrcoordinator.go calls Done() periodically to find out
|
// main/mrcoordinator.go calls Done() periodically to find out
|
||||||
// if the entire job has finished.
|
// if the entire job has finished.
|
||||||
//
|
|
||||||
func (c *Coordinator) Done() bool {
|
func (c *Coordinator) Done() bool {
|
||||||
ret := false
|
ret := false
|
||||||
|
// Check any non-completed jobs
|
||||||
// Your code here.
|
for _, mj := range c.MapJobs {
|
||||||
|
if !mj.Status {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// all mj completed
|
||||||
|
ret = true
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// create a Coordinator.
|
// create a Coordinator.
|
||||||
// main/mrcoordinator.go calls this function.
|
// main/mrcoordinator.go calls this function.
|
||||||
// nReduce is the number of reduce tasks to use.
|
// nReduce is the number of reduce tasks to use.
|
||||||
//
|
|
||||||
func MakeCoordinator(files []string, nReduce int) *Coordinator {
|
func MakeCoordinator(files []string, nReduce int) *Coordinator {
|
||||||
c := Coordinator{}
|
c := Coordinator{}
|
||||||
|
|
||||||
// Your code here.
|
c.NReduce = nReduce
|
||||||
|
|
||||||
|
for i := range files {
|
||||||
|
mj := Job{}
|
||||||
|
mj.Filename = files[i]
|
||||||
|
mj.Status = false
|
||||||
|
mj.Active = false
|
||||||
|
c.MapJobs = append(c.MapJobs, mj)
|
||||||
|
}
|
||||||
|
|
||||||
c.server()
|
c.server()
|
||||||
return &c
|
return &c
|
||||||
|
|||||||
@@ -6,25 +6,31 @@ package mr
|
|||||||
// remember to capitalize all names.
|
// remember to capitalize all names.
|
||||||
//
|
//
|
||||||
|
|
||||||
import "os"
|
import (
|
||||||
import "strconv"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
//
|
//
|
||||||
// example to show how to declare the arguments
|
// example to show how to declare the arguments
|
||||||
// and reply for an RPC.
|
// and reply for an RPC.
|
||||||
//
|
//
|
||||||
|
|
||||||
type ExampleArgs struct {
|
type RpcArgument struct {
|
||||||
X int
|
Method string
|
||||||
|
Filename string
|
||||||
|
TaskType string
|
||||||
|
IntermediateFiles []string
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExampleReply struct {
|
type RpcReply struct {
|
||||||
Y int
|
TaskType string
|
||||||
|
Filename string
|
||||||
|
NReduce int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add your RPC definitions here.
|
// Add your RPC definitions here.
|
||||||
|
|
||||||
|
|
||||||
// Cook up a unique-ish UNIX-domain socket name
|
// Cook up a unique-ish UNIX-domain socket name
|
||||||
// in /var/tmp, for the coordinator.
|
// in /var/tmp, for the coordinator.
|
||||||
// Can't use the current directory since
|
// Can't use the current directory since
|
||||||
|
|||||||
170
src/mr/worker.go
170
src/mr/worker.go
@@ -1,77 +1,175 @@
|
|||||||
package mr
|
package mr
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
import "log"
|
"bytes"
|
||||||
import "net/rpc"
|
"encoding/json"
|
||||||
import "hash/fnv"
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/rpc"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
//
|
|
||||||
// Map functions return a slice of KeyValue.
|
// Map functions return a slice of KeyValue.
|
||||||
//
|
|
||||||
type KeyValue struct {
|
type KeyValue struct {
|
||||||
Key string
|
Key string
|
||||||
Value string
|
Value string
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// use ihash(key) % NReduce to choose the reduce
|
// use ihash(key) % NReduce to choose the reduce
|
||||||
// task number for each KeyValue emitted by Map.
|
// task number for each KeyValue emitted by Map.
|
||||||
//
|
|
||||||
func ihash(key string) int {
|
func ihash(key string) int {
|
||||||
h := fnv.New32a()
|
h := fnv.New32a()
|
||||||
h.Write([]byte(key))
|
h.Write([]byte(key))
|
||||||
return int(h.Sum32() & 0x7fffffff)
|
return int(h.Sum32() & 0x7fffffff)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//
|
|
||||||
// main/mrworker.go calls this function.
|
// main/mrworker.go calls this function.
|
||||||
//
|
|
||||||
func Worker(mapf func(string, string) []KeyValue,
|
func Worker(mapf func(string, string) []KeyValue,
|
||||||
reducef func(string, []string) string) {
|
reducef func(string, []string) string) {
|
||||||
|
|
||||||
// Your worker implementation here.
|
// Your worker implementation here.
|
||||||
|
taskType, inFilename, nReduce := RequestTask()
|
||||||
|
fmt.Printf("Perform %s task on: %s", taskType, inFilename)
|
||||||
|
file, err := os.Open(inFilename)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("cannot open %v", inFilename)
|
||||||
|
}
|
||||||
|
content, err := io.ReadAll(file)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("cannot read %v", inFilename)
|
||||||
|
}
|
||||||
|
file.Close()
|
||||||
|
if taskType == "map" {
|
||||||
|
intermediateFiles := []string{}
|
||||||
|
kva := mapf(inFilename, string(content))
|
||||||
|
//hash the map job id
|
||||||
|
mapTaskIdx := ihash(inFilename)
|
||||||
|
for i := range kva {
|
||||||
|
log.Printf("%s: %s", kva[i].Key, kva[i].Value)
|
||||||
|
idx := ihash(kva[i].Key) % nReduce
|
||||||
|
mapOutFn := fmt.Sprintf("mr-%d-%d", mapTaskIdx, idx)
|
||||||
|
// add to intermediate file list
|
||||||
|
intermediateFiles = append(intermediateFiles, mapOutFn)
|
||||||
|
// write intermidiate files
|
||||||
|
data, err := json.MarshalIndent(kva[i], "", " ")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed to serialize map to JSON: %w", err)
|
||||||
|
}
|
||||||
|
if err = WriteTempAtomic(mapOutFn, data); err != nil {
|
||||||
|
log.Fatalf("failed to write KV map to file: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SetTaskDone("map", inFilename, intermediateFiles)
|
||||||
|
return
|
||||||
|
|
||||||
// uncomment to send the Example RPC to the coordinator.
|
} else if taskType == "reduce" {
|
||||||
// CallExample()
|
intermediate := []KeyValue{}
|
||||||
|
|
||||||
|
// Prepare output file
|
||||||
|
oFilename := strings.Split(inFilename, ",")[1]
|
||||||
|
var buffer bytes.Buffer
|
||||||
|
i := 0
|
||||||
|
for i < len(intermediate) {
|
||||||
|
j := i + 1
|
||||||
|
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
values := []string{}
|
||||||
|
for k := i; k < j; k++ {
|
||||||
|
values = append(values, intermediate[k].Value)
|
||||||
|
}
|
||||||
|
output := reducef(intermediate[i].Key, values)
|
||||||
|
|
||||||
|
// this is the correct format for each line of Reduce output.
|
||||||
|
fmt.Fprintf(&buffer, "%v %v\n", intermediate[i].Key, output)
|
||||||
|
|
||||||
|
i = j
|
||||||
|
}
|
||||||
|
if err = WriteTempAtomic(oFilename, buffer.Bytes()); err != nil {
|
||||||
|
log.Fatalf("failed to write KV map to file: %w", err)
|
||||||
|
}
|
||||||
|
SetTaskDone("reduce", inFilename, []string{})
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
func WriteTempAtomic(fn string, data []byte) error {
|
||||||
// example function to show how to make an RPC call to the coordinator.
|
// Create a temporary file in the target directory
|
||||||
//
|
currentDir, err := os.Getwd()
|
||||||
// the RPC argument and reply types are defined in rpc.go.
|
tempFile, err := os.CreateTemp(currentDir, fn)
|
||||||
//
|
if err != nil {
|
||||||
func CallExample() {
|
return fmt.Errorf("failed to create temp file: %w", err)
|
||||||
|
}
|
||||||
|
tempPath := tempFile.Name()
|
||||||
|
|
||||||
// declare an argument structure.
|
if _, err := tempFile.Write(data); err != nil {
|
||||||
args := ExampleArgs{}
|
tempFile.Close()
|
||||||
|
os.Remove(tempPath)
|
||||||
|
return fmt.Errorf("failed to write to temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// fill in the argument(s).
|
// Ensure the file is fully written to disk
|
||||||
args.X = 99
|
if err := tempFile.Sync(); err != nil {
|
||||||
|
tempFile.Close()
|
||||||
|
os.Remove(tempPath)
|
||||||
|
return fmt.Errorf("failed to sync temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// declare a reply structure.
|
if err := tempFile.Close(); err != nil {
|
||||||
reply := ExampleReply{}
|
os.Remove(tempPath)
|
||||||
|
return fmt.Errorf("failed to close temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// send the RPC request, wait for the reply.
|
// Compute the target file path
|
||||||
// the "Coordinator.Example" tells the
|
targetPath := filepath.Join(currentDir, fn)
|
||||||
// receiving server that we'd like to call
|
|
||||||
// the Example() method of struct Coordinator.
|
// Atomically replace the target file with the temporary file
|
||||||
ok := call("Coordinator.Example", &args, &reply)
|
if err := os.Rename(tempPath, targetPath); err != nil {
|
||||||
|
os.Remove(tempPath)
|
||||||
|
return fmt.Errorf("failed to rename temp file to target: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RPC call to set the task status to done
|
||||||
|
func SetTaskDone(taskType string, filename string, intermediateFiles []string) {
|
||||||
|
args := RpcArgument{}
|
||||||
|
args.Method = "set_task_done"
|
||||||
|
args.TaskType = taskType
|
||||||
|
args.Filename = filename
|
||||||
|
args.IntermediateFiles = intermediateFiles
|
||||||
|
reply := RpcReply{}
|
||||||
|
ok := call("Coordinator.SetTaskDone", &args, &reply)
|
||||||
if ok {
|
if ok {
|
||||||
// reply.Y should be 100.
|
fmt.Printf("ok")
|
||||||
fmt.Printf("reply.Y %v\n", reply.Y)
|
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("call failed!\n")
|
fmt.Printf("call failed!\n")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
// RPC call to request task from coordinator
|
||||||
|
func RequestTask() (string, string, int) {
|
||||||
|
// declare an arg and reply
|
||||||
|
args := RpcArgument{}
|
||||||
|
args.Method = "request_task"
|
||||||
|
reply := RpcReply{}
|
||||||
|
|
||||||
|
ok := call("Coordinator.GetTask", &args, &reply)
|
||||||
|
if ok {
|
||||||
|
fmt.Printf("reply.TaskType %v\n", reply.TaskType)
|
||||||
|
fmt.Printf("reply.InFilename %v\n", reply.Filename)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("call failed!\n")
|
||||||
|
}
|
||||||
|
return reply.TaskType, reply.Filename, reply.NReduce
|
||||||
|
}
|
||||||
|
|
||||||
// send an RPC request to the coordinator, wait for the response.
|
// send an RPC request to the coordinator, wait for the response.
|
||||||
// usually returns true.
|
// usually returns true.
|
||||||
// returns false if something goes wrong.
|
// returns false if something goes wrong.
|
||||||
//
|
|
||||||
func call(rpcname string, args interface{}, reply interface{}) bool {
|
func call(rpcname string, args interface{}, reply interface{}) bool {
|
||||||
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
|
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
|
||||||
sockname := coordinatorSock()
|
sockname := coordinatorSock()
|
||||||
|
|||||||
Reference in New Issue
Block a user