Compare commits
13 Commits
420dd2208d
...
lab1
| Author | SHA1 | Date | |
|---|---|---|---|
| 00bfc7b4ef | |||
| c1ad7ba1aa | |||
| 005df49fbe | |||
| 549f538bbf | |||
| 1378032bfe | |||
|
|
db55e49c10 | ||
|
|
ab9b8d2f68 | ||
|
|
f4310a86eb | ||
|
|
52bf53742e | ||
|
|
b6635d50e5 | ||
|
|
7b670fdd48 | ||
|
|
d6f2a7fb5c | ||
|
|
5b5ae16e92 |
@@ -110,11 +110,11 @@ check_lab5b() {
|
|||||||
check_cmd go test -c
|
check_cmd go test -c
|
||||||
# also check other labs/parts
|
# also check other labs/parts
|
||||||
cd "$tmpdir"
|
cd "$tmpdir"
|
||||||
check_lab4a
|
check_lab5a
|
||||||
|
cd "$tmpdir"
|
||||||
|
check_lab4
|
||||||
cd "$tmpdir"
|
cd "$tmpdir"
|
||||||
check_lab3
|
check_lab3
|
||||||
cd "$tmpdir"
|
|
||||||
check_lab2
|
|
||||||
}
|
}
|
||||||
|
|
||||||
check_cmd() {
|
check_cmd() {
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
|
|||||||
// keeps trying forever in the face of all other errors.
|
// keeps trying forever in the face of all other errors.
|
||||||
//
|
//
|
||||||
// you can send an RPC with code like this:
|
// you can send an RPC with code like this:
|
||||||
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
|
// ok := ck.servers[i].Call("KVServer."+op, &args, &reply)
|
||||||
//
|
//
|
||||||
// the types of args and reply (including whether they are pointers)
|
// the types of args and reply (including whether they are pointers)
|
||||||
// must match the declared types of the RPC handler function's
|
// must match the declared types of the RPC handler function's
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ type Err string
|
|||||||
type PutAppendArgs struct {
|
type PutAppendArgs struct {
|
||||||
Key string
|
Key string
|
||||||
Value string
|
Value string
|
||||||
Op string // "Put" or "Append"
|
|
||||||
// You'll have to add definitions here.
|
// You'll have to add definitions here.
|
||||||
// Field names must start with capital letters,
|
// Field names must start with capital letters,
|
||||||
// otherwise RPC will break.
|
// otherwise RPC will break.
|
||||||
|
|||||||
@@ -42,7 +42,11 @@ func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
|
|||||||
// Your code here.
|
// Your code here.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
|
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
|
||||||
|
// Your code here.
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
|
||||||
// Your code here.
|
// Your code here.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -373,7 +373,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestMemGet2(t *testing.T) {
|
func TestMemGet2(t *testing.T) {
|
||||||
const MEM = 100 // in MiB
|
const MEM = 10 // in MiB
|
||||||
|
|
||||||
cfg := make_config(t, true)
|
cfg := make_config(t, true)
|
||||||
defer cfg.cleanup()
|
defer cfg.cleanup()
|
||||||
@@ -408,7 +408,7 @@ func TestMemGet2(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMemPut2(t *testing.T) {
|
func TestMemPut2(t *testing.T) {
|
||||||
const MEM = 100 // in MiB
|
const MEM = 10 // in MiB
|
||||||
|
|
||||||
cfg := make_config(t, false)
|
cfg := make_config(t, false)
|
||||||
defer cfg.cleanup()
|
defer cfg.cleanup()
|
||||||
@@ -423,6 +423,7 @@ func TestMemPut2(t *testing.T) {
|
|||||||
ck1.Put("k", "")
|
ck1.Put("k", "")
|
||||||
|
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
|
|
||||||
var st runtime.MemStats
|
var st runtime.MemStats
|
||||||
runtime.ReadMemStats(&st)
|
runtime.ReadMemStats(&st)
|
||||||
m := st.HeapAlloc / MiB
|
m := st.HeapAlloc / MiB
|
||||||
@@ -433,7 +434,7 @@ func TestMemPut2(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMemAppend2(t *testing.T) {
|
func TestMemAppend2(t *testing.T) {
|
||||||
const MEM = 100 // in MiB
|
const MEM = 10 // in MiB
|
||||||
|
|
||||||
cfg := make_config(t, false)
|
cfg := make_config(t, false)
|
||||||
defer cfg.cleanup()
|
defer cfg.cleanup()
|
||||||
@@ -458,63 +459,26 @@ func TestMemAppend2(t *testing.T) {
|
|||||||
cfg.end()
|
cfg.end()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMemPutMany2(t *testing.T) {
|
func TestMemPutManyClients(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
NPUT = 1_000_000
|
NCLIENT = 100_000
|
||||||
MEM = 1000
|
MEM = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
cfg := make_config(t, false)
|
cfg := make_config(t, false)
|
||||||
defer cfg.cleanup()
|
defer cfg.cleanup()
|
||||||
|
|
||||||
cfg.begin("Test: memory use many puts")
|
|
||||||
ck := cfg.makeClient()
|
|
||||||
|
|
||||||
v := randValue(MEM)
|
v := randValue(MEM)
|
||||||
|
|
||||||
ck.Put("k", v)
|
cks := make([]*Clerk, NCLIENT)
|
||||||
|
for i, _ := range cks {
|
||||||
|
cks[i] = cfg.makeClient()
|
||||||
|
}
|
||||||
|
|
||||||
// allow threads started by labrpc to exit
|
// allow threads started by labrpc to start
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
runtime.GC()
|
cfg.begin("Test: memory use many put clients")
|
||||||
|
|
||||||
var st runtime.MemStats
|
|
||||||
runtime.ReadMemStats(&st)
|
|
||||||
m0 := st.HeapAlloc
|
|
||||||
|
|
||||||
for i := 0; i < NPUT; i++ {
|
|
||||||
ck.Put("k", v)
|
|
||||||
}
|
|
||||||
|
|
||||||
// allow threads started by labrpc to exit
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
runtime.GC()
|
|
||||||
runtime.ReadMemStats(&st)
|
|
||||||
m1 := st.HeapAlloc
|
|
||||||
|
|
||||||
//log.Printf("mem m0 %d m1 %d\n", m0, m1)
|
|
||||||
|
|
||||||
if m1 > m0+NPUT/10 {
|
|
||||||
t.Fatalf("error: server using too much memory %d %d\n", m0, m1)
|
|
||||||
}
|
|
||||||
cfg.end()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMemGetMany2(t *testing.T) {
|
|
||||||
const (
|
|
||||||
NCLIENT = 100_000
|
|
||||||
)
|
|
||||||
|
|
||||||
cfg := make_config(t, false)
|
|
||||||
defer cfg.cleanup()
|
|
||||||
|
|
||||||
cfg.begin("Test: memory use many gets")
|
|
||||||
|
|
||||||
ck := cfg.makeClient()
|
|
||||||
ck.Put("0", "")
|
|
||||||
cfg.deleteClient(ck)
|
|
||||||
|
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
@@ -524,25 +488,119 @@ func TestMemGetMany2(t *testing.T) {
|
|||||||
m0 := st.HeapAlloc
|
m0 := st.HeapAlloc
|
||||||
|
|
||||||
for i := 0; i < NCLIENT; i++ {
|
for i := 0; i < NCLIENT; i++ {
|
||||||
ck := cfg.makeClient()
|
cks[i].Put("k", v)
|
||||||
ck.Get("0")
|
|
||||||
cfg.deleteClient(ck)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
runtime.GC()
|
|
||||||
//runtime.GC()
|
|
||||||
|
|
||||||
runtime.ReadMemStats(&st)
|
runtime.ReadMemStats(&st)
|
||||||
m1 := st.HeapAlloc
|
m1 := st.HeapAlloc
|
||||||
|
f := (float64(m1) - float64(m0)) / NCLIENT
|
||||||
|
if m1 > m0+(NCLIENT*200) {
|
||||||
|
t.Fatalf("error: server using too much memory %d %d (%.2f per client)\n", m0, m1, f)
|
||||||
|
}
|
||||||
|
|
||||||
log.Printf("mem m0 %d m1 %d\n", m0, m1)
|
for _, ck := range cks {
|
||||||
|
cfg.deleteClient(ck)
|
||||||
if m1 >= m0+NCLIENT {
|
|
||||||
t.Fatalf("error: server using too much memory m0 %d m1 %d\n", m0, m1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg.end()
|
cfg.end()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMemGetManyClients(t *testing.T) {
|
||||||
|
const (
|
||||||
|
NCLIENT = 100_000
|
||||||
|
)
|
||||||
|
|
||||||
|
cfg := make_config(t, false)
|
||||||
|
defer cfg.cleanup()
|
||||||
|
|
||||||
|
cfg.begin("Test: memory use many get client")
|
||||||
|
|
||||||
|
ck := cfg.makeClient()
|
||||||
|
ck.Put("0", "")
|
||||||
|
cfg.deleteClient(ck)
|
||||||
|
|
||||||
|
cks := make([]*Clerk, NCLIENT)
|
||||||
|
for i, _ := range cks {
|
||||||
|
cks[i] = cfg.makeClient()
|
||||||
|
}
|
||||||
|
|
||||||
|
// allow threads started by labrpc to start
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
runtime.GC()
|
||||||
|
|
||||||
|
var st runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&st)
|
||||||
|
m0 := st.HeapAlloc
|
||||||
|
|
||||||
|
for i := 0; i < NCLIENT; i++ {
|
||||||
|
cks[i].Get("0")
|
||||||
|
}
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
|
||||||
|
runtime.ReadMemStats(&st)
|
||||||
|
m1 := st.HeapAlloc
|
||||||
|
f := (float64(m1) - float64(m0)) / NCLIENT
|
||||||
|
if m1 >= m0+NCLIENT*10 {
|
||||||
|
t.Fatalf("error: server using too much memory m0 %d m1 %d (%.2f per client)\n", m0, m1, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ck := range cks {
|
||||||
|
cfg.deleteClient(ck)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.end()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemManyAppends(t *testing.T) {
|
||||||
|
const (
|
||||||
|
N = 1000
|
||||||
|
MEM = 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
cfg := make_config(t, false)
|
||||||
|
defer cfg.cleanup()
|
||||||
|
|
||||||
|
cfg.begin("Test: memory use many appends")
|
||||||
|
|
||||||
|
ck := cfg.makeClient()
|
||||||
|
rdVal := randValue(MEM)
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
runtime.GC()
|
||||||
|
|
||||||
|
var st runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&st)
|
||||||
|
m0 := st.HeapAlloc
|
||||||
|
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
ck.Append("k", rdVal)
|
||||||
|
}
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
runtime.GC()
|
||||||
|
|
||||||
|
runtime.ReadMemStats(&st)
|
||||||
|
m1 := st.HeapAlloc
|
||||||
|
if m1 >= 3*MEM*N {
|
||||||
|
t.Fatalf("error: server using too much memory m0 %d m1 %d\n", m0, m1)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("m0 %d m1 %d\n", m0, m1)
|
||||||
|
|
||||||
|
cfg.deleteClient(ck)
|
||||||
|
cfg.end()
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,33 +1,186 @@
|
|||||||
package mr
|
package mr
|
||||||
|
|
||||||
import "log"
|
import (
|
||||||
import "net"
|
"crypto/rand"
|
||||||
import "os"
|
"errors"
|
||||||
import "net/rpc"
|
"fmt"
|
||||||
import "net/http"
|
"log"
|
||||||
|
"math/big"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/rpc"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
JOB_TIMEOUT_S = 10 * time.Second
|
||||||
|
GET_TASK_TIMEOUT = 2 * time.Second
|
||||||
|
DEBUG = false
|
||||||
|
)
|
||||||
|
|
||||||
type Coordinator struct {
|
type Coordinator struct {
|
||||||
// Your definitions here.
|
MapJobs []Job
|
||||||
|
ReduceJobs []Job
|
||||||
|
NReduce int
|
||||||
|
mu sync.Mutex
|
||||||
|
mapWg sync.WaitGroup
|
||||||
|
reduceWg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
type Job struct {
|
||||||
|
WorkerId string
|
||||||
|
Filenames []string
|
||||||
|
Status bool
|
||||||
|
Active bool
|
||||||
|
LastPing time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Your code here -- RPC handlers for the worker to call.
|
// Your code here -- RPC handlers for the worker to call.
|
||||||
|
|
||||||
//
|
|
||||||
// an example RPC handler.
|
|
||||||
//
|
//
|
||||||
// the RPC argument and reply types are defined in rpc.go.
|
// the RPC argument and reply types are defined in rpc.go.
|
||||||
//
|
func (c *Coordinator) GetTasks(args *RpcArgument, reply *RpcReply) error {
|
||||||
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
|
c.mu.Lock()
|
||||||
reply.Y = args.X + 1
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
if args.Method != "request_task" {
|
||||||
|
return errors.New("argument .Method not valid for this procedure")
|
||||||
|
}
|
||||||
|
// check for any non-completed OR non-active map jobs
|
||||||
|
for i, mj := range c.MapJobs {
|
||||||
|
// assign job that not already done OR not actively being run by other workers
|
||||||
|
// status = false AND active = false
|
||||||
|
if !mj.Status && !mj.Active {
|
||||||
|
wId, err := genWorkerId()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
}
|
||||||
|
reply.NReduce = c.NReduce
|
||||||
|
reply.TaskType = "map"
|
||||||
|
reply.Filenames = mj.Filenames
|
||||||
|
reply.WorkerId = wId
|
||||||
|
// set job status, taken by a worke
|
||||||
|
c.MapJobs[i].WorkerId = wId
|
||||||
|
c.MapJobs[i].Active = true
|
||||||
|
c.MapJobs[i].LastPing = time.Now()
|
||||||
|
if DEBUG {
|
||||||
|
fmt.Printf("[Map Job Assigned] %s | file: %s | status: %t | active: %t |\n",
|
||||||
|
c.MapJobs[i].WorkerId,
|
||||||
|
c.MapJobs[i].Filenames[0],
|
||||||
|
c.MapJobs[i].Status, c.MapJobs[i].Active)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
c.mapWg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
if DEBUG {
|
||||||
|
fmt.Println("Wait done. Continuing...")
|
||||||
|
}
|
||||||
|
case <-time.After(GET_TASK_TIMEOUT):
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
done = make(chan struct{})
|
||||||
|
|
||||||
|
// check for reduce jobs
|
||||||
|
for i, rj := range c.ReduceJobs {
|
||||||
|
if !rj.Status && !rj.Active {
|
||||||
|
wId, err := genWorkerId()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
}
|
||||||
|
reply.WorkerId = wId
|
||||||
|
reply.NReduce = c.NReduce
|
||||||
|
reply.TaskType = "reduce"
|
||||||
|
reply.Filenames = rj.Filenames
|
||||||
|
// set job status, taken by a worker
|
||||||
|
c.ReduceJobs[i].WorkerId = wId
|
||||||
|
c.ReduceJobs[i].Active = true
|
||||||
|
c.ReduceJobs[i].LastPing = time.Now()
|
||||||
|
if DEBUG {
|
||||||
|
fmt.Printf("[Reduce Job Assigned] %s | status: %t | active: %t |\n",
|
||||||
|
c.ReduceJobs[i].WorkerId, c.ReduceJobs[i].Status, c.ReduceJobs[i].Active)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
c.reduceWg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
if DEBUG {
|
||||||
|
fmt.Println("Wait done. Continuing...")
|
||||||
|
}
|
||||||
|
case <-time.After(GET_TASK_TIMEOUT):
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.TaskType = "done"
|
||||||
|
reply.NReduce = c.NReduce
|
||||||
|
reply.Filenames = []string{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set task complete
|
||||||
|
func (c *Coordinator) SetTaskDone(args *RpcArgument, reply *RpcReply) error {
|
||||||
|
if args.Method != "set_task_done" {
|
||||||
|
return errors.New("argument .Method not valid for this procedure")
|
||||||
|
}
|
||||||
|
if args.TaskType == "map" {
|
||||||
|
wId := args.WorkerId
|
||||||
|
for i, _ := range c.MapJobs {
|
||||||
|
if c.MapJobs[i].WorkerId == wId {
|
||||||
|
c.MapJobs[i].Status = true
|
||||||
|
c.MapJobs[i].Active = false
|
||||||
|
c.mapWg.Done()
|
||||||
|
if DEBUG {
|
||||||
|
fmt.Printf("[DONE] %s | file: %s | status: %t | active: %t\n",
|
||||||
|
c.MapJobs[i].WorkerId,
|
||||||
|
c.MapJobs[i].Filenames[0],
|
||||||
|
c.MapJobs[i].Status, c.MapJobs[i].Active)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check again
|
||||||
|
for _, ifn := range args.IntermediateFiles {
|
||||||
|
reduceJobIdx := strings.Split(ifn, "-")[2]
|
||||||
|
idx, err := strconv.Atoi(reduceJobIdx)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Error converting string to integer: %v\n", err)
|
||||||
|
}
|
||||||
|
c.ReduceJobs[idx].Filenames = append(c.ReduceJobs[idx].Filenames, ifn)
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if args.TaskType == "reduce" {
|
||||||
|
wId := args.WorkerId
|
||||||
|
for i, _ := range c.ReduceJobs {
|
||||||
|
if c.ReduceJobs[i].WorkerId == wId {
|
||||||
|
c.ReduceJobs[i].Status = true
|
||||||
|
c.ReduceJobs[i].Active = false
|
||||||
|
c.reduceWg.Done()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return errors.New("argument .TaskType not valid for this procedure")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// start a thread that listens for RPCs from worker.go
|
// start a thread that listens for RPCs from worker.go
|
||||||
//
|
|
||||||
func (c *Coordinator) server() {
|
func (c *Coordinator) server() {
|
||||||
rpc.Register(c)
|
rpc.Register(c)
|
||||||
rpc.HandleHTTP()
|
rpc.HandleHTTP()
|
||||||
@@ -41,30 +194,105 @@ func (c *Coordinator) server() {
|
|||||||
go http.Serve(l, nil)
|
go http.Serve(l, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
// check scheduled tasks; if their age > JOB_TIMEOUT_S
|
||||||
|
// reset the job status, making it available for scheduling
|
||||||
|
func (c *Coordinator) CheckTaskValidity() {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
for i, mj := range c.MapJobs {
|
||||||
|
timeNow := time.Now()
|
||||||
|
duration := timeNow.Sub(mj.LastPing)
|
||||||
|
if DEBUG {
|
||||||
|
fmt.Printf("[Worker Status] id: %s | time: %s | active: %t | status: %t\n", mj.WorkerId, duration.String(), mj.Active, mj.Status)
|
||||||
|
}
|
||||||
|
if mj.Active && !mj.Status {
|
||||||
|
if duration >= JOB_TIMEOUT_S {
|
||||||
|
// unset worker id and set the status
|
||||||
|
if DEBUG {
|
||||||
|
fmt.Printf("[Terminating Worker] %s", mj.WorkerId)
|
||||||
|
}
|
||||||
|
c.MapJobs[i].WorkerId = ""
|
||||||
|
c.MapJobs[i].Active = false
|
||||||
|
c.MapJobs[i].Status = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i, rj := range c.ReduceJobs {
|
||||||
|
timeNow := time.Now()
|
||||||
|
if rj.Active && !rj.Status {
|
||||||
|
duration := timeNow.Sub(rj.LastPing)
|
||||||
|
if duration >= JOB_TIMEOUT_S {
|
||||||
|
if DEBUG {
|
||||||
|
fmt.Printf("[Terminating Worker] %s", rj.WorkerId)
|
||||||
|
}
|
||||||
|
c.ReduceJobs[i].WorkerId = ""
|
||||||
|
c.ReduceJobs[i].Active = false
|
||||||
|
c.ReduceJobs[i].Status = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// main/mrcoordinator.go calls Done() periodically to find out
|
// main/mrcoordinator.go calls Done() periodically to find out
|
||||||
// if the entire job has finished.
|
// if the entire job has finished.
|
||||||
//
|
|
||||||
func (c *Coordinator) Done() bool {
|
func (c *Coordinator) Done() bool {
|
||||||
ret := false
|
ret := false
|
||||||
|
go c.CheckTaskValidity()
|
||||||
// Your code here.
|
// Check any non-completed jobs
|
||||||
|
for _, mj := range c.MapJobs {
|
||||||
|
if !mj.Status {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, mj := range c.ReduceJobs {
|
||||||
|
if !mj.Status {
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// all mj completed
|
||||||
|
ret = true
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// create a Coordinator.
|
// create a Coordinator.
|
||||||
// main/mrcoordinator.go calls this function.
|
// main/mrcoordinator.go calls this function.
|
||||||
// nReduce is the number of reduce tasks to use.
|
// nReduce is the number of reduce tasks to use.
|
||||||
//
|
|
||||||
func MakeCoordinator(files []string, nReduce int) *Coordinator {
|
func MakeCoordinator(files []string, nReduce int) *Coordinator {
|
||||||
c := Coordinator{}
|
c := Coordinator{}
|
||||||
|
|
||||||
// Your code here.
|
c.NReduce = nReduce
|
||||||
|
|
||||||
|
for i := range files {
|
||||||
|
mj := Job{}
|
||||||
|
mj.Filenames = []string{files[i]}
|
||||||
|
mj.Status = false
|
||||||
|
mj.Active = false
|
||||||
|
c.mapWg.Add(1)
|
||||||
|
c.MapJobs = append(c.MapJobs, mj)
|
||||||
|
}
|
||||||
|
|
||||||
|
for j := 0; j < nReduce; j++ {
|
||||||
|
reduceJob := Job{}
|
||||||
|
reduceJob.Active = false
|
||||||
|
reduceJob.Filenames = []string{}
|
||||||
|
reduceJob.Status = false
|
||||||
|
c.reduceWg.Add(1)
|
||||||
|
c.ReduceJobs = append(c.ReduceJobs, reduceJob)
|
||||||
|
}
|
||||||
|
|
||||||
c.server()
|
c.server()
|
||||||
return &c
|
return &c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func genWorkerId() (string, error) {
|
||||||
|
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||||
|
result := make([]byte, 8)
|
||||||
|
for i := range result {
|
||||||
|
index, err := rand.Int(rand.Reader, big.NewInt(int64(len(charset))))
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("error generating random number: %v", err)
|
||||||
|
}
|
||||||
|
result[i] = charset[index.Int64()]
|
||||||
|
}
|
||||||
|
return string(result), nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,25 +6,32 @@ package mr
|
|||||||
// remember to capitalize all names.
|
// remember to capitalize all names.
|
||||||
//
|
//
|
||||||
|
|
||||||
import "os"
|
import (
|
||||||
import "strconv"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
//
|
//
|
||||||
// example to show how to declare the arguments
|
// example to show how to declare the arguments
|
||||||
// and reply for an RPC.
|
// and reply for an RPC.
|
||||||
//
|
//
|
||||||
|
|
||||||
type ExampleArgs struct {
|
type RpcArgument struct {
|
||||||
X int
|
WorkerId string
|
||||||
|
Method string
|
||||||
|
TaskType string
|
||||||
|
IntermediateFiles []string
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExampleReply struct {
|
type RpcReply struct {
|
||||||
Y int
|
WorkerId string
|
||||||
|
TaskType string
|
||||||
|
Filenames []string
|
||||||
|
NReduce int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add your RPC definitions here.
|
// Add your RPC definitions here.
|
||||||
|
|
||||||
|
|
||||||
// Cook up a unique-ish UNIX-domain socket name
|
// Cook up a unique-ish UNIX-domain socket name
|
||||||
// in /var/tmp, for the coordinator.
|
// in /var/tmp, for the coordinator.
|
||||||
// Can't use the current directory since
|
// Can't use the current directory since
|
||||||
|
|||||||
240
src/mr/worker.go
240
src/mr/worker.go
@@ -1,77 +1,245 @@
|
|||||||
package mr
|
package mr
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
import "log"
|
"bytes"
|
||||||
import "net/rpc"
|
"encoding/json"
|
||||||
import "hash/fnv"
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/rpc"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
//
|
|
||||||
// Map functions return a slice of KeyValue.
|
// Map functions return a slice of KeyValue.
|
||||||
//
|
|
||||||
type KeyValue struct {
|
type KeyValue struct {
|
||||||
Key string
|
Key string
|
||||||
Value string
|
Value string
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
// Sorting
|
||||||
|
// for sorting by key.
|
||||||
|
type ByKey []KeyValue
|
||||||
|
|
||||||
|
// for sorting by key.
|
||||||
|
func (a ByKey) Len() int { return len(a) }
|
||||||
|
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
|
||||||
|
|
||||||
// use ihash(key) % NReduce to choose the reduce
|
// use ihash(key) % NReduce to choose the reduce
|
||||||
// task number for each KeyValue emitted by Map.
|
// task number for each KeyValue emitted by Map.
|
||||||
//
|
|
||||||
func ihash(key string) int {
|
func ihash(key string) int {
|
||||||
h := fnv.New32a()
|
h := fnv.New32a()
|
||||||
h.Write([]byte(key))
|
h.Write([]byte(key))
|
||||||
return int(h.Sum32() & 0x7fffffff)
|
return int(h.Sum32() & 0x7fffffff)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//
|
|
||||||
// main/mrworker.go calls this function.
|
// main/mrworker.go calls this function.
|
||||||
//
|
|
||||||
func Worker(mapf func(string, string) []KeyValue,
|
func Worker(mapf func(string, string) []KeyValue,
|
||||||
reducef func(string, []string) string) {
|
reducef func(string, []string) string) {
|
||||||
|
|
||||||
// Your worker implementation here.
|
// Your worker implementation here.
|
||||||
|
var workerId, taskType string
|
||||||
|
var inFilenames []string
|
||||||
|
var nReduce int
|
||||||
|
|
||||||
// uncomment to send the Example RPC to the coordinator.
|
taskType = "init"
|
||||||
// CallExample()
|
for taskType != "done" {
|
||||||
|
//for taskType != "done" && taskType != "init" && taskType != "map" && taskType != "reduce" {
|
||||||
|
//workerId, taskType, inFilenames, nReduce = RequestTask()
|
||||||
|
//}
|
||||||
|
for {
|
||||||
|
workerId, taskType, inFilenames, nReduce = RequestTask()
|
||||||
|
|
||||||
|
// Check if taskType is valid
|
||||||
|
if taskType == "done" || taskType == "init" || taskType == "map" || taskType == "reduce" {
|
||||||
|
break // Exit the loop if taskType is valid
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
//fmt.Println("Invalid taskType, retrying...")
|
||||||
|
}
|
||||||
|
|
||||||
|
if taskType == "done" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if taskType == "map" {
|
||||||
|
ifn := inFilenames[0]
|
||||||
|
file, err := os.Open(ifn)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("cannot open %v", ifn)
|
||||||
|
}
|
||||||
|
content, err := io.ReadAll(file)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("cannot read %v", ifn)
|
||||||
|
}
|
||||||
|
file.Close()
|
||||||
|
kva := mapf(ifn, string(content))
|
||||||
|
//hash the map job id
|
||||||
|
mapTaskIdx := ihash(ifn)
|
||||||
|
|
||||||
|
IntermediaryFileMaps := make(map[int][]KeyValue)
|
||||||
|
|
||||||
|
for i := 0; i < nReduce; i++ {
|
||||||
|
IntermediaryFileMaps[i] = []KeyValue{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range kva {
|
||||||
|
//log.Printf("%s: %s", kva[i].Key, kva[i].Value)
|
||||||
|
idx := ihash(kva[i].Key) % nReduce
|
||||||
|
IntermediaryFileMaps[idx] = append(IntermediaryFileMaps[idx], kva[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
intermediateFiles := []string{}
|
||||||
|
for idx := range IntermediaryFileMaps {
|
||||||
|
outFile := fmt.Sprintf("mr-%d-%d", mapTaskIdx, idx)
|
||||||
|
intermediateFiles = append(intermediateFiles, outFile)
|
||||||
|
jsonData, err := json.Marshal(IntermediaryFileMaps[idx])
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("failed to marshal json")
|
||||||
|
}
|
||||||
|
if err = WriteTempAtomic(outFile, jsonData); err != nil {
|
||||||
|
log.Fatalf("failed to write KV map to file")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SetTaskDone(workerId, "map", intermediateFiles)
|
||||||
|
|
||||||
|
} else if taskType == "reduce" {
|
||||||
|
var intermediate []KeyValue
|
||||||
|
|
||||||
|
for _, f := range inFilenames {
|
||||||
|
// Open the file
|
||||||
|
file, err := os.Open(f)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to open file: %v", err)
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
// Read the file contents
|
||||||
|
data, err := io.ReadAll(file)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to read file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the JSON data
|
||||||
|
var intermidateShard []KeyValue
|
||||||
|
if err := json.Unmarshal(data, &intermidateShard); err != nil {
|
||||||
|
log.Fatalf("Failed to parse JSON: %v", err)
|
||||||
|
}
|
||||||
|
intermediate = append(intermediate, intermidateShard...)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(ByKey(intermediate))
|
||||||
|
|
||||||
|
// Prepare output file
|
||||||
|
reduceJobNum := strings.Split(inFilenames[0], "-")[2]
|
||||||
|
oFilename := fmt.Sprintf("mr-out-%s", reduceJobNum)
|
||||||
|
|
||||||
|
var buffer bytes.Buffer
|
||||||
|
i := 0
|
||||||
|
for i < len(intermediate) {
|
||||||
|
j := i + 1
|
||||||
|
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
values := []string{}
|
||||||
|
for k := i; k < j; k++ {
|
||||||
|
values = append(values, intermediate[k].Value)
|
||||||
|
}
|
||||||
|
output := reducef(intermediate[i].Key, values)
|
||||||
|
|
||||||
|
// this is the correct format for each line of Reduce output.
|
||||||
|
fmt.Fprintf(&buffer, "%v %v\n", intermediate[i].Key, output)
|
||||||
|
//fmt.Println(buffer.String())
|
||||||
|
i = j
|
||||||
|
}
|
||||||
|
if err := WriteTempAtomic(oFilename, buffer.Bytes()); err != nil {
|
||||||
|
log.Fatalf("failed to write KV map to file: %w", err)
|
||||||
|
}
|
||||||
|
SetTaskDone(workerId, "reduce", []string{})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
func WriteTempAtomic(fn string, data []byte) error {
|
||||||
// example function to show how to make an RPC call to the coordinator.
|
// Create a temporary file in the target directory
|
||||||
//
|
currentDir, err := os.Getwd()
|
||||||
// the RPC argument and reply types are defined in rpc.go.
|
tempFile, err := os.CreateTemp(currentDir, fn)
|
||||||
//
|
if err != nil {
|
||||||
func CallExample() {
|
return fmt.Errorf("failed to create temp file: %w", err)
|
||||||
|
}
|
||||||
|
tempPath := tempFile.Name()
|
||||||
|
|
||||||
// declare an argument structure.
|
if _, err := tempFile.Write(data); err != nil {
|
||||||
args := ExampleArgs{}
|
tempFile.Close()
|
||||||
|
os.Remove(tempPath)
|
||||||
|
return fmt.Errorf("failed to write to temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// fill in the argument(s).
|
// Ensure the file is fully written to disk
|
||||||
args.X = 99
|
if err := tempFile.Sync(); err != nil {
|
||||||
|
tempFile.Close()
|
||||||
|
os.Remove(tempPath)
|
||||||
|
return fmt.Errorf("failed to sync temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// declare a reply structure.
|
if err := tempFile.Close(); err != nil {
|
||||||
reply := ExampleReply{}
|
os.Remove(tempPath)
|
||||||
|
return fmt.Errorf("failed to close temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// send the RPC request, wait for the reply.
|
// Compute the target file path
|
||||||
// the "Coordinator.Example" tells the
|
targetPath := filepath.Join(currentDir, fn)
|
||||||
// receiving server that we'd like to call
|
|
||||||
// the Example() method of struct Coordinator.
|
// Atomically replace the target file with the temporary file
|
||||||
ok := call("Coordinator.Example", &args, &reply)
|
if err := os.Rename(tempPath, targetPath); err != nil {
|
||||||
|
os.Remove(tempPath)
|
||||||
|
return fmt.Errorf("failed to rename temp file to target: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RPC call to set the task status to done
|
||||||
|
func SetTaskDone(workerId string, taskType string, intermediateFiles []string) {
|
||||||
|
args := RpcArgument{}
|
||||||
|
args.WorkerId = workerId
|
||||||
|
args.Method = "set_task_done"
|
||||||
|
args.TaskType = taskType
|
||||||
|
args.IntermediateFiles = intermediateFiles
|
||||||
|
reply := RpcReply{}
|
||||||
|
ok := call("Coordinator.SetTaskDone", &args, &reply)
|
||||||
|
if !ok {
|
||||||
|
fmt.Printf("call failed!\n")
|
||||||
|
}
|
||||||
|
/*
|
||||||
if ok {
|
if ok {
|
||||||
// reply.Y should be 100.
|
fmt.Printf("ok\n")
|
||||||
fmt.Printf("reply.Y %v\n", reply.Y)
|
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("call failed!\n")
|
fmt.Printf("call failed!\n")
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
// RPC call to request task from coordinator
|
||||||
|
func RequestTask() (string, string, []string, int) {
|
||||||
|
// declare an arg and reply
|
||||||
|
args := RpcArgument{}
|
||||||
|
args.Method = "request_task"
|
||||||
|
reply := RpcReply{}
|
||||||
|
|
||||||
|
ok := call("Coordinator.GetTasks", &args, &reply)
|
||||||
|
if !ok {
|
||||||
|
fmt.Printf("call failed!\n")
|
||||||
|
}
|
||||||
|
return reply.WorkerId, reply.TaskType, reply.Filenames, reply.NReduce
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
// send an RPC request to the coordinator, wait for the response.
|
// send an RPC request to the coordinator, wait for the response.
|
||||||
// usually returns true.
|
// usually returns true.
|
||||||
// returns false if something goes wrong.
|
// returns false if something goes wrong.
|
||||||
//
|
|
||||||
func call(rpcname string, args interface{}, reply interface{}) bool {
|
func call(rpcname string, args interface{}, reply interface{}) bool {
|
||||||
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
|
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
|
||||||
sockname := coordinatorSock()
|
sockname := coordinatorSock()
|
||||||
|
|||||||
@@ -551,7 +551,7 @@ func (cfg *config) wait(index int, n int, startTerm int) interface{} {
|
|||||||
// if retry==true, may submit the command multiple
|
// if retry==true, may submit the command multiple
|
||||||
// times, in case a leader fails just after Start().
|
// times, in case a leader fails just after Start().
|
||||||
// if retry==false, calls Start() only once, in order
|
// if retry==false, calls Start() only once, in order
|
||||||
// to simplify the early Lab 2B tests.
|
// to simplify the early Lab 3B tests.
|
||||||
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
|
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
|
||||||
t0 := time.Now()
|
t0 := time.Now()
|
||||||
starts := 0
|
starts := 0
|
||||||
@@ -605,7 +605,7 @@ func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
|
|||||||
|
|
||||||
// start a Test.
|
// start a Test.
|
||||||
// print the Test message.
|
// print the Test message.
|
||||||
// e.g. cfg.begin("Test (2B): RPC counts aren't too high")
|
// e.g. cfg.begin("Test (3B): RPC counts aren't too high")
|
||||||
func (cfg *config) begin(description string) {
|
func (cfg *config) begin(description string) {
|
||||||
fmt.Printf("%s ...\n", description)
|
fmt.Printf("%s ...\n", description)
|
||||||
cfg.t0 = time.Now()
|
cfg.t0 = time.Now()
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ import (
|
|||||||
// CommandValid to true to indicate that the ApplyMsg contains a newly
|
// CommandValid to true to indicate that the ApplyMsg contains a newly
|
||||||
// committed log entry.
|
// committed log entry.
|
||||||
//
|
//
|
||||||
// in part 2D you'll want to send other kinds of messages (e.g.,
|
// in part 3D you'll want to send other kinds of messages (e.g.,
|
||||||
// snapshots) on the applyCh, but set CommandValid to false for these
|
// snapshots) on the applyCh, but set CommandValid to false for these
|
||||||
// other uses.
|
// other uses.
|
||||||
type ApplyMsg struct {
|
type ApplyMsg struct {
|
||||||
@@ -43,7 +43,7 @@ type ApplyMsg struct {
|
|||||||
Command interface{}
|
Command interface{}
|
||||||
CommandIndex int
|
CommandIndex int
|
||||||
|
|
||||||
// For 2D:
|
// For 3D:
|
||||||
SnapshotValid bool
|
SnapshotValid bool
|
||||||
Snapshot []byte
|
Snapshot []byte
|
||||||
SnapshotTerm int
|
SnapshotTerm int
|
||||||
@@ -58,7 +58,7 @@ type Raft struct {
|
|||||||
me int // this peer's index into peers[]
|
me int // this peer's index into peers[]
|
||||||
dead int32 // set by Kill()
|
dead int32 // set by Kill()
|
||||||
|
|
||||||
// Your data here (2A, 2B, 2C).
|
// Your data here (3A, 3B, 3C).
|
||||||
// Look at the paper's Figure 2 for a description of what
|
// Look at the paper's Figure 2 for a description of what
|
||||||
// state a Raft server must maintain.
|
// state a Raft server must maintain.
|
||||||
|
|
||||||
@@ -70,7 +70,7 @@ func (rf *Raft) GetState() (int, bool) {
|
|||||||
|
|
||||||
var term int
|
var term int
|
||||||
var isleader bool
|
var isleader bool
|
||||||
// Your code here (2A).
|
// Your code here (3A).
|
||||||
return term, isleader
|
return term, isleader
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,7 +82,7 @@ func (rf *Raft) GetState() (int, bool) {
|
|||||||
// after you've implemented snapshots, pass the current snapshot
|
// after you've implemented snapshots, pass the current snapshot
|
||||||
// (or nil if there's not yet a snapshot).
|
// (or nil if there's not yet a snapshot).
|
||||||
func (rf *Raft) persist() {
|
func (rf *Raft) persist() {
|
||||||
// Your code here (2C).
|
// Your code here (3C).
|
||||||
// Example:
|
// Example:
|
||||||
// w := new(bytes.Buffer)
|
// w := new(bytes.Buffer)
|
||||||
// e := labgob.NewEncoder(w)
|
// e := labgob.NewEncoder(w)
|
||||||
@@ -98,7 +98,7 @@ func (rf *Raft) readPersist(data []byte) {
|
|||||||
if data == nil || len(data) < 1 { // bootstrap without any state?
|
if data == nil || len(data) < 1 { // bootstrap without any state?
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Your code here (2C).
|
// Your code here (3C).
|
||||||
// Example:
|
// Example:
|
||||||
// r := bytes.NewBuffer(data)
|
// r := bytes.NewBuffer(data)
|
||||||
// d := labgob.NewDecoder(r)
|
// d := labgob.NewDecoder(r)
|
||||||
@@ -119,7 +119,7 @@ func (rf *Raft) readPersist(data []byte) {
|
|||||||
// service no longer needs the log through (and including)
|
// service no longer needs the log through (and including)
|
||||||
// that index. Raft should now trim its log as much as possible.
|
// that index. Raft should now trim its log as much as possible.
|
||||||
func (rf *Raft) Snapshot(index int, snapshot []byte) {
|
func (rf *Raft) Snapshot(index int, snapshot []byte) {
|
||||||
// Your code here (2D).
|
// Your code here (3D).
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,18 +127,18 @@ func (rf *Raft) Snapshot(index int, snapshot []byte) {
|
|||||||
// example RequestVote RPC arguments structure.
|
// example RequestVote RPC arguments structure.
|
||||||
// field names must start with capital letters!
|
// field names must start with capital letters!
|
||||||
type RequestVoteArgs struct {
|
type RequestVoteArgs struct {
|
||||||
// Your data here (2A, 2B).
|
// Your data here (3A, 3B).
|
||||||
}
|
}
|
||||||
|
|
||||||
// example RequestVote RPC reply structure.
|
// example RequestVote RPC reply structure.
|
||||||
// field names must start with capital letters!
|
// field names must start with capital letters!
|
||||||
type RequestVoteReply struct {
|
type RequestVoteReply struct {
|
||||||
// Your data here (2A).
|
// Your data here (3A).
|
||||||
}
|
}
|
||||||
|
|
||||||
// example RequestVote RPC handler.
|
// example RequestVote RPC handler.
|
||||||
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
|
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
|
||||||
// Your code here (2A, 2B).
|
// Your code here (3A, 3B).
|
||||||
}
|
}
|
||||||
|
|
||||||
// example code to send a RequestVote RPC to a server.
|
// example code to send a RequestVote RPC to a server.
|
||||||
@@ -191,7 +191,7 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) {
|
|||||||
term := -1
|
term := -1
|
||||||
isLeader := true
|
isLeader := true
|
||||||
|
|
||||||
// Your code here (2B).
|
// Your code here (3B).
|
||||||
|
|
||||||
|
|
||||||
return index, term, isLeader
|
return index, term, isLeader
|
||||||
@@ -219,7 +219,7 @@ func (rf *Raft) killed() bool {
|
|||||||
func (rf *Raft) ticker() {
|
func (rf *Raft) ticker() {
|
||||||
for rf.killed() == false {
|
for rf.killed() == false {
|
||||||
|
|
||||||
// Your code here (2A)
|
// Your code here (3A)
|
||||||
// Check if a leader election should be started.
|
// Check if a leader election should be started.
|
||||||
|
|
||||||
|
|
||||||
@@ -246,7 +246,7 @@ func Make(peers []*labrpc.ClientEnd, me int,
|
|||||||
rf.persister = persister
|
rf.persister = persister
|
||||||
rf.me = me
|
rf.me = me
|
||||||
|
|
||||||
// Your initialization code here (2A, 2B, 2C).
|
// Your initialization code here (3A, 3B, 3C).
|
||||||
|
|
||||||
// initialize from state persisted before a crash
|
// initialize from state persisted before a crash
|
||||||
rf.readPersist(persister.ReadRaftState())
|
rf.readPersist(persister.ReadRaftState())
|
||||||
|
|||||||
Reference in New Issue
Block a user