This post is about Part A of Lab 3 of MIT 6.824, Distributed Systems. For the previous two parts, please refer to the posts about Lab 1 and Lab 2 under the tag Distributed System. In this post, we are going to build a fault-tolerant key-value service on top of Raft.

Overview

The functionality we need to provide is in fact very simple. The service needs three operations (a minimal single-server sketch follows the list):

  1. Put(key, value): Create a new key-value pair, or replace the value if the key already exists in the map;
  2. Append(key, value): If the key exists in the map, append value to the end of the old value. If not, behave like Put();
  3. Get(key): Return the value if the key exists in the map. Otherwise, return an empty string.
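
Ignoring fault tolerance for a moment, these semantics boil down to a plain Go map. A minimal single-server sketch (not the lab code):

type KV struct {
	data map[string]string
}

func (kv *KV) Put(key, value string)    { kv.data[key] = value }
func (kv *KV) Append(key, value string) { kv.data[key] += value } // missing key: "" + value, same as Put
func (kv *KV) Get(key string) string    { return kv.data[key] }   // "" if the key is absent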

If it were a single-server app, it wouldn’t take much time to implement. However, for fault tolerance, we have to maintain multiple kv servers so that the service stays available. Thus the real problem is how to coordinate and synchronize the data between the servers through Raft.

[Diagram: kvraft cluster architecture]

As the diagram above illustrates, we have a server cluster in which the Raft services can see each other to carry out elections and log replication, but each kv server can only communicate with its own Raft service in the cluster. Clients send requests to kv servers at random. However, only the kv server whose Raft service is the leader will process the request and send back the data. All other kv servers will reject the request and ask the client to try another kv server.

Client Side

Let us begin with something easy first. The client's job is very simple: send a request to one kv server and wait for the response. On timeout or on any error, retry with another kv server.

As we will have multiple clients running in parallel, the server must be able to tell which client sent each request, so we assign each client an ID generated by nrand(). There is also a chance that the same request is sent multiple times. To identify the duplicates, each request should carry an ID generated by nrand() as well, so that the kv server is able to recognize duplicate requests.
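
For reference, nrand() in the lab skeleton draws a large random integer from crypto/rand, roughly as follows:

import (
	"crypto/rand"
	"math/big"
)

// nrand returns a random 62-bit integer, used as client and request ids
func nrand() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	return bigx.Int64()
}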

Besides, to speed up subsequent requests, the client can cache the last seen leader ID (which is basically the index of the server in ck.servers), so it doesn’t have to try from the first server every time.

Thus we can define the client as below:

type Clerk struct {
	servers   []*labrpc.ClientEnd
	serverNum int

	clientId int64 // client's own uid

	lastLeaderId int // index of the last known leader in servers
}
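
The skeleton's MakeClerk constructor just fills these fields in; a sketch:

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	return &Clerk{
		servers:      servers,
		serverNum:    len(servers),
		clientId:     nrand(), // collisions between 62-bit random ids are vanishingly unlikely
		lastLeaderId: 0,       // start probing from server 0
	}
}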

When the client needs to send a Get or PutAppend request, we can define Get like below as an example:

// common.go
type GetArgs struct {
	MsgId    int64
	ClientId int64
	Key      string
}

type GetReply struct {
	Key   string
	Value string
	Err   string
}

// client.go
func (ck *Clerk) Get(key string) string {
	args := GetArgs{
		MsgId:    nrand(),
		ClientId: ck.clientId,
		Key:      key,
	}

	// Send the Get RPC and wait for the response.
	// On timeout or ErrWrongLeader, try another server,
	// starting from the last known leader.
	for i := ck.lastLeaderId; ; i = (i + 1) % ck.serverNum {
		var reply GetReply
		ok := ck.servers[i].Call("KVServer.Get", &args, &reply)
		if ok && reply.Err != ErrWrongLeader {
			ck.lastLeaderId = i
			return reply.Value
		}
	}
}
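
PutAppend follows the same retry pattern; a sketch, assuming PutAppendArgs carries the same MsgId/ClientId fields plus the value and the operation name:

// common.go
type PutAppendArgs struct {
	MsgId    int64
	ClientId int64
	Key      string
	Value    string
	Op       string // "Put" or "Append"
}

type PutAppendReply struct {
	Err string
}

// client.go
func (ck *Clerk) PutAppend(key string, value string, op string) {
	args := PutAppendArgs{
		MsgId:    nrand(),
		ClientId: ck.clientId,
		Key:      key,
		Value:    value,
		Op:       op,
	}
	for i := ck.lastLeaderId; ; i = (i + 1) % ck.serverNum {
		var reply PutAppendReply
		ok := ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
		if ok && reply.Err != ErrWrongLeader {
			ck.lastLeaderId = i
			return
		}
	}
}

func (ck *Clerk) Put(key string, value string)    { ck.PutAppend(key, value, "Put") }
func (ck *Clerk) Append(key string, value string) { ck.PutAppend(key, value, "Append") }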

Server Side

In the entire server cluster, only the server attached to the Raft leader can process the requests from clients and send back the actual data. All other servers act as backups, only applying the requests delivered through the channel kv.applyCh, and immediately rejecting any client request if they find themselves not attached to the Raft leader after calling rf.Start().

Thus we can illustrate the logic of the cluster as below:

graph TB
	client
	subgraph cluster
		kv1[KV Leader]
		kv2[KV Follower 1]
		kv3[KV Follower 2]
		rf_l(["Raft Leader"])
		rf_1(["Raft Follower 1"])
		rf_2(["Raft Follower 2"])
	end
	client --> |a1. New request| kv1
	kv1  --> |a2. Ask to log the request| rf_l
	rf_l --> |a3. Append the new log| rf_1 & rf_2
	rf_l --> |a4. Complete the request via applyCh| kv1
	rf_1 --> |a5. Send the backed-up log to its kv server| kv2
	rf_2 --> |"as (a5)"| kv3
	kv1  --> |"a6. Success (with data returned)"| client
	client -. b1. New request .-> kv2
	kv2 -. b2. Failed: not the leader .-> client

In section a, the request is sent to the leader and processed on all the nodes in the cluster, but in section b, the request is rejected because KV Follower 1's Raft service is not the leader of the cluster.

Besides, another requirement is that duplicate requests must not be applied twice. Thus for each client, we need to cache the id of the last request it made.

Thus we can have the struct KVServer defined as:

type KVServer struct {
	mu   sync.Mutex
	me   int
	dead int32 // set by Kill()

	// Raft service
	rf        *raft.Raft
	applyCh   chan raft.ApplyMsg
	persister *raft.Persister
	// snapshot if log grows this big
	maxraftstate int

	// store the key/value pairs in a map
	data map[string]string

	// keep a chan for each request submitted to raft; delete it on timeout or completion
	waitChs map[int64]chan ApplyReply

	// cache the id of the last request applied for each client
	lastApplies map[int64]int64
}
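
To wire everything up, StartKVServer (the entry point the tester calls) can initialize these fields, start the underlying Raft instance, and launch the apply-monitoring goroutine described below. A sketch, where applyArg is the command struct we will define in the next section:

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	// commands must be registered with labgob before being sent through Raft
	labgob.Register(applyArg{})

	kv := &KVServer{
		me:           me,
		maxraftstate: maxraftstate,
		persister:    persister,
		applyCh:      make(chan raft.ApplyMsg),
		data:         make(map[string]string),
		waitChs:      make(map[int64]chan ApplyReply),
		lastApplies:  make(map[int64]int64),
	}
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)

	go kv.waitApplyChan()
	return kv
}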

Leader Processing the Requests

Operations are not applied to kv.data directly when a request arrives. The leader KVServer first submits the request to Raft by rf.Start() and creates a wait channel in kv.waitChs for this new request. The Get() and PutAppend() goroutines then block on this channel, guarded by a timer, so they stop if the request takes too long to execute, and they learn the result of the execution without needing an additional lock for reading/updating the data. Every KVServer executes an operation only once the committed log entry is sent back by Raft through kv.applyCh, and the execution result is then delivered through kv.waitChs to the waiting Get() or PutAppend() goroutine.

You also need to define a new struct or add some new fields to ApplyMsg, so that KVServer can know the request's client id, message id, and any other data that helps you identify duplicates.
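
For concreteness, here is one possible shape for that command struct, together with the ApplyReply used by kv.waitChs (both names are this post's own, not part of the skeleton):

// the command submitted to Raft via rf.Start()
type applyArg struct {
	MsgId    int64  // request id, used for duplicate detection
	ClientId int64  // which client issued the request
	Op       string // "Get", "Put" or "Append"
	Key      string
	Value    string // unused for Get
}

// the result passed from the apply loop back to the waiting RPC handler
type ApplyReply struct {
	Err   string
	Value string // only meaningful for Get
}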

Then we can have:

func (kv *KVServer) start(args applyArg) (rep ApplyReply) {
	_, _, isLeader := kv.rf.Start(args)

	// Reject if not leader
	if !isLeader {
		return ApplyReply{Err: ErrWrongLeader}
	}

	kv.lock()

	// Create a channel to watch whether raft has reached
	// agreement on this request. Buffered, so the apply loop
	// never blocks on it if we have already timed out.
	ch := make(chan ApplyReply, 1)
	kv.waitChs[args.MsgId] = ch
	kv.unlock()

	// Wait until either timeout or getting the response from the
	// wait channel. The timeout value and the ErrTimeout constant
	// are our own choices, not part of the skeleton.
	select {
	case rep = <-ch:
	case <-time.After(500 * time.Millisecond):
		rep = ApplyReply{Err: ErrTimeout}
	}

	// Clean up the wait channel either way
	kv.lock()
	delete(kv.waitChs, args.MsgId)
	kv.unlock()
	return
}
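
With start() in place, the RPC handlers become thin wrappers; for example, Get (a sketch, reusing the applyArg shape from above):

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	rep := kv.start(applyArg{
		MsgId:    args.MsgId,
		ClientId: args.ClientId,
		Op:       "Get",
		Key:      args.Key,
	})
	reply.Err, reply.Value = rep.Err, rep.Value
}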


// goroutine to monitor kv.applyCh
func (kv *KVServer) waitApplyChan() {
	for !kv.killed() {
		// Wait for the next committed request from raft
		msg := <-kv.applyCh
		submittedReq := msg.Command.(applyArg)

		// Acquire the lock as we need to update and read kv.data
		kv.lock()

		// Check if it's a duplicate msg we have seen before.
		// We have to check it here, since the followers can
		// only see new requests through kv.applyCh.
		isDup := kv.lastApplies[submittedReq.ClientId] == submittedReq.MsgId

		// With the op value and key(/value) in the req, update kv.data.
		// OK is the usual status string from common.go.
		err, value := OK, ""
		switch submittedReq.Op {
		case "Get":
			value = kv.data[submittedReq.Key]
		case "Put":
			if !isDup {
				kv.data[submittedReq.Key] = submittedReq.Value
			}
		case "Append":
			if !isDup {
				kv.data[submittedReq.Key] += submittedReq.Value
			}
		}
		kv.lastApplies[submittedReq.ClientId] = submittedReq.MsgId

		// Use the wait channel to tell the RPC handler to send back
		// the status code and retrieved data to the client
		if ch, ok := kv.waitChs[submittedReq.MsgId]; ok {
			ch <- ApplyReply{Err: err, Value: value}
		}

		kv.unlock()
	}
}
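
Here lock() and unlock() are just thin wrappers over kv.mu, which make it easy to add debug logging later:

func (kv *KVServer) lock()   { kv.mu.Lock() }
func (kv *KVServer) unlock() { kv.mu.Unlock() }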

Changes in Raft

A very important thing to remember in Lab 3A is that a KVServer loses all its data once it dies. There are two ways to recover the previous state:

  • Raft replays all the committed logs it has from 0 to rf.commitIndex;
  • Use RPC InstallSnapshot, which we are to implement in the next lab.

Thus the easiest solution for recovering the kv data is to reset rf.lastApplied to 0 on reboot, which indicates that kv has no data at all. Then, whenever Raft needs to send new ApplyMsgs to kv, it should send rf.log[rf.lastApplied+1 : newCommitIndex+1], marking all the entries whose index \(\leq\) oldCommitIndex as invalid data, since the tester has already seen these logs.
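
A rough sketch of that replay inside Raft's apply loop, assuming oldCommitIndex is the commit index restored from persistence and using the lab's CommandValid flag to mark the already-delivered entries:

// inside Raft, after commitIndex advances to newCommitIndex
for i := rf.lastApplied + 1; i <= newCommitIndex; i++ {
	rf.applyCh <- ApplyMsg{
		// entries at or below oldCommitIndex were delivered
		// before the reboot, so flag them for the tester
		CommandValid: i > oldCommitIndex,
		Command:      rf.log[i].Command,
		CommandIndex: i,
	}
}
rf.lastApplied = newCommitIndex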