Compare commits
5 Commits
dffbcad936
...
lab1
| Author | SHA1 | Date | |
|---|---|---|---|
| 00bfc7b4ef | |||
| c1ad7ba1aa | |||
| 005df49fbe | |||
| 549f538bbf | |||
| 1378032bfe |
@@ -1,13 +1,9 @@
|
||||
package kvsrv
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"log"
|
||||
"math/big"
|
||||
import "6.5840/labrpc"
|
||||
import "crypto/rand"
|
||||
import "math/big"
|
||||
|
||||
"6.5840/labrpc"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type Clerk struct {
|
||||
server *labrpc.ClientEnd
|
||||
@@ -39,21 +35,9 @@ func MakeClerk(server *labrpc.ClientEnd) *Clerk {
|
||||
// must match the declared types of the RPC handler function's
|
||||
// arguments. and reply must be passed as a pointer.
|
||||
func (ck *Clerk) Get(key string) string {
|
||||
id := uuid.New()
|
||||
args := GetArgs{
|
||||
Key: key,
|
||||
Uuid: id,
|
||||
}
|
||||
reply := GetReply{}
|
||||
ok := false
|
||||
for !ok {
|
||||
ok = ck.server.Call("KVServer.Get", &args, &reply)
|
||||
if !ok {
|
||||
log.Println("RPC call failed: GET op")
|
||||
}
|
||||
}
|
||||
|
||||
// You will have to modify this function.
|
||||
return reply.Value
|
||||
return ""
|
||||
}
|
||||
|
||||
// shared by Put and Append.
|
||||
@@ -66,22 +50,7 @@ func (ck *Clerk) Get(key string) string {
|
||||
// arguments. and reply must be passed as a pointer.
|
||||
func (ck *Clerk) PutAppend(key string, value string, op string) string {
|
||||
// You will have to modify this function.
|
||||
id := uuid.New()
|
||||
args := PutAppendArgs{
|
||||
Key: key,
|
||||
Value: value,
|
||||
Uuid: id,
|
||||
}
|
||||
reply := PutAppendReply{}
|
||||
ok := false
|
||||
for !ok {
|
||||
ok := ck.server.Call("KVServer."+op, &args, &reply)
|
||||
if !ok {
|
||||
log.Printf("RPC call failed: %s op\n", op)
|
||||
}
|
||||
}
|
||||
|
||||
return reply.Value
|
||||
return ""
|
||||
}
|
||||
|
||||
func (ck *Clerk) Put(key string, value string) {
|
||||
|
||||
@@ -7,7 +7,6 @@ type PutAppendArgs struct {
|
||||
// You'll have to add definitions here.
|
||||
// Field names must start with capital letters,
|
||||
// otherwise RPC will break.
|
||||
Uuid string
|
||||
}
|
||||
|
||||
type PutAppendReply struct {
|
||||
@@ -17,7 +16,6 @@ type PutAppendReply struct {
|
||||
type GetArgs struct {
|
||||
Key string
|
||||
// You'll have to add definitions here.
|
||||
Uuid string
|
||||
}
|
||||
|
||||
type GetReply struct {
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
const Debug = true
|
||||
const Debug = false
|
||||
|
||||
func DPrintf(format string, a ...interface{}) (n int, err error) {
|
||||
if Debug {
|
||||
@@ -14,47 +14,30 @@ func DPrintf(format string, a ...interface{}) (n int, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
type KVServer struct {
|
||||
mu sync.Mutex
|
||||
KVStore map[string]string
|
||||
OpLog map[string]bool
|
||||
mu sync.Mutex
|
||||
|
||||
// Your definitions here.
|
||||
}
|
||||
|
||||
|
||||
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
|
||||
// Your code here.
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
|
||||
reply.Value = kv.KVStore[args.Key]
|
||||
}
|
||||
|
||||
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
|
||||
// Your code here.
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
|
||||
kv.KVStore[args.Key] = args.Value
|
||||
reply.Value = args.Value
|
||||
}
|
||||
|
||||
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
|
||||
// Your code here.
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
|
||||
val, exists := kv.KVStore[args.Key]
|
||||
if !exists {
|
||||
kv.KVStore[args.Key] = args.Value
|
||||
reply.Value = ""
|
||||
} else {
|
||||
kv.KVStore[args.Key] = val + args.Value
|
||||
reply.Value = val
|
||||
}
|
||||
}
|
||||
|
||||
func StartKVServer() *KVServer {
|
||||
kv := new(KVServer)
|
||||
kv.KVStore = map[string]string{}
|
||||
|
||||
// You may need initialization code here.
|
||||
|
||||
return kv
|
||||
}
|
||||
|
||||
@@ -1,33 +1,186 @@
|
||||
package mr
|
||||
|
||||
import "log"
|
||||
import "net"
|
||||
import "os"
|
||||
import "net/rpc"
|
||||
import "net/http"
|
||||
import (
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/big"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
JOB_TIMEOUT_S = 10 * time.Second
|
||||
GET_TASK_TIMEOUT = 2 * time.Second
|
||||
DEBUG = false
|
||||
)
|
||||
|
||||
type Coordinator struct {
|
||||
// Your definitions here.
|
||||
MapJobs []Job
|
||||
ReduceJobs []Job
|
||||
NReduce int
|
||||
mu sync.Mutex
|
||||
mapWg sync.WaitGroup
|
||||
reduceWg sync.WaitGroup
|
||||
}
|
||||
|
||||
type Job struct {
|
||||
WorkerId string
|
||||
Filenames []string
|
||||
Status bool
|
||||
Active bool
|
||||
LastPing time.Time
|
||||
}
|
||||
|
||||
// Your code here -- RPC handlers for the worker to call.
|
||||
|
||||
//
|
||||
// an example RPC handler.
|
||||
//
|
||||
// the RPC argument and reply types are defined in rpc.go.
|
||||
//
|
||||
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
|
||||
reply.Y = args.X + 1
|
||||
func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
if args.Method != "request_task" {
|
||||
return errors.New("argument .Method not valid for this procedure")
|
||||
}
|
||||
// check for any non-completed OR non-active map jobs
|
||||
for i, mj := range c.MapJobs {
|
||||
// assign job that not already done OR not actively being run by other workers
|
||||
// status = false AND active = false
|
||||
if !mj.Status && !mj.Active {
|
||||
wId, err := genWorkerId()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
reply.NReduce = c.NReduce
|
||||
reply.TaskType = "map"
|
||||
reply.Filenames = mj.Filenames
|
||||
reply.WorkerId = wId
|
||||
// set job status, taken by a worke
|
||||
c.MapJobs[i].WorkerId = wId
|
||||
c.MapJobs[i].Active = true
|
||||
c.MapJobs[i].LastPing = time.Now()
|
||||
if DEBUG {
|
||||
fmt.Printf("[Map Job Assigned] %s | file: %s | status: %t | active: %t |\n",
|
||||
c.MapJobs[i].WorkerId,
|
||||
c.MapJobs[i].Filenames[0],
|
||||
c.MapJobs[i].Status, c.MapJobs[i].Active)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
c.mapWg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
if DEBUG {
|
||||
fmt.Println("Wait done. Continuing...")
|
||||
}
|
||||
case <-time.After(GET_TASK_TIMEOUT):
|
||||
return nil
|
||||
}
|
||||
|
||||
done = make(chan struct{})
|
||||
|
||||
// check for reduce jobs
|
||||
for i, rj := range c.ReduceJobs {
|
||||
if !rj.Status && !rj.Active {
|
||||
wId, err := genWorkerId()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
reply.WorkerId = wId
|
||||
reply.NReduce = c.NReduce
|
||||
reply.TaskType = "reduce"
|
||||
reply.Filenames = rj.Filenames
|
||||
// set job status, taken by a worker
|
||||
c.ReduceJobs[i].WorkerId = wId
|
||||
c.ReduceJobs[i].Active = true
|
||||
c.ReduceJobs[i].LastPing = time.Now()
|
||||
if DEBUG {
|
||||
fmt.Printf("[Reduce Job Assigned] %s | status: %t | active: %t |\n",
|
||||
c.ReduceJobs[i].WorkerId, c.ReduceJobs[i].Status, c.ReduceJobs[i].Active)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
c.reduceWg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
if DEBUG {
|
||||
fmt.Println("Wait done. Continuing...")
|
||||
}
|
||||
case <-time.After(GET_TASK_TIMEOUT):
|
||||
return nil
|
||||
}
|
||||
|
||||
reply.TaskType = "done"
|
||||
reply.NReduce = c.NReduce
|
||||
reply.Filenames = []string{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set task complete
|
||||
func (c *Coordinator) SetTaskDone(args *RpcArgument, reply *RpcReply) error {
|
||||
if args.Method != "set_task_done" {
|
||||
return errors.New("argument .Method not valid for this procedure")
|
||||
}
|
||||
if args.TaskType == "map" {
|
||||
wId := args.WorkerId
|
||||
for i, _ := range c.MapJobs {
|
||||
if c.MapJobs[i].WorkerId == wId {
|
||||
c.MapJobs[i].Status = true
|
||||
c.MapJobs[i].Active = false
|
||||
c.mapWg.Done()
|
||||
if DEBUG {
|
||||
fmt.Printf("[DONE] %s | file: %s | status: %t | active: %t\n",
|
||||
c.MapJobs[i].WorkerId,
|
||||
c.MapJobs[i].Filenames[0],
|
||||
c.MapJobs[i].Status, c.MapJobs[i].Active)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Check again
|
||||
for _, ifn := range args.IntermediateFiles {
|
||||
reduceJobIdx := strings.Split(ifn, "-")[2]
|
||||
idx, err := strconv.Atoi(reduceJobIdx)
|
||||
if err != nil {
|
||||
fmt.Printf("Error converting string to integer: %v\n", err)
|
||||
}
|
||||
c.ReduceJobs[idx].Filenames = append(c.ReduceJobs[idx].Filenames, ifn)
|
||||
}
|
||||
|
||||
} else if args.TaskType == "reduce" {
|
||||
wId := args.WorkerId
|
||||
for i, _ := range c.ReduceJobs {
|
||||
if c.ReduceJobs[i].WorkerId == wId {
|
||||
c.ReduceJobs[i].Status = true
|
||||
c.ReduceJobs[i].Active = false
|
||||
c.reduceWg.Done()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return errors.New("argument .TaskType not valid for this procedure")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// start a thread that listens for RPCs from worker.go
|
||||
//
|
||||
func (c *Coordinator) server() {
|
||||
rpc.Register(c)
|
||||
rpc.HandleHTTP()
|
||||
@@ -41,30 +194,105 @@ func (c *Coordinator) server() {
|
||||
go http.Serve(l, nil)
|
||||
}
|
||||
|
||||
//
|
||||
// check scheduled tasks; if their age > JOB_TIMEOUT_S
|
||||
// reset the job status, making it available for scheduling
|
||||
func (c *Coordinator) CheckTaskValidity() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
for i, mj := range c.MapJobs {
|
||||
timeNow := time.Now()
|
||||
duration := timeNow.Sub(mj.LastPing)
|
||||
if DEBUG {
|
||||
fmt.Printf("[Worker Status] id: %s | time: %s | active: %t | status: %t\n", mj.WorkerId, duration.String(), mj.Active, mj.Status)
|
||||
}
|
||||
if mj.Active && !mj.Status {
|
||||
if duration >= JOB_TIMEOUT_S {
|
||||
// unset worker id and set the status
|
||||
if DEBUG {
|
||||
fmt.Printf("[Terminating Worker] %s", mj.WorkerId)
|
||||
}
|
||||
c.MapJobs[i].WorkerId = ""
|
||||
c.MapJobs[i].Active = false
|
||||
c.MapJobs[i].Status = false
|
||||
}
|
||||
}
|
||||
}
|
||||
for i, rj := range c.ReduceJobs {
|
||||
timeNow := time.Now()
|
||||
if rj.Active && !rj.Status {
|
||||
duration := timeNow.Sub(rj.LastPing)
|
||||
if duration >= JOB_TIMEOUT_S {
|
||||
if DEBUG {
|
||||
fmt.Printf("[Terminating Worker] %s", rj.WorkerId)
|
||||
}
|
||||
c.ReduceJobs[i].WorkerId = ""
|
||||
c.ReduceJobs[i].Active = false
|
||||
c.ReduceJobs[i].Status = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// main/mrcoordinator.go calls Done() periodically to find out
|
||||
// if the entire job has finished.
|
||||
//
|
||||
func (c *Coordinator) Done() bool {
|
||||
ret := false
|
||||
|
||||
// Your code here.
|
||||
|
||||
|
||||
go c.CheckTaskValidity()
|
||||
// Check any non-completed jobs
|
||||
for _, mj := range c.MapJobs {
|
||||
if !mj.Status {
|
||||
return ret
|
||||
}
|
||||
}
|
||||
for _, mj := range c.ReduceJobs {
|
||||
if !mj.Status {
|
||||
return ret
|
||||
}
|
||||
}
|
||||
// all mj completed
|
||||
ret = true
|
||||
return ret
|
||||
}
|
||||
|
||||
//
|
||||
// create a Coordinator.
|
||||
// main/mrcoordinator.go calls this function.
|
||||
// nReduce is the number of reduce tasks to use.
|
||||
//
|
||||
func MakeCoordinator(files []string, nReduce int) *Coordinator {
|
||||
c := Coordinator{}
|
||||
|
||||
// Your code here.
|
||||
c.NReduce = nReduce
|
||||
|
||||
for i := range files {
|
||||
mj := Job{}
|
||||
mj.Filenames = []string{files[i]}
|
||||
mj.Status = false
|
||||
mj.Active = false
|
||||
c.mapWg.Add(1)
|
||||
c.MapJobs = append(c.MapJobs, mj)
|
||||
}
|
||||
|
||||
for j := 0; j < nReduce; j++ {
|
||||
reduceJob := Job{}
|
||||
reduceJob.Active = false
|
||||
reduceJob.Filenames = []string{}
|
||||
reduceJob.Status = false
|
||||
c.reduceWg.Add(1)
|
||||
c.ReduceJobs = append(c.ReduceJobs, reduceJob)
|
||||
}
|
||||
|
||||
c.server()
|
||||
return &c
|
||||
}
|
||||
|
||||
func genWorkerId() (string, error) {
|
||||
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||
result := make([]byte, 8)
|
||||
for i := range result {
|
||||
index, err := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error generating random number: %v", err)
|
||||
}
|
||||
result[i] = charset[index.Int64()]
|
||||
}
|
||||
return string(result), nil
|
||||
}
|
||||
|
||||
@@ -6,25 +6,32 @@ package mr
|
||||
// remember to capitalize all names.
|
||||
//
|
||||
|
||||
import "os"
|
||||
import "strconv"
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
//
|
||||
// example to show how to declare the arguments
|
||||
// and reply for an RPC.
|
||||
//
|
||||
|
||||
type ExampleArgs struct {
|
||||
X int
|
||||
type RpcArgument struct {
|
||||
WorkerId string
|
||||
Method string
|
||||
TaskType string
|
||||
IntermediateFiles []string
|
||||
}
|
||||
|
||||
type ExampleReply struct {
|
||||
Y int
|
||||
type RpcReply struct {
|
||||
WorkerId string
|
||||
TaskType string
|
||||
Filenames []string
|
||||
NReduce int
|
||||
}
|
||||
|
||||
// Add your RPC definitions here.
|
||||
|
||||
|
||||
// Cook up a unique-ish UNIX-domain socket name
|
||||
// in /var/tmp, for the coordinator.
|
||||
// Can't use the current directory since
|
||||
|
||||
244
src/mr/worker.go
244
src/mr/worker.go
@@ -1,77 +1,245 @@
|
||||
package mr
|
||||
|
||||
import "fmt"
|
||||
import "log"
|
||||
import "net/rpc"
|
||||
import "hash/fnv"
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"io"
|
||||
"log"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
||||
//
|
||||
// Map functions return a slice of KeyValue.
|
||||
//
|
||||
type KeyValue struct {
|
||||
Key string
|
||||
Value string
|
||||
}
|
||||
|
||||
//
|
||||
// Sorting
|
||||
// for sorting by key.
|
||||
type ByKey []KeyValue
|
||||
|
||||
// for sorting by key.
|
||||
func (a ByKey) Len() int { return len(a) }
|
||||
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
|
||||
|
||||
// use ihash(key) % NReduce to choose the reduce
|
||||
// task number for each KeyValue emitted by Map.
|
||||
//
|
||||
func ihash(key string) int {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(key))
|
||||
return int(h.Sum32() & 0x7fffffff)
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// main/mrworker.go calls this function.
|
||||
//
|
||||
func Worker(mapf func(string, string) []KeyValue,
|
||||
reducef func(string, []string) string) {
|
||||
|
||||
// Your worker implementation here.
|
||||
var workerId, taskType string
|
||||
var inFilenames []string
|
||||
var nReduce int
|
||||
|
||||
// uncomment to send the Example RPC to the coordinator.
|
||||
// CallExample()
|
||||
taskType = "init"
|
||||
for taskType != "done" {
|
||||
//for taskType != "done" && taskType != "init" && taskType != "map" && taskType != "reduce" {
|
||||
//workerId, taskType, inFilenames, nReduce = RequestTask()
|
||||
//}
|
||||
for {
|
||||
workerId, taskType, inFilenames, nReduce = RequestTask()
|
||||
|
||||
// Check if taskType is valid
|
||||
if taskType == "done" || taskType == "init" || taskType == "map" || taskType == "reduce" {
|
||||
break // Exit the loop if taskType is valid
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
//fmt.Println("Invalid taskType, retrying...")
|
||||
}
|
||||
|
||||
if taskType == "done" {
|
||||
return
|
||||
}
|
||||
if taskType == "map" {
|
||||
ifn := inFilenames[0]
|
||||
file, err := os.Open(ifn)
|
||||
if err != nil {
|
||||
log.Fatalf("cannot open %v", ifn)
|
||||
}
|
||||
content, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
log.Fatalf("cannot read %v", ifn)
|
||||
}
|
||||
file.Close()
|
||||
kva := mapf(ifn, string(content))
|
||||
//hash the map job id
|
||||
mapTaskIdx := ihash(ifn)
|
||||
|
||||
IntermediaryFileMaps := make(map[int][]KeyValue)
|
||||
|
||||
for i := 0; i < nReduce; i++ {
|
||||
IntermediaryFileMaps[i] = []KeyValue{}
|
||||
}
|
||||
|
||||
for i := range kva {
|
||||
//log.Printf("%s: %s", kva[i].Key, kva[i].Value)
|
||||
idx := ihash(kva[i].Key) % nReduce
|
||||
IntermediaryFileMaps[idx] = append(IntermediaryFileMaps[idx], kva[i])
|
||||
}
|
||||
|
||||
intermediateFiles := []string{}
|
||||
for idx := range IntermediaryFileMaps {
|
||||
outFile := fmt.Sprintf("mr-%d-%d", mapTaskIdx, idx)
|
||||
intermediateFiles = append(intermediateFiles, outFile)
|
||||
jsonData, err := json.Marshal(IntermediaryFileMaps[idx])
|
||||
if err != nil {
|
||||
log.Fatal("failed to marshal json")
|
||||
}
|
||||
if err = WriteTempAtomic(outFile, jsonData); err != nil {
|
||||
log.Fatalf("failed to write KV map to file")
|
||||
}
|
||||
}
|
||||
SetTaskDone(workerId, "map", intermediateFiles)
|
||||
|
||||
} else if taskType == "reduce" {
|
||||
var intermediate []KeyValue
|
||||
|
||||
for _, f := range inFilenames {
|
||||
// Open the file
|
||||
file, err := os.Open(f)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open file: %v", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// Read the file contents
|
||||
data, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to read file: %v", err)
|
||||
}
|
||||
|
||||
// Parse the JSON data
|
||||
var intermidateShard []KeyValue
|
||||
if err := json.Unmarshal(data, &intermidateShard); err != nil {
|
||||
log.Fatalf("Failed to parse JSON: %v", err)
|
||||
}
|
||||
intermediate = append(intermediate, intermidateShard...)
|
||||
}
|
||||
|
||||
sort.Sort(ByKey(intermediate))
|
||||
|
||||
// Prepare output file
|
||||
reduceJobNum := strings.Split(inFilenames[0], "-")[2]
|
||||
oFilename := fmt.Sprintf("mr-out-%s", reduceJobNum)
|
||||
|
||||
var buffer bytes.Buffer
|
||||
i := 0
|
||||
for i < len(intermediate) {
|
||||
j := i + 1
|
||||
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
|
||||
j++
|
||||
}
|
||||
values := []string{}
|
||||
for k := i; k < j; k++ {
|
||||
values = append(values, intermediate[k].Value)
|
||||
}
|
||||
output := reducef(intermediate[i].Key, values)
|
||||
|
||||
// this is the correct format for each line of Reduce output.
|
||||
fmt.Fprintf(&buffer, "%v %v\n", intermediate[i].Key, output)
|
||||
//fmt.Println(buffer.String())
|
||||
i = j
|
||||
}
|
||||
if err := WriteTempAtomic(oFilename, buffer.Bytes()); err != nil {
|
||||
log.Fatalf("failed to write KV map to file: %w", err)
|
||||
}
|
||||
SetTaskDone(workerId, "reduce", []string{})
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//
|
||||
// example function to show how to make an RPC call to the coordinator.
|
||||
//
|
||||
// the RPC argument and reply types are defined in rpc.go.
|
||||
//
|
||||
func CallExample() {
|
||||
func WriteTempAtomic(fn string, data []byte) error {
|
||||
// Create a temporary file in the target directory
|
||||
currentDir, err := os.Getwd()
|
||||
tempFile, err := os.CreateTemp(currentDir, fn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create temp file: %w", err)
|
||||
}
|
||||
tempPath := tempFile.Name()
|
||||
|
||||
// declare an argument structure.
|
||||
args := ExampleArgs{}
|
||||
if _, err := tempFile.Write(data); err != nil {
|
||||
tempFile.Close()
|
||||
os.Remove(tempPath)
|
||||
return fmt.Errorf("failed to write to temp file: %w", err)
|
||||
}
|
||||
|
||||
// fill in the argument(s).
|
||||
args.X = 99
|
||||
// Ensure the file is fully written to disk
|
||||
if err := tempFile.Sync(); err != nil {
|
||||
tempFile.Close()
|
||||
os.Remove(tempPath)
|
||||
return fmt.Errorf("failed to sync temp file: %w", err)
|
||||
}
|
||||
|
||||
// declare a reply structure.
|
||||
reply := ExampleReply{}
|
||||
if err := tempFile.Close(); err != nil {
|
||||
os.Remove(tempPath)
|
||||
return fmt.Errorf("failed to close temp file: %w", err)
|
||||
}
|
||||
|
||||
// send the RPC request, wait for the reply.
|
||||
// the "Coordinator.Example" tells the
|
||||
// receiving server that we'd like to call
|
||||
// the Example() method of struct Coordinator.
|
||||
ok := call("Coordinator.Example", &args, &reply)
|
||||
if ok {
|
||||
// reply.Y should be 100.
|
||||
fmt.Printf("reply.Y %v\n", reply.Y)
|
||||
} else {
|
||||
// Compute the target file path
|
||||
targetPath := filepath.Join(currentDir, fn)
|
||||
|
||||
// Atomically replace the target file with the temporary file
|
||||
if err := os.Rename(tempPath, targetPath); err != nil {
|
||||
os.Remove(tempPath)
|
||||
return fmt.Errorf("failed to rename temp file to target: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RPC call to set the task status to done
|
||||
func SetTaskDone(workerId string, taskType string, intermediateFiles []string) {
|
||||
args := RpcArgument{}
|
||||
args.WorkerId = workerId
|
||||
args.Method = "set_task_done"
|
||||
args.TaskType = taskType
|
||||
args.IntermediateFiles = intermediateFiles
|
||||
reply := RpcReply{}
|
||||
ok := call("Coordinator.SetTaskDone", &args, &reply)
|
||||
if !ok {
|
||||
fmt.Printf("call failed!\n")
|
||||
}
|
||||
/*
|
||||
if ok {
|
||||
fmt.Printf("ok\n")
|
||||
} else {
|
||||
fmt.Printf("call failed!\n")
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
// RPC call to request task from coordinator
|
||||
func RequestTask() (string, string, []string, int) {
|
||||
// declare an arg and reply
|
||||
args := RpcArgument{}
|
||||
args.Method = "request_task"
|
||||
reply := RpcReply{}
|
||||
|
||||
ok := call("Coordinator.GetTasks", &args, &reply)
|
||||
if !ok {
|
||||
fmt.Printf("call failed!\n")
|
||||
}
|
||||
return reply.WorkerId, reply.TaskType, reply.Filenames, reply.NReduce
|
||||
}
|
||||
|
||||
//
|
||||
// send an RPC request to the coordinator, wait for the response.
|
||||
// usually returns true.
|
||||
// returns false if something goes wrong.
|
||||
//
|
||||
func call(rpcname string, args interface{}, reply interface{}) bool {
|
||||
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
|
||||
sockname := coordinatorSock()
|
||||
|
||||
Reference in New Issue
Block a user