2 Commits
lab1 ... lab2

Author SHA1 Message Date
dffbcad936 cp before pse 2025-01-06 18:51:37 +07:00
63706b6685 cp1 2025-01-01 18:25:05 +07:00
6 changed files with 132 additions and 485 deletions

View File

@@ -1,9 +1,13 @@
package kvsrv package kvsrv
import "6.5840/labrpc" import (
import "crypto/rand" "crypto/rand"
import "math/big" "log"
"math/big"
"6.5840/labrpc"
"github.com/google/uuid"
)
type Clerk struct { type Clerk struct {
server *labrpc.ClientEnd server *labrpc.ClientEnd
@@ -35,9 +39,21 @@ func MakeClerk(server *labrpc.ClientEnd) *Clerk {
// must match the declared types of the RPC handler function's // must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer. // arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string { func (ck *Clerk) Get(key string) string {
id := uuid.New()
args := GetArgs{
Key: key,
Uuid: id,
}
reply := GetReply{}
ok := false
for !ok {
ok = ck.server.Call("KVServer.Get", &args, &reply)
if !ok {
log.Println("RPC call failed: GET op")
}
}
// You will have to modify this function. // You will have to modify this function.
return "" return reply.Value
} }
// shared by Put and Append. // shared by Put and Append.
@@ -50,7 +66,22 @@ func (ck *Clerk) Get(key string) string {
// arguments. and reply must be passed as a pointer. // arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) string { func (ck *Clerk) PutAppend(key string, value string, op string) string {
// You will have to modify this function. // You will have to modify this function.
return "" id := uuid.New()
args := PutAppendArgs{
Key: key,
Value: value,
Uuid: id,
}
reply := PutAppendReply{}
ok := false
for !ok {
ok := ck.server.Call("KVServer."+op, &args, &reply)
if !ok {
log.Printf("RPC call failed: %s op\n", op)
}
}
return reply.Value
} }
func (ck *Clerk) Put(key string, value string) { func (ck *Clerk) Put(key string, value string) {

View File

@@ -7,6 +7,7 @@ type PutAppendArgs struct {
// You'll have to add definitions here. // You'll have to add definitions here.
// Field names must start with capital letters, // Field names must start with capital letters,
// otherwise RPC will break. // otherwise RPC will break.
Uuid string
} }
type PutAppendReply struct { type PutAppendReply struct {
@@ -16,6 +17,7 @@ type PutAppendReply struct {
type GetArgs struct { type GetArgs struct {
Key string Key string
// You'll have to add definitions here. // You'll have to add definitions here.
Uuid string
} }
type GetReply struct { type GetReply struct {

View File

@@ -5,7 +5,7 @@ import (
"sync" "sync"
) )
const Debug = false const Debug = true
func DPrintf(format string, a ...interface{}) (n int, err error) { func DPrintf(format string, a ...interface{}) (n int, err error) {
if Debug { if Debug {
@@ -14,30 +14,47 @@ func DPrintf(format string, a ...interface{}) (n int, err error) {
return return
} }
type KVServer struct { type KVServer struct {
mu sync.Mutex mu sync.Mutex
KVStore map[string]string
OpLog map[string]bool
// Your definitions here. // Your definitions here.
} }
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here. // Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
reply.Value = kv.KVStore[args.Key]
} }
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) { func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here. // Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
kv.KVStore[args.Key] = args.Value
reply.Value = args.Value
} }
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) { func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here. // Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
val, exists := kv.KVStore[args.Key]
if !exists {
kv.KVStore[args.Key] = args.Value
reply.Value = ""
} else {
kv.KVStore[args.Key] = val + args.Value
reply.Value = val
}
} }
func StartKVServer() *KVServer { func StartKVServer() *KVServer {
kv := new(KVServer) kv := new(KVServer)
kv.KVStore = map[string]string{}
// You may need initialization code here.
return kv return kv
} }

View File

@@ -1,186 +1,33 @@
package mr package mr
import ( import "log"
"crypto/rand" import "net"
"errors" import "os"
"fmt" import "net/rpc"
"log" import "net/http"
"math/big"
"net"
"net/http"
"net/rpc"
"os"
"strconv"
"strings"
"sync"
"time"
)
const (
JOB_TIMEOUT_S = 10 * time.Second
GET_TASK_TIMEOUT = 2 * time.Second
DEBUG = false
)
type Coordinator struct { type Coordinator struct {
MapJobs []Job // Your definitions here.
ReduceJobs []Job
NReduce int
mu sync.Mutex
mapWg sync.WaitGroup
reduceWg sync.WaitGroup
}
type Job struct {
WorkerId string
Filenames []string
Status bool
Active bool
LastPing time.Time
} }
// Your code here -- RPC handlers for the worker to call. // Your code here -- RPC handlers for the worker to call.
//
// an example RPC handler.
// //
// the RPC argument and reply types are defined in rpc.go. // the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error { //
c.mu.Lock() func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
defer c.mu.Unlock() reply.Y = args.X + 1
done := make(chan struct{})
if args.Method != "request_task" {
return errors.New("argument .Method not valid for this procedure")
}
// check for any non-completed OR non-active map jobs
for i, mj := range c.MapJobs {
// assign job that not already done OR not actively being run by other workers
// status = false AND active = false
if !mj.Status && !mj.Active {
wId, err := genWorkerId()
if err != nil {
fmt.Println(err)
}
reply.NReduce = c.NReduce
reply.TaskType = "map"
reply.Filenames = mj.Filenames
reply.WorkerId = wId
// set job status, taken by a worke
c.MapJobs[i].WorkerId = wId
c.MapJobs[i].Active = true
c.MapJobs[i].LastPing = time.Now()
if DEBUG {
fmt.Printf("[Map Job Assigned] %s | file: %s | status: %t | active: %t |\n",
c.MapJobs[i].WorkerId,
c.MapJobs[i].Filenames[0],
c.MapJobs[i].Status, c.MapJobs[i].Active)
}
return nil
}
}
go func() {
c.mapWg.Wait()
close(done)
}()
select {
case <-done:
if DEBUG {
fmt.Println("Wait done. Continuing...")
}
case <-time.After(GET_TASK_TIMEOUT):
return nil
}
done = make(chan struct{})
// check for reduce jobs
for i, rj := range c.ReduceJobs {
if !rj.Status && !rj.Active {
wId, err := genWorkerId()
if err != nil {
fmt.Println(err)
}
reply.WorkerId = wId
reply.NReduce = c.NReduce
reply.TaskType = "reduce"
reply.Filenames = rj.Filenames
// set job status, taken by a worker
c.ReduceJobs[i].WorkerId = wId
c.ReduceJobs[i].Active = true
c.ReduceJobs[i].LastPing = time.Now()
if DEBUG {
fmt.Printf("[Reduce Job Assigned] %s | status: %t | active: %t |\n",
c.ReduceJobs[i].WorkerId, c.ReduceJobs[i].Status, c.ReduceJobs[i].Active)
}
return nil
}
}
go func() {
c.reduceWg.Wait()
close(done)
}()
select {
case <-done:
if DEBUG {
fmt.Println("Wait done. Continuing...")
}
case <-time.After(GET_TASK_TIMEOUT):
return nil
}
reply.TaskType = "done"
reply.NReduce = c.NReduce
reply.Filenames = []string{}
return nil return nil
} }
// Set task complete
// SetTaskDone is the RPC handler a worker calls after finishing a task.
//
// args.Method must be "set_task_done", args.WorkerId must be non-empty and
// args.TaskType must be "map" or "reduce"; anything else returns an error.
//
// The job currently assigned to args.WorkerId and not yet completed is marked
// completed (Status=true, Active=false) and the matching WaitGroup is
// released exactly once. For a map task, each name in args.IntermediateFiles
// must look like "mr-<map>-<reduce>"; it is appended to the input list of
// reduce job <reduce>. Names are validated before any state is changed, so a
// malformed report returns an error instead of panicking.
//
// A report that matches no unfinished job (a repeated report, or one from a
// worker whose id is no longer on any job) is ignored and returns nil; its
// intermediate files are NOT registered, so a reduce job never receives the
// same file twice.
//
// NOTE(review): this handler does not take c.mu although it mutates the job
// slices. GetTasks holds c.mu while it waits on the WaitGroups released here,
// so adding the lock needs to be considered together with GetTasks.
func (c *Coordinator) SetTaskDone(args *RpcArgument, reply *RpcReply) error {
	if args.Method != "set_task_done" {
		return errors.New("argument .Method not valid for this procedure")
	}
	// Unassigned jobs carry an empty WorkerId; an empty id in the request
	// would otherwise match all of them.
	if args.WorkerId == "" {
		return errors.New("argument .WorkerId must not be empty")
	}

	switch args.TaskType {
	case "map":
		// Resolve every intermediate file to its reduce job first.
		targets := make([]int, 0, len(args.IntermediateFiles))
		for _, ifn := range args.IntermediateFiles {
			parts := strings.Split(ifn, "-")
			if len(parts) < 3 {
				return fmt.Errorf("intermediate file %q is not named mr-<map>-<reduce>", ifn)
			}
			idx, err := strconv.Atoi(parts[2])
			if err != nil {
				return fmt.Errorf("intermediate file %q: reduce index: %w", ifn, err)
			}
			if idx < 0 || idx >= len(c.ReduceJobs) {
				return fmt.Errorf("intermediate file %q: reduce index %d out of range", ifn, idx)
			}
			targets = append(targets, idx)
		}

		matched := false
		for i := range c.MapJobs {
			job := &c.MapJobs[i]
			if job.WorkerId != args.WorkerId || job.Status {
				continue
			}
			job.Status = true
			job.Active = false
			c.mapWg.Done()
			matched = true
			if DEBUG {
				fmt.Printf("[DONE] %s | file: %s | status: %t | active: %t\n",
					job.WorkerId,
					job.Filenames[0],
					job.Status, job.Active)
			}
		}
		if !matched {
			return nil
		}

		for n, ifn := range args.IntermediateFiles {
			rj := &c.ReduceJobs[targets[n]]
			rj.Filenames = append(rj.Filenames, ifn)
		}

	case "reduce":
		for i := range c.ReduceJobs {
			job := &c.ReduceJobs[i]
			if job.WorkerId != args.WorkerId || job.Status {
				continue
			}
			job.Status = true
			job.Active = false
			c.reduceWg.Done()
		}

	default:
		return errors.New("argument .TaskType not valid for this procedure")
	}
	return nil
}
//
// start a thread that listens for RPCs from worker.go // start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() { func (c *Coordinator) server() {
rpc.Register(c) rpc.Register(c)
rpc.HandleHTTP() rpc.HandleHTTP()
@@ -194,105 +41,30 @@ func (c *Coordinator) server() {
go http.Serve(l, nil) go http.Serve(l, nil)
} }
// check scheduled tasks; if their age > JOB_TIMEOUT_S //
// reset the job status, making it available for scheduling
// CheckTaskValidity releases jobs whose worker has held them too long.
//
// Holding c.mu, it walks the map jobs and then the reduce jobs. A job that is
// active, not yet completed, and whose LastPing is at least JOB_TIMEOUT_S old
// has its WorkerId cleared and its Active and Status flags set to false.
// With DEBUG enabled it prints a status line for every map job and a line for
// every job it releases.
func (c *Coordinator) CheckTaskValidity() {
	c.mu.Lock()
	defer c.mu.Unlock()

	for idx := range c.MapJobs {
		job := &c.MapJobs[idx]
		age := time.Since(job.LastPing)
		if DEBUG {
			fmt.Printf("[Worker Status] id: %s | time: %s | active: %t | status: %t\n", job.WorkerId, age.String(), job.Active, job.Status)
		}
		// Only an in-flight job that has exceeded the timeout is released.
		if !job.Active || job.Status || age < JOB_TIMEOUT_S {
			continue
		}
		if DEBUG {
			fmt.Printf("[Terminating Worker] %s", job.WorkerId)
		}
		job.WorkerId = ""
		job.Active = false
		job.Status = false
	}

	for idx := range c.ReduceJobs {
		job := &c.ReduceJobs[idx]
		if !job.Active || job.Status {
			continue
		}
		if time.Since(job.LastPing) < JOB_TIMEOUT_S {
			continue
		}
		if DEBUG {
			fmt.Printf("[Terminating Worker] %s", job.WorkerId)
		}
		job.WorkerId = ""
		job.Active = false
		job.Status = false
	}
}
// main/mrcoordinator.go calls Done() periodically to find out // main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished. // if the entire job has finished.
//
func (c *Coordinator) Done() bool { func (c *Coordinator) Done() bool {
ret := false ret := false
go c.CheckTaskValidity()
// Check any non-completed jobs // Your code here.
for _, mj := range c.MapJobs {
if !mj.Status {
return ret
}
}
for _, mj := range c.ReduceJobs {
if !mj.Status {
return ret
}
}
// all mj completed
ret = true
return ret return ret
} }
//
// create a Coordinator. // create a Coordinator.
// main/mrcoordinator.go calls this function. // main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use. // nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator { func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{} c := Coordinator{}
c.NReduce = nReduce // Your code here.
for i := range files {
mj := Job{}
mj.Filenames = []string{files[i]}
mj.Status = false
mj.Active = false
c.mapWg.Add(1)
c.MapJobs = append(c.MapJobs, mj)
}
for j := 0; j < nReduce; j++ {
reduceJob := Job{}
reduceJob.Active = false
reduceJob.Filenames = []string{}
reduceJob.Status = false
c.reduceWg.Add(1)
c.ReduceJobs = append(c.ReduceJobs, reduceJob)
}
c.server() c.server()
return &c return &c
} }
// genWorkerId returns a random 8-character identifier drawn from the ASCII
// letters and digits, using crypto/rand as the source of randomness.
//
// If the random source fails, it returns an empty string and an error that
// wraps the underlying failure.
func genWorkerId() (string, error) {
	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

	// Exclusive upper bound for every draw; it is the same for all positions.
	limit := big.NewInt(int64(len(charset)))

	result := make([]byte, 8)
	for i := range result {
		index, err := rand.Int(rand.Reader, limit)
		if err != nil {
			return "", fmt.Errorf("error generating random number: %w", err)
		}
		result[i] = charset[index.Int64()]
	}
	return string(result), nil
}

View File

@@ -6,32 +6,25 @@ package mr
// remember to capitalize all names. // remember to capitalize all names.
// //
import ( import "os"
"os" import "strconv"
"strconv"
)
// //
// example to show how to declare the arguments // example to show how to declare the arguments
// and reply for an RPC. // and reply for an RPC.
// //
type RpcArgument struct { type ExampleArgs struct {
WorkerId string X int
Method string
TaskType string
IntermediateFiles []string
} }
type RpcReply struct { type ExampleReply struct {
WorkerId string Y int
TaskType string
Filenames []string
NReduce int
} }
// Add your RPC definitions here. // Add your RPC definitions here.
// Cook up a unique-ish UNIX-domain socket name // Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator. // in /var/tmp, for the coordinator.
// Can't use the current directory since // Can't use the current directory since

View File

@@ -1,245 +1,77 @@
package mr package mr
import ( import "fmt"
"bytes" import "log"
"encoding/json" import "net/rpc"
"fmt" import "hash/fnv"
"hash/fnv"
"io"
"log"
"net/rpc"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
//
// Map functions return a slice of KeyValue. // Map functions return a slice of KeyValue.
//
type KeyValue struct { type KeyValue struct {
Key string Key string
Value string Value string
} }
// Sorting //
// for sorting by key.
// ByKey orders a slice of KeyValue by ascending Key. Its Len, Swap and Less
// methods are the three methods sort.Interface requires.
type ByKey []KeyValue

// Len returns the number of pairs.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[j], kvs[i] = kvs[i], kvs[j]
}

// Less reports whether the key at i sorts strictly before the key at j.
func (kvs ByKey) Less(i, j int) bool {
	left, right := kvs[i].Key, kvs[j].Key
	return left < right
}
// use ihash(key) % NReduce to choose the reduce // use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map. // task number for each KeyValue emitted by Map.
//
// ihash returns a non-negative hash of key: the 32-bit FNV-1a sum with the
// top bit cleared. Use ihash(key) % NReduce to pick the reduce task for a
// KeyValue emitted by Map.
func ihash(key string) int {
	const nonNegativeMask = 0x7fffffff

	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	return int(hasher.Sum32() & nonNegativeMask)
}
//
// main/mrworker.go calls this function. // main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue, func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) { reducef func(string, []string) string) {
// Your worker implementation here. // Your worker implementation here.
var workerId, taskType string
var inFilenames []string
var nReduce int
taskType = "init" // uncomment to send the Example RPC to the coordinator.
for taskType != "done" { // CallExample()
//for taskType != "done" && taskType != "init" && taskType != "map" && taskType != "reduce" {
//workerId, taskType, inFilenames, nReduce = RequestTask()
//}
for {
workerId, taskType, inFilenames, nReduce = RequestTask()
// Check if taskType is valid
if taskType == "done" || taskType == "init" || taskType == "map" || taskType == "reduce" {
break // Exit the loop if taskType is valid
}
time.Sleep(1 * time.Second)
//fmt.Println("Invalid taskType, retrying...")
}
if taskType == "done" {
return
}
if taskType == "map" {
ifn := inFilenames[0]
file, err := os.Open(ifn)
if err != nil {
log.Fatalf("cannot open %v", ifn)
}
content, err := io.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", ifn)
}
file.Close()
kva := mapf(ifn, string(content))
//hash the map job id
mapTaskIdx := ihash(ifn)
IntermediaryFileMaps := make(map[int][]KeyValue)
for i := 0; i < nReduce; i++ {
IntermediaryFileMaps[i] = []KeyValue{}
}
for i := range kva {
//log.Printf("%s: %s", kva[i].Key, kva[i].Value)
idx := ihash(kva[i].Key) % nReduce
IntermediaryFileMaps[idx] = append(IntermediaryFileMaps[idx], kva[i])
}
intermediateFiles := []string{}
for idx := range IntermediaryFileMaps {
outFile := fmt.Sprintf("mr-%d-%d", mapTaskIdx, idx)
intermediateFiles = append(intermediateFiles, outFile)
jsonData, err := json.Marshal(IntermediaryFileMaps[idx])
if err != nil {
log.Fatal("failed to marshal json")
}
if err = WriteTempAtomic(outFile, jsonData); err != nil {
log.Fatalf("failed to write KV map to file")
}
}
SetTaskDone(workerId, "map", intermediateFiles)
} else if taskType == "reduce" {
var intermediate []KeyValue
for _, f := range inFilenames {
// Open the file
file, err := os.Open(f)
if err != nil {
log.Fatalf("Failed to open file: %v", err)
}
defer file.Close()
// Read the file contents
data, err := io.ReadAll(file)
if err != nil {
log.Fatalf("Failed to read file: %v", err)
}
// Parse the JSON data
var intermidateShard []KeyValue
if err := json.Unmarshal(data, &intermidateShard); err != nil {
log.Fatalf("Failed to parse JSON: %v", err)
}
intermediate = append(intermediate, intermidateShard...)
}
sort.Sort(ByKey(intermediate))
// Prepare output file
reduceJobNum := strings.Split(inFilenames[0], "-")[2]
oFilename := fmt.Sprintf("mr-out-%s", reduceJobNum)
var buffer bytes.Buffer
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(&buffer, "%v %v\n", intermediate[i].Key, output)
//fmt.Println(buffer.String())
i = j
}
if err := WriteTempAtomic(oFilename, buffer.Bytes()); err != nil {
log.Fatalf("failed to write KV map to file: %w", err)
}
SetTaskDone(workerId, "reduce", []string{})
}
}
} }
func WriteTempAtomic(fn string, data []byte) error { //
// Create a temporary file in the target directory // example function to show how to make an RPC call to the coordinator.
currentDir, err := os.Getwd() //
tempFile, err := os.CreateTemp(currentDir, fn) // the RPC argument and reply types are defined in rpc.go.
if err != nil { //
return fmt.Errorf("failed to create temp file: %w", err) func CallExample() {
}
tempPath := tempFile.Name()
if _, err := tempFile.Write(data); err != nil { // declare an argument structure.
tempFile.Close() args := ExampleArgs{}
os.Remove(tempPath)
return fmt.Errorf("failed to write to temp file: %w", err)
}
// Ensure the file is fully written to disk // fill in the argument(s).
if err := tempFile.Sync(); err != nil { args.X = 99
tempFile.Close()
os.Remove(tempPath)
return fmt.Errorf("failed to sync temp file: %w", err)
}
if err := tempFile.Close(); err != nil { // declare a reply structure.
os.Remove(tempPath) reply := ExampleReply{}
return fmt.Errorf("failed to close temp file: %w", err)
}
// Compute the target file path // send the RPC request, wait for the reply.
targetPath := filepath.Join(currentDir, fn) // the "Coordinator.Example" tells the
// receiving server that we'd like to call
// Atomically replace the target file with the temporary file // the Example() method of struct Coordinator.
if err := os.Rename(tempPath, targetPath); err != nil { ok := call("Coordinator.Example", &args, &reply)
os.Remove(tempPath) if ok {
return fmt.Errorf("failed to rename temp file to target: %w", err) // reply.Y should be 100.
} fmt.Printf("reply.Y %v\n", reply.Y)
return nil } else {
}
// RPC call to set the task status to done
func SetTaskDone(workerId string, taskType string, intermediateFiles []string) {
args := RpcArgument{}
args.WorkerId = workerId
args.Method = "set_task_done"
args.TaskType = taskType
args.IntermediateFiles = intermediateFiles
reply := RpcReply{}
ok := call("Coordinator.SetTaskDone", &args, &reply)
if !ok {
fmt.Printf("call failed!\n") fmt.Printf("call failed!\n")
} }
/*
if ok {
fmt.Printf("ok\n")
} else {
fmt.Printf("call failed!\n")
}
*/
}
// RPC call to request task from coordinator
func RequestTask() (string, string, []string, int) {
// declare an arg and reply
args := RpcArgument{}
args.Method = "request_task"
reply := RpcReply{}
ok := call("Coordinator.GetTasks", &args, &reply)
if !ok {
fmt.Printf("call failed!\n")
}
return reply.WorkerId, reply.TaskType, reply.Filenames, reply.NReduce
} }
//
// send an RPC request to the coordinator, wait for the response. // send an RPC request to the coordinator, wait for the response.
// usually returns true. // usually returns true.
// returns false if something goes wrong. // returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool { func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234") // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock() sockname := coordinatorSock()