Files
mit-6.5840/src/mr/worker.go
2024-12-15 12:08:02 +07:00

242 lines
6.0 KiB
Go

package mr
import (
"bytes"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"log"
"net/rpc"
"os"
"path/filepath"
"sort"
"strings"
)
// KeyValue is one key/value pair. Map functions return a slice of KeyValue.
type KeyValue struct {
	// Key comes first and Value second; both are plain strings.
	Key, Value string
}
// ByKey is a []KeyValue that implements the Len/Swap/Less methods needed
// for sorting the pairs by Key.
type ByKey []KeyValue

// Len returns the number of pairs.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	held := kvs[i]
	kvs[i] = kvs[j]
	kvs[j] = held
}

// Less reports whether the key at position i orders strictly before the
// key at position j (plain string comparison).
func (kvs ByKey) Less(i, j int) bool {
	left, right := kvs[i].Key, kvs[j].Key
	return left < right
}
// ihash hashes key with 32-bit FNV-1a and returns the result as a
// non-negative int. Use ihash(key) % NReduce to choose the reduce task
// number for each KeyValue emitted by Map.
func ihash(key string) int {
	// Dropping the top bit keeps the value non-negative after the
	// conversion to int.
	const lowBits = 0x7fffffff

	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	masked := hasher.Sum32() & lowBits
	return int(masked)
}
// Worker is the worker entry point; main/mrworker.go calls this function.
//
// It loops forever asking the coordinator for work through RequestTask and
// dispatches on the returned task type:
//
//   - "map":    run mapf on the first input file, write one intermediate
//     file per reduce partition, then report completion.
//   - "reduce": merge the given intermediate files, run reducef once per
//     distinct key, write the output file, then report completion.
//   - "done":   return.
//
// Any other task type is ignored and the next request is issued
// immediately. Unrecoverable I/O or encoding errors terminate the process
// through log.Fatalf.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		workerID, taskType, inFilenames, nReduce := RequestTask()
		switch taskType {
		case "done":
			return
		case "map":
			intermediateFiles := runMapTask(mapf, inFilenames, nReduce)
			SetTaskDone(workerID, "map", intermediateFiles)
		case "reduce":
			runReduceTask(reducef, inFilenames)
			SetTaskDone(workerID, "reduce", []string{})
		}
	}
}

// readTaskFile returns the full contents of path. The file is closed
// before returning, so calling it repeatedly from the long-lived worker
// loop does not accumulate open file descriptors.
func readTaskFile(path string) ([]byte, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	return io.ReadAll(file)
}

// runMapTask applies mapf to the first file in inFilenames, splits the
// emitted pairs into nReduce partitions with ihash(key) % nReduce, and
// writes partition idx as a JSON array to "mr-<ihash(input name)>-<idx>".
// It returns the names of the files written, in partition order.
func runMapTask(mapf func(string, string) []KeyValue, inFilenames []string, nReduce int) []string {
	if len(inFilenames) == 0 {
		log.Fatalf("map task has no input file")
	}
	if nReduce <= 0 {
		log.Fatalf("map task has invalid nReduce %d", nReduce)
	}

	ifn := inFilenames[0]
	content, err := readTaskFile(ifn)
	if err != nil {
		log.Fatalf("cannot read %v: %v", ifn, err)
	}
	kva := mapf(ifn, string(content))

	// The map task index embedded in the file names is a hash of the
	// input file name.
	mapTaskIdx := ihash(ifn)

	// Every partition gets a file, even when it receives no pairs. Start
	// from non-nil slices so an empty partition is encoded as [] rather
	// than null.
	partitions := make([][]KeyValue, nReduce)
	for i := range partitions {
		partitions[i] = []KeyValue{}
	}
	for _, kv := range kva {
		idx := ihash(kv.Key) % nReduce
		partitions[idx] = append(partitions[idx], kv)
	}

	intermediateFiles := make([]string, 0, nReduce)
	for idx, partition := range partitions {
		outFile := fmt.Sprintf("mr-%d-%d", mapTaskIdx, idx)
		jsonData, err := json.Marshal(partition)
		if err != nil {
			log.Fatalf("failed to marshal json: %v", err)
		}
		if err := WriteTempAtomic(outFile, jsonData); err != nil {
			log.Fatalf("failed to write KV map to file: %v", err)
		}
		intermediateFiles = append(intermediateFiles, outFile)
	}
	return intermediateFiles
}

// runReduceTask loads every JSON shard named in inFilenames, sorts the
// combined pairs by key, calls reducef once per distinct key with all of
// that key's values, and writes the "<key> <result>" lines to
// "mr-out-<N>", where N is the third "-"-separated field of the first
// input file name.
func runReduceTask(reducef func(string, []string) string, inFilenames []string) {
	if len(inFilenames) == 0 {
		log.Fatalf("reduce task has no input files")
	}

	var intermediate []KeyValue
	for _, f := range inFilenames {
		data, err := readTaskFile(f)
		if err != nil {
			log.Fatalf("Failed to read file: %v", err)
		}
		var shard []KeyValue
		if err := json.Unmarshal(data, &shard); err != nil {
			log.Fatalf("Failed to parse JSON: %v", err)
		}
		intermediate = append(intermediate, shard...)
	}
	sort.Sort(ByKey(intermediate))

	// Prepare output file name from the reduce partition number.
	nameParts := strings.Split(inFilenames[0], "-")
	if len(nameParts) < 3 {
		log.Fatalf("unexpected intermediate file name %q", inFilenames[0])
	}
	oFilename := fmt.Sprintf("mr-out-%s", nameParts[2])

	var buffer bytes.Buffer
	for i := 0; i < len(intermediate); {
		// [i, j) is the run of pairs sharing intermediate[i].Key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(&buffer, "%v %v\n", intermediate[i].Key, output)
		i = j
	}

	if err := WriteTempAtomic(oFilename, buffer.Bytes()); err != nil {
		log.Fatalf("failed to write KV map to file: %v", err)
	}
}
// WriteTempAtomic writes data to the file named fn in the current working
// directory without ever exposing a partially written target.
//
// The bytes are written and synced to a temporary file created in that
// same directory (os.CreateTemp with fn as the name pattern), and the
// temporary file is then renamed onto the target path, replacing any
// existing file.
//
// It returns a wrapped error if the working directory cannot be
// determined or if creating, writing, syncing, closing or renaming the
// temporary file fails. Once the temporary file exists, any failure also
// removes it.
func WriteTempAtomic(fn string, data []byte) error {
	// The temp file must live next to the target, so a failure to resolve
	// the working directory is fatal for this call: with an empty
	// directory os.CreateTemp would fall back to the system temp dir.
	currentDir, err := os.Getwd()
	if err != nil {
		return fmt.Errorf("failed to get working directory: %w", err)
	}

	// Create a temporary file in the target directory
	tempFile, err := os.CreateTemp(currentDir, fn)
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	tempPath := tempFile.Name()

	// Until the rename succeeds, every exit path discards the temp file.
	renamed := false
	defer func() {
		if !renamed {
			os.Remove(tempPath)
		}
	}()

	if _, err := tempFile.Write(data); err != nil {
		tempFile.Close()
		return fmt.Errorf("failed to write to temp file: %w", err)
	}
	// Ensure the file is fully written to disk
	if err := tempFile.Sync(); err != nil {
		tempFile.Close()
		return fmt.Errorf("failed to sync temp file: %w", err)
	}
	if err := tempFile.Close(); err != nil {
		return fmt.Errorf("failed to close temp file: %w", err)
	}

	// Replace the target file with the temporary file in a single rename.
	targetPath := filepath.Join(currentDir, fn)
	if err := os.Rename(tempPath, targetPath); err != nil {
		return fmt.Errorf("failed to rename temp file to target: %w", err)
	}
	renamed = true
	return nil
}
// SetTaskDone tells the coordinator, through the Coordinator.SetTaskDone
// RPC, that the worker identified by workerId has finished a task of type
// taskType. intermediateFiles is forwarded unchanged in the request.
// If the RPC does not succeed, "call failed!" is printed and nothing else
// happens.
func SetTaskDone(workerId string, taskType string, intermediateFiles []string) {
	var request RpcArgument
	request.Method = "set_task_done"
	request.WorkerId = workerId
	request.TaskType = taskType
	request.IntermediateFiles = intermediateFiles

	var response RpcReply
	if call("Coordinator.SetTaskDone", &request, &response) {
		return
	}
	fmt.Printf("call failed!\n")
}
// RequestTask asks the coordinator for work through the
// Coordinator.GetTasks RPC and returns, in order, the reply's WorkerId,
// TaskType, Filenames and NReduce fields.
// If the RPC does not succeed, "call failed!" is printed and the reply's
// fields are still returned as they stand.
func RequestTask() (string, string, []string, int) {
	var request RpcArgument
	request.Method = "request_task"

	var response RpcReply
	if succeeded := call("Coordinator.GetTasks", &request, &response); !succeeded {
		fmt.Printf("call failed!\n")
	}
	return response.WorkerId, response.TaskType, response.Filenames, response.NReduce
}
// call sends one RPC request to the coordinator and waits for the
// response. It dials the coordinator's unix socket for every request and
// closes the connection before returning.
// It returns true when the RPC completes without error; otherwise the
// error is printed and false is returned. A failure to dial terminates
// the process via log.Fatal.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// TCP alternative: rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, dialErr := rpc.DialHTTP("unix", coordinatorSock())
	if dialErr != nil {
		log.Fatal("dialing:", dialErr)
	}
	defer client.Close()

	if callErr := client.Call(rpcname, args, reply); callErr != nil {
		fmt.Println(callErr)
		return false
	}
	return true
}