Compare commits

...

10 Commits

Author SHA1 Message Date
dffbcad936 cp before pse 2025-01-06 18:51:37 +07:00
63706b6685 cp1 2025-01-01 18:25:05 +07:00
Upamanyu Sharma
db55e49c10 update 2024-04-02 14:57:06 -04:00
kenctrl
ab9b8d2f68 update lab4 2024-03-05 10:48:41 -05:00
Ananya Jain
f4310a86eb Update Clerk comment to reflect new Put/Append RPCs 2024-03-05 09:23:25 -05:00
kenctrl
52bf53742e separate Put and Append in lab 4 kvsrv 2024-03-05 01:21:16 -05:00
kenctrl
b6635d50e5 update PutAppend spec 2024-03-03 16:31:19 -05:00
kenctrl
7b670fdd48 update PutAppend spec 2024-03-03 16:30:37 -05:00
Yun-Sheng Chang
d6f2a7fb5c Fix comments of lab 3 2024-02-18 20:39:24 -05:00
Frans Kaashoek
5b5ae16e92 update 2024-02-12 08:09:47 -05:00
10 changed files with 204 additions and 93 deletions

View File

@@ -110,11 +110,11 @@ check_lab5b() {
check_cmd go test -c check_cmd go test -c
# also check other labs/parts # also check other labs/parts
cd "$tmpdir" cd "$tmpdir"
check_lab4a check_lab5a
cd "$tmpdir"
check_lab4
cd "$tmpdir" cd "$tmpdir"
check_lab3 check_lab3
cd "$tmpdir"
check_lab2
} }
check_cmd() { check_cmd() {

View File

@@ -29,7 +29,7 @@ func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
// keeps trying forever in the face of all other errors. // keeps trying forever in the face of all other errors.
// //
// you can send an RPC with code like this: // you can send an RPC with code like this:
// ok := ck.servers[i].Call("KVServer.Get", &args, &reply) // ok := ck.servers[i].Call("KVServer."+op, &args, &reply)
// //
// the types of args and reply (including whether they are pointers) // the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's // must match the declared types of the RPC handler function's

View File

@@ -12,7 +12,6 @@ type Err string
type PutAppendArgs struct { type PutAppendArgs struct {
Key string Key string
Value string Value string
Op string // "Put" or "Append"
// You'll have to add definitions here. // You'll have to add definitions here.
// Field names must start with capital letters, // Field names must start with capital letters,
// otherwise RPC will break. // otherwise RPC will break.

View File

@@ -42,7 +42,11 @@ func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here. // Your code here.
} }
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) { func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
}
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here. // Your code here.
} }

View File

@@ -1,9 +1,13 @@
package kvsrv package kvsrv
import "6.5840/labrpc" import (
import "crypto/rand" "crypto/rand"
import "math/big" "log"
"math/big"
"6.5840/labrpc"
"github.com/google/uuid"
)
type Clerk struct { type Clerk struct {
server *labrpc.ClientEnd server *labrpc.ClientEnd
@@ -35,9 +39,21 @@ func MakeClerk(server *labrpc.ClientEnd) *Clerk {
// must match the declared types of the RPC handler function's // must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer. // arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string { func (ck *Clerk) Get(key string) string {
id := uuid.New()
args := GetArgs{
Key: key,
Uuid: id,
}
reply := GetReply{}
ok := false
for !ok {
ok = ck.server.Call("KVServer.Get", &args, &reply)
if !ok {
log.Println("RPC call failed: GET op")
}
}
// You will have to modify this function. // You will have to modify this function.
return "" return reply.Value
} }
// shared by Put and Append. // shared by Put and Append.
@@ -50,7 +66,22 @@ func (ck *Clerk) Get(key string) string {
// arguments. and reply must be passed as a pointer. // arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) string { func (ck *Clerk) PutAppend(key string, value string, op string) string {
// You will have to modify this function. // You will have to modify this function.
return "" id := uuid.New()
args := PutAppendArgs{
Key: key,
Value: value,
Uuid: id,
}
reply := PutAppendReply{}
ok := false
for !ok {
ok := ck.server.Call("KVServer."+op, &args, &reply)
if !ok {
log.Printf("RPC call failed: %s op\n", op)
}
}
return reply.Value
} }
func (ck *Clerk) Put(key string, value string) { func (ck *Clerk) Put(key string, value string) {

View File

@@ -7,6 +7,7 @@ type PutAppendArgs struct {
// You'll have to add definitions here. // You'll have to add definitions here.
// Field names must start with capital letters, // Field names must start with capital letters,
// otherwise RPC will break. // otherwise RPC will break.
Uuid string
} }
type PutAppendReply struct { type PutAppendReply struct {
@@ -16,6 +17,7 @@ type PutAppendReply struct {
type GetArgs struct { type GetArgs struct {
Key string Key string
// You'll have to add definitions here. // You'll have to add definitions here.
Uuid string
} }
type GetReply struct { type GetReply struct {

View File

@@ -5,7 +5,7 @@ import (
"sync" "sync"
) )
const Debug = false const Debug = true
func DPrintf(format string, a ...interface{}) (n int, err error) { func DPrintf(format string, a ...interface{}) (n int, err error) {
if Debug { if Debug {
@@ -14,30 +14,47 @@ func DPrintf(format string, a ...interface{}) (n int, err error) {
return return
} }
type KVServer struct { type KVServer struct {
mu sync.Mutex mu sync.Mutex
KVStore map[string]string
OpLog map[string]bool
// Your definitions here. // Your definitions here.
} }
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here. // Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
reply.Value = kv.KVStore[args.Key]
} }
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) { func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here. // Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
kv.KVStore[args.Key] = args.Value
reply.Value = args.Value
} }
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) { func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here. // Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
val, exists := kv.KVStore[args.Key]
if !exists {
kv.KVStore[args.Key] = args.Value
reply.Value = ""
} else {
kv.KVStore[args.Key] = val + args.Value
reply.Value = val
}
} }
func StartKVServer() *KVServer { func StartKVServer() *KVServer {
kv := new(KVServer) kv := new(KVServer)
kv.KVStore = map[string]string{}
// You may need initialization code here.
return kv return kv
} }

View File

@@ -373,7 +373,7 @@ const (
) )
func TestMemGet2(t *testing.T) { func TestMemGet2(t *testing.T) {
const MEM = 100 // in MiB const MEM = 10 // in MiB
cfg := make_config(t, true) cfg := make_config(t, true)
defer cfg.cleanup() defer cfg.cleanup()
@@ -408,7 +408,7 @@ func TestMemGet2(t *testing.T) {
} }
func TestMemPut2(t *testing.T) { func TestMemPut2(t *testing.T) {
const MEM = 100 // in MiB const MEM = 10 // in MiB
cfg := make_config(t, false) cfg := make_config(t, false)
defer cfg.cleanup() defer cfg.cleanup()
@@ -423,6 +423,7 @@ func TestMemPut2(t *testing.T) {
ck1.Put("k", "") ck1.Put("k", "")
runtime.GC() runtime.GC()
var st runtime.MemStats var st runtime.MemStats
runtime.ReadMemStats(&st) runtime.ReadMemStats(&st)
m := st.HeapAlloc / MiB m := st.HeapAlloc / MiB
@@ -433,7 +434,7 @@ func TestMemPut2(t *testing.T) {
} }
func TestMemAppend2(t *testing.T) { func TestMemAppend2(t *testing.T) {
const MEM = 100 // in MiB const MEM = 10 // in MiB
cfg := make_config(t, false) cfg := make_config(t, false)
defer cfg.cleanup() defer cfg.cleanup()
@@ -458,63 +459,26 @@ func TestMemAppend2(t *testing.T) {
cfg.end() cfg.end()
} }
func TestMemPutMany2(t *testing.T) { func TestMemPutManyClients(t *testing.T) {
const ( const (
NPUT = 1_000_000 NCLIENT = 100_000
MEM = 1000 MEM = 1000
) )
cfg := make_config(t, false) cfg := make_config(t, false)
defer cfg.cleanup() defer cfg.cleanup()
cfg.begin("Test: memory use many puts")
ck := cfg.makeClient()
v := randValue(MEM) v := randValue(MEM)
ck.Put("k", v) cks := make([]*Clerk, NCLIENT)
for i, _ := range cks {
// allow threads started by labrpc to exit cks[i] = cfg.makeClient()
time.Sleep(1 * time.Second)
runtime.GC()
var st runtime.MemStats
runtime.ReadMemStats(&st)
m0 := st.HeapAlloc
for i := 0; i < NPUT; i++ {
ck.Put("k", v)
} }
// allow threads started by labrpc to exit // allow threads started by labrpc to start
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
runtime.GC() cfg.begin("Test: memory use many put clients")
runtime.ReadMemStats(&st)
m1 := st.HeapAlloc
//log.Printf("mem m0 %d m1 %d\n", m0, m1)
if m1 > m0+NPUT/10 {
t.Fatalf("error: server using too much memory %d %d\n", m0, m1)
}
cfg.end()
}
func TestMemGetMany2(t *testing.T) {
const (
NCLIENT = 100_000
)
cfg := make_config(t, false)
defer cfg.cleanup()
cfg.begin("Test: memory use many gets")
ck := cfg.makeClient()
ck.Put("0", "")
cfg.deleteClient(ck)
runtime.GC() runtime.GC()
runtime.GC() runtime.GC()
@@ -524,25 +488,119 @@ func TestMemGetMany2(t *testing.T) {
m0 := st.HeapAlloc m0 := st.HeapAlloc
for i := 0; i < NCLIENT; i++ { for i := 0; i < NCLIENT; i++ {
ck := cfg.makeClient() cks[i].Put("k", v)
ck.Get("0")
cfg.deleteClient(ck)
} }
runtime.GC()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
runtime.GC() runtime.GC()
runtime.GC()
//runtime.GC()
runtime.ReadMemStats(&st) runtime.ReadMemStats(&st)
m1 := st.HeapAlloc m1 := st.HeapAlloc
f := (float64(m1) - float64(m0)) / NCLIENT
if m1 > m0+(NCLIENT*200) {
t.Fatalf("error: server using too much memory %d %d (%.2f per client)\n", m0, m1, f)
}
log.Printf("mem m0 %d m1 %d\n", m0, m1) for _, ck := range cks {
cfg.deleteClient(ck)
if m1 >= m0+NCLIENT {
t.Fatalf("error: server using too much memory m0 %d m1 %d\n", m0, m1)
} }
cfg.end() cfg.end()
} }
func TestMemGetManyClients(t *testing.T) {
const (
NCLIENT = 100_000
)
cfg := make_config(t, false)
defer cfg.cleanup()
cfg.begin("Test: memory use many get client")
ck := cfg.makeClient()
ck.Put("0", "")
cfg.deleteClient(ck)
cks := make([]*Clerk, NCLIENT)
for i, _ := range cks {
cks[i] = cfg.makeClient()
}
// allow threads started by labrpc to start
time.Sleep(1 * time.Second)
runtime.GC()
runtime.GC()
var st runtime.MemStats
runtime.ReadMemStats(&st)
m0 := st.HeapAlloc
for i := 0; i < NCLIENT; i++ {
cks[i].Get("0")
}
runtime.GC()
time.Sleep(1 * time.Second)
runtime.GC()
runtime.ReadMemStats(&st)
m1 := st.HeapAlloc
f := (float64(m1) - float64(m0)) / NCLIENT
if m1 >= m0+NCLIENT*10 {
t.Fatalf("error: server using too much memory m0 %d m1 %d (%.2f per client)\n", m0, m1, f)
}
for _, ck := range cks {
cfg.deleteClient(ck)
}
cfg.end()
}
func TestMemManyAppends(t *testing.T) {
const (
N = 1000
MEM = 1000
)
cfg := make_config(t, false)
defer cfg.cleanup()
cfg.begin("Test: memory use many appends")
ck := cfg.makeClient()
rdVal := randValue(MEM)
runtime.GC()
runtime.GC()
var st runtime.MemStats
runtime.ReadMemStats(&st)
m0 := st.HeapAlloc
for i := 0; i < N; i++ {
ck.Append("k", rdVal)
}
runtime.GC()
time.Sleep(1 * time.Second)
runtime.GC()
runtime.ReadMemStats(&st)
m1 := st.HeapAlloc
if m1 >= 3*MEM*N {
t.Fatalf("error: server using too much memory m0 %d m1 %d\n", m0, m1)
}
log.Printf("m0 %d m1 %d\n", m0, m1)
cfg.deleteClient(ck)
cfg.end()
}

View File

@@ -551,7 +551,7 @@ func (cfg *config) wait(index int, n int, startTerm int) interface{} {
// if retry==true, may submit the command multiple // if retry==true, may submit the command multiple
// times, in case a leader fails just after Start(). // times, in case a leader fails just after Start().
// if retry==false, calls Start() only once, in order // if retry==false, calls Start() only once, in order
// to simplify the early Lab 2B tests. // to simplify the early Lab 3B tests.
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int { func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
t0 := time.Now() t0 := time.Now()
starts := 0 starts := 0
@@ -605,7 +605,7 @@ func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
// start a Test. // start a Test.
// print the Test message. // print the Test message.
// e.g. cfg.begin("Test (2B): RPC counts aren't too high") // e.g. cfg.begin("Test (3B): RPC counts aren't too high")
func (cfg *config) begin(description string) { func (cfg *config) begin(description string) {
fmt.Printf("%s ...\n", description) fmt.Printf("%s ...\n", description)
cfg.t0 = time.Now() cfg.t0 = time.Now()

View File

@@ -35,7 +35,7 @@ import (
// CommandValid to true to indicate that the ApplyMsg contains a newly // CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry. // committed log entry.
// //
// in part 2D you'll want to send other kinds of messages (e.g., // in part 3D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these // snapshots) on the applyCh, but set CommandValid to false for these
// other uses. // other uses.
type ApplyMsg struct { type ApplyMsg struct {
@@ -43,7 +43,7 @@ type ApplyMsg struct {
Command interface{} Command interface{}
CommandIndex int CommandIndex int
// For 2D: // For 3D:
SnapshotValid bool SnapshotValid bool
Snapshot []byte Snapshot []byte
SnapshotTerm int SnapshotTerm int
@@ -58,7 +58,7 @@ type Raft struct {
me int // this peer's index into peers[] me int // this peer's index into peers[]
dead int32 // set by Kill() dead int32 // set by Kill()
// Your data here (2A, 2B, 2C). // Your data here (3A, 3B, 3C).
// Look at the paper's Figure 2 for a description of what // Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain. // state a Raft server must maintain.
@@ -70,7 +70,7 @@ func (rf *Raft) GetState() (int, bool) {
var term int var term int
var isleader bool var isleader bool
// Your code here (2A). // Your code here (3A).
return term, isleader return term, isleader
} }
@@ -82,7 +82,7 @@ func (rf *Raft) GetState() (int, bool) {
// after you've implemented snapshots, pass the current snapshot // after you've implemented snapshots, pass the current snapshot
// (or nil if there's not yet a snapshot). // (or nil if there's not yet a snapshot).
func (rf *Raft) persist() { func (rf *Raft) persist() {
// Your code here (2C). // Your code here (3C).
// Example: // Example:
// w := new(bytes.Buffer) // w := new(bytes.Buffer)
// e := labgob.NewEncoder(w) // e := labgob.NewEncoder(w)
@@ -98,7 +98,7 @@ func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state? if data == nil || len(data) < 1 { // bootstrap without any state?
return return
} }
// Your code here (2C). // Your code here (3C).
// Example: // Example:
// r := bytes.NewBuffer(data) // r := bytes.NewBuffer(data)
// d := labgob.NewDecoder(r) // d := labgob.NewDecoder(r)
@@ -119,7 +119,7 @@ func (rf *Raft) readPersist(data []byte) {
// service no longer needs the log through (and including) // service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible. // that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) { func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D). // Your code here (3D).
} }
@@ -127,18 +127,18 @@ func (rf *Raft) Snapshot(index int, snapshot []byte) {
// example RequestVote RPC arguments structure. // example RequestVote RPC arguments structure.
// field names must start with capital letters! // field names must start with capital letters!
type RequestVoteArgs struct { type RequestVoteArgs struct {
// Your data here (2A, 2B). // Your data here (3A, 3B).
} }
// example RequestVote RPC reply structure. // example RequestVote RPC reply structure.
// field names must start with capital letters! // field names must start with capital letters!
type RequestVoteReply struct { type RequestVoteReply struct {
// Your data here (2A). // Your data here (3A).
} }
// example RequestVote RPC handler. // example RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B). // Your code here (3A, 3B).
} }
// example code to send a RequestVote RPC to a server. // example code to send a RequestVote RPC to a server.
@@ -191,7 +191,7 @@ func (rf *Raft) Start(command interface{}) (int, int, bool) {
term := -1 term := -1
isLeader := true isLeader := true
// Your code here (2B). // Your code here (3B).
return index, term, isLeader return index, term, isLeader
@@ -219,7 +219,7 @@ func (rf *Raft) killed() bool {
func (rf *Raft) ticker() { func (rf *Raft) ticker() {
for rf.killed() == false { for rf.killed() == false {
// Your code here (2A) // Your code here (3A)
// Check if a leader election should be started. // Check if a leader election should be started.
@@ -246,7 +246,7 @@ func Make(peers []*labrpc.ClientEnd, me int,
rf.persister = persister rf.persister = persister
rf.me = me rf.me = me
// Your initialization code here (2A, 2B, 2C). // Your initialization code here (3A, 3B, 3C).
// initialize from state persisted before a crash // initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState()) rf.readPersist(persister.ReadRaftState())