This post covers Part C of Lab 2 of MIT 6.824, Distributed Systems. For the previous two parts, please refer to Part A and Part B. In this one we focus on persistence in Raft. The implementation mostly follows Figure 2 in the paper as well.

Persist

It’s inevitable that some machines in the cluster will crash or reboot. We need to persist some important state so that a node can resume from where it crashed and rejoin the cluster.

As listed under Persistent state on all servers in Figure 2, there are three pieces of state to persist:

  • currentTerm: this prevents the node from voting for a candidate with a smaller term

  • votedFor: this prevents a node from voting multiple times in the same election

  • log[]: if the node holds log entries that a majority has committed, we need to ensure a future leader can still see them

Implementation in the Lab

Following the comments in persist() and readPersist() in raft.go, we can fill in the functions.

//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
// Call it with the lock acquired.
func (rf *Raft) persist() {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)

	e.Encode(rf.currTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.log)

	data := w.Bytes()
	rf.persister.SaveRaftState(data)
}

//
// restore previously persisted state
//
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	// Example:
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	var currTerm int
	var votedFor int
	var log []LogEntry
	if d.Decode(&currTerm) != nil ||
		d.Decode(&votedFor) != nil ||
		d.Decode(&log) != nil {
		panic("Miss states for node recovery")
	} else {
		rf.currTerm = currTerm
		rf.votedFor = votedFor
		rf.log = log
		DPrintf("[%v] reboot, Term %v", rf.me, rf.currTerm)
	}
}

persist() needs to be called whenever one of the three variables listed above is updated, and readPersist() should be called when Make() is called.
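For reference, here is a minimal sketch of the restore call site, following the lab's Make() skeleton (the surrounding initialization is elided). Every place that mutates rf.currTerm, rf.votedFor or rf.log (the RequestVote and AppendEntries handlers, Start(), and state transitions during elections) should call rf.persist() before releasing the lock.

func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// ... initialize volatile state, election timer, etc. ...

	// restore currTerm, votedFor and log after a crash or restart
	rf.readPersist(persister.ReadRaftState())

	return rf
}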

Optimization

Some tests still fail occasionally. Below are optimizations, on top of the implementation following Figure 2, to eliminate those failures.

Log disagreement

Sometimes the test fails because it takes too long to reach agreement. You can follow the instructions in the "An aside on optimizations" section of the Students' Guide to Raft post mentioned in the handout: add two extra fields to the AppendEntries RPC reply, ConflictTerm and ConflictIndex, so the leader can back up nextIndex by a whole term at a time instead of one entry per round.
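Concretely, the reply struct gains the two conflict fields. This is a sketch using the field names that appear in the snippets below (Term and IsSuccess correspond to the term and success fields from Figure 2):

type AppendEntriesReply struct {
	Term          int  // follower's current term, for the leader to update itself
	IsSuccess     bool // true if the follower matched PrevLogIndex and PrevLogTerm
	ConflictTerm  int  // term of the conflicting entry, or -1 if the follower's log is too short
	ConflictIndex int  // first index the follower stores for ConflictTerm, or the follower's log length
}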

So on the leader side we have:

// Check if the leader has any entry from ConflictTerm
// and get the index of the last entry in that term
lastIdx := rf.searchLastEntryInTerm(reply.ConflictTerm)
if lastIdx != -1 {
    // The leader also has entries in ConflictTerm. Assume the follower
    // has all of the leader's entries in that term and the disagreement
    // starts after them, so try to append from the entry right after
    rf.nextIndex[i] = lastIdx + 1
} else {
    // The leader has no entry in that term, so back up to the first
    // index the follower reported for that term
    rf.nextIndex[i] = reply.ConflictIndex
}

And on the follower side:

if args.PrevLogIndex > lastIdx {
    // The expected PrevLogIndex is beyond the follower's last index.
    // Ask the leader to back up
    reply.IsSuccess = false
    reply.ConflictIndex = len(rf.log)
    reply.ConflictTerm = -1
} else if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
    // The terms disagree at PrevLogIndex. Report the conflicting term and
    // the first index of that term so the leader can back up faster
    reply.IsSuccess = false
    reply.ConflictTerm = rf.log[args.PrevLogIndex].Term
    reply.ConflictIndex = rf.searchFirstEntryInTerm(args.PrevLogIndex)
} else {
    // The logs agree at PrevLogIndex
    // ...
}
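The two helpers used above, searchLastEntryInTerm and searchFirstEntryInTerm, are my own and not part of the lab skeleton. Here is one possible sketch, assuming rf.log[0] is a dummy entry so that log indices line up with slice indices:

// searchFirstEntryInTerm returns the index of the first entry that has the
// same term as the entry at idx (used on the follower side).
func (rf *Raft) searchFirstEntryInTerm(idx int) int {
	term := rf.log[idx].Term
	for idx > 1 && rf.log[idx-1].Term == term {
		idx--
	}
	return idx
}

// searchLastEntryInTerm returns the index of the last entry with the given
// term, or -1 if the leader has no entry in that term (used on the leader side).
func (rf *Raft) searchLastEntryInTerm(term int) int {
	for i := len(rf.log) - 1; i >= 1; i-- {
		if rf.log[i].Term == term {
			return i
		}
	}
	return -1
}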

Persisting commitIndex

Another failure I sometimes hit, in the test TestFigure82C, is an apply error:

Test (2C): Figure 8 ...

2021/01/06 23:22:04 apply error: commit index=5 server=4 3879934624216030722 != server=3 6543482455276445645
exit status 1

Checking the log, I saw a situation like the one in Figure 8 of the paper. Given 3 nodes, s1, s2 and s3:

  1. s1, s2 and s3 are initialized. s1 becomes the leader of term 1;

  2. s1 receives 3 new commands {1, 2, 3} and sends them to s2 and s3. These 3 entries are then committed successfully;

  3. s2 disconnects. It keeps incrementing its term number and requesting votes; say it has reached term 3;

  4. s1 crashes and s2 reconnects. Now s2 has a higher term and a log identical to s3’s. s2 becomes the new leader of term 4;

  5. s2 receives 1 new command {4}. Before it can send it to s3, it disconnects;

  6. s1 now reboots while s2 stays disconnected. s1 may win the election again as its log is identical to s3’s. Now s1 and s3 come to term 5.

  7. s1 receives 1 new command {5}. The command is successfully replicated to s1 and s3 and gets committed;

  8. s1 crashes, s2 reconnects and s3 restarts. s2 may win the election if it jumps to term 5 and its election timer fires before s3’s. Now s2 and s3 come to term 5.

  9. Since s3 restarted, its commitIndex is reset to 0. So s2 will try to make s3 overwrite command {5} with command {4}. As s3’s commitIndex is now 0, it will commit the entry at the same index again.

To solve this problem, also persist rf.commitIndex when calling persist(), so that after a reboot the node knows how many entries had already been committed. It then rejects any AppendEntries RPC request where args.PrevLogIndex < rf.commitIndex, treating it the same as a request with a smaller term number.
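Here is a sketch of the two changes, reusing the names from the code above (the reply fields follow the AppendEntriesReply sketch earlier). Encoding commitIndex after the three existing fields keeps the existing decode order in readPersist() valid.

// In persist(): encode commitIndex along with the other three fields.
e.Encode(rf.currTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.commitIndex)

// In readPersist(): decode currTerm, votedFor and log as before, then:
var commitIndex int
if d.Decode(&commitIndex) == nil {
	rf.commitIndex = commitIndex
}

// In the AppendEntries handler: refuse requests that would roll back
// already-committed entries, just like requests with a stale term.
if args.PrevLogIndex < rf.commitIndex {
	reply.IsSuccess = false
	reply.Term = rf.currTerm
	return
}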

Unreliable Network

Besides, I sometimes hit an index-out-of-range failure when sending AppendEntries RPCs and using rf.nextIndex[i] to slice the log.

Test (2C): Figure 8 (unreliable) ...
panic: runtime error: slice bounds out of range [303:115]

goroutine 17137 [running]:
~/6.824/src/raft.(*Raft).broadcastHeartbeat(0xc0001ce9a0, 0x59)
        ~/6.824/src/raft/raft_appendEntry.go:223 +0x617
created by ~/6.824/src/raft.(*Raft).setStateLeader
        ~/6.824/src/raft/raft.go:208 +0x156
exit status 2

This error only happens with an unreliable network. Checking the log, I saw that rf.nextIndex[i] was increased multiple times while sending AppendEntries RPCs to a follower. Reviewing the code, I found:

if reply.IsSuccess {
    rf.nextIndex[i] += len(args.Entries)
}

This is why the error showed up: the replies to multiple heartbeat requests may arrive together, all of them successful, so nextIndex gets updated multiple times.

The easiest fix is to set nextIndex directly from the values in the request instead of adding to it.

// Idempotent even if the same reply is handled multiple times over an unreliable network
rf.nextIndex[i] = args.PrevLogIndex + len(args.Entries) + 1
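Since args.PrevLogIndex and args.Entries come from the request that this reply corresponds to, processing the same reply twice sets nextIndex to the same value, so duplicate or reordered replies become harmless.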

References

  1. Students' Guide to Raft