This post is about Part C of Lab 2 of MIT 6.824, Distributed Systems. For the previous two parts, please refer to Part A and Part B. In this one we focus on persistence in Raft. The implementation again mostly follows Figure 2 in the paper.
Persist
It's inevitable that some machines in the cluster crash or reboot. We need to persist some important state so that a node can pick up from where it crashed and rejoin the cluster.
As listed under "Persistent state on all servers" in Figure 2, there are three pieces of state to persist:
- currentTerm: keeps the node from voting for a candidate with a smaller term
- votedFor: prevents the node from voting multiple times in one election
- log[]: if the node holds entries that a majority has committed, we need to ensure the future leader can see them
Implementation in the Lab
Following the comments in persist() and readPersist() in raft.go, we can fill in both functions.
//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
// Call it with the lock acquired.
func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    data := w.Bytes()
    rf.persister.SaveRaftState(data)
}
//
// restore previously persisted state
//
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }
    // Your code here (2C).
    // Example:
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currTerm int
    var votedFor int
    var log []LogEntry
    if d.Decode(&currTerm) != nil ||
        d.Decode(&votedFor) != nil ||
        d.Decode(&log) != nil {
        panic("missing state for node recovery")
    } else {
        rf.currTerm = currTerm
        rf.votedFor = votedFor
        rf.log = log
        DPrintf("[%v] reboot, Term %v", rf.me, rf.currTerm)
    }
}
persist() needs to be called whenever one of the three variables listed above is updated, and readPersist() should be called from Make().
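For example, here is a minimal sketch of what a call site might look like when granting a vote in the RequestVote handler (the surrounding handler code is omitted, and args.CandidateId is the Figure 2 field name, which may differ in your code):

// In the RequestVote handler, after deciding to grant the vote:
rf.currTerm = args.Term
rf.votedFor = args.CandidateId
rf.persist() // currTerm and votedFor just changed, so persist before replying

// In Make(), restore state on startup as the lab skeleton suggests:
rf.readPersist(persister.ReadRaftState())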
Optimization
Some tests still fail occasionally. Below are the optimizations, on top of the Figure 2 implementation, that eliminate those failures.
Log disagreement
Sometimes the test fails because it takes too long to reach agreement. You can follow the instructions in the "An aside on optimizations" section of the Students' Guide to Raft mentioned in the handout: add two extra fields, ConflictTerm and ConflictIndex, to the AppendEntries RPC reply to speed up the backoff instead of decrementing nextIndex one entry per round.
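The reply struct then looks roughly like this (a sketch; IsSuccess, ConflictTerm and ConflictIndex match the snippets below, while Term is an assumed field taken from Figure 2):

type AppendEntriesReply struct {
    Term          int  // the follower's current term, as in Figure 2 (assumed field)
    IsSuccess     bool // whether the follower matched PrevLogIndex/PrevLogTerm
    ConflictTerm  int  // term of the follower's conflicting entry, -1 if its log is too short
    ConflictIndex int  // hint for where the leader should retry
}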
So at the leader side we have:
// Check if the leader has any entry from ConflictTerm
// and get the last entry of that term
lastIdx := rf.searchLastEntryInTerm(reply.ConflictTerm)
if lastIdx != -1 {
    // The leader has that term. Assume the follower has all the
    // entries of that term and only diverges in later terms,
    // so retry from the entry right after the last one of that term
    rf.nextIndex[i] = lastIdx + 1
} else {
    // The leader has no entry of that term, so back off to
    // ConflictIndex and try to agree on the log there
    rf.nextIndex[i] = reply.ConflictIndex
}
And at the follower side:
if args.PrevLogIndex > lastIdx {
    // The expected PrevLogIndex is beyond the follower's last index,
    // so ask the leader to back off
    reply.IsSuccess = false
    reply.ConflictIndex = len(rf.log)
    reply.ConflictTerm = -1
} else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
    // The terms at PrevLogIndex disagree; ask the leader to back off
    reply.IsSuccess = false
    reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
    reply.ConflictIndex = rf.searchFirstEntryInTerm(args.PrevLogIndex)
} else {
    // Agreement reached at PrevLogIndex
    // ...
}
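The two search helpers used above are not part of the lab skeleton; here is a rough sketch of what they could look like, assuming rf.log keeps a dummy entry at index 0 so that real entries start at index 1:

// searchLastEntryInTerm returns the index of the last log entry whose
// term equals term, or -1 if no entry of that term exists.
func (rf *Raft) searchLastEntryInTerm(term int) int {
    for i := len(rf.log) - 1; i >= 1; i-- {
        if rf.log[i].Term == term {
            return i
        }
        if rf.log[i].Term < term {
            break // terms only decrease as we scan backwards, so stop early
        }
    }
    return -1
}

// searchFirstEntryInTerm returns the index of the first entry that has
// the same term as the entry at idx, used to fill ConflictIndex.
func (rf *Raft) searchFirstEntryInTerm(idx int) int {
    term := rf.log[idx].Term
    for idx > 1 && rf.log[idx-1].Term == term {
        idx--
    }
    return idx
}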
Persisting commitIndex
Another failure I sometimes hit, in the test TestFigure82C, is an apply error:
Test (2C): Figure 8 ...
2021/01/06 23:22:04 apply error: commit index=5 server=4 3879934624216030722 != server=3 6543482455276445645
exit status 1
Checking the log, I saw a situation like the one in Figure 8 of the paper. Given 3 nodes s1, s2 and s3:
- s1, s2 and s3 initialize. s1 becomes the leader of term 1;
- s1 receives 3 new commands {1, 2, 3} and sends them to s2 and s3. These 3 entries are committed successfully;
- s2 disconnects. It keeps increasing its term and requesting votes; say it has reached term 3;
- s1 crashes and s2 reconnects. Now s2 has a higher term and identical logs, so s2 becomes the new leader of term 4;
- s2 receives 1 new command {4}. Before it sends it to s3, it disconnects;
- s1 now reboots while s2 stays disconnected. s1 may win the election again since it has identical logs. Now s1 and s3 come to term 5;
- s1 receives 1 new command {5}. The command is successfully replicated to s1 and s3 and gets committed;
- s1 crashes, s2 reconnects and s3 restarts. s2 may win the election if it jumps to term 5 and its election timer fires before s1's. Now s2 and s3 come to term 5;
- Since s3 restarted, its commitIndex is reset to 0, and the same happens to s1 once it reboots. So s2 will make s1 overwrite command{5} with command{4}, and because s1's commitIndex is now 0, s1 will commit the entry at that index again, this time with a different command.
To solve this problem, also encode rf.commitIndex in persist(), so that after a reboot the node knows how many entries have already been committed. It can then reject any AppendEntries request where args.PrevLogIndex < rf.commitIndex, treating it the same way as a request with a smaller term number.
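A minimal sketch of the two changes, reusing the field names from the snippets above (the encode/decode order must match between persist() and readPersist()):

// In persist() (and symmetrically in readPersist()), encode commitIndex
// after the Figure 2 state:
e.Encode(rf.commitIndex)

// In the AppendEntries handler, refuse to roll back entries this node
// already knows are committed, just like a request with an old term:
if args.PrevLogIndex < rf.commitIndex {
    reply.IsSuccess = false
    return
}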
Unreliable Network
Besides, I sometimes hit an index-out-of-range failure when sending AppendEntries and using rf.nextIndex[i] to slice the log:
Test (2C): Figure 8 (unreliable) ...
panic: runtime error: slice bounds out of range [303:115]
goroutine 17137 [running]:
~/6.824/src/raft.(*Raft).broadcastHeartbeat(0xc0001ce9a0, 0x59)
~/6.824/src/raft/raft_appendEntry.go:223 +0x617
created by ~/6.824/src/raft.(*Raft).setStateLeader
~/6.824/src/raft/raft.go:208 +0x156
exit status 2
This error only shows up with an unreliable network. Checking the log, I saw that rf.nextIndex[i] gets increased multiple times while sending AppendEntries to a follower. Reviewing the code, I found:
if reply.IsSuccess {
    rf.nextIndex[i] += len(args.Entries)
}
This is why the error shows up: the replies to several heartbeat requests may arrive around the same time and all report success, so nextIndex gets incremented multiple times. The easiest fix is to set nextIndex directly from the values in the request instead of adding to it:
// Derive nextIndex from the request itself so that duplicate replies
// over an unreliable network recompute the same value
rf.nextIndex[i] = args.PrevLogIndex + len(args.Entries) + 1
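The same idea applies to matchIndex if you maintain it as in Figure 2: derive it from the request as well, so that duplicate replies are harmless (a sketch; matchIndex is the Figure 2 name and may differ in your code):

if reply.IsSuccess {
    // Both assignments are idempotent: replaying the same reply
    // recomputes the same values instead of accumulating them
    rf.nextIndex[i] = args.PrevLogIndex + len(args.Entries) + 1
    rf.matchIndex[i] = args.PrevLogIndex + len(args.Entries)
}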