This is the first lab of MIT 6.824, Distributed Systems. It covers the implementation of MapReduce, a framework introduced by Google that processes huge volumes of data in parallel by splitting a single job into many small tasks and assigning them to a cluster of “cheap” machines. As the paper points out, this framework lets programmers make use of a distributed system without any prior experience with one.
A classic use case of this framework is counting words in a large file, which is also what we are going to implement.
Concepts Behind the Scenes
The framework is illustrated in Figure 1 of the paper. There is one Master and many Workers.
The Master assigns tasks to Workers and monitors their progress. It receives m input files and generates r output files.
A `Worker` mainly does two things:

- During the Map phase, each `Worker` reads one of the m input files, applies the user-defined `Map()` to it, which returns some `<key, value>` pairs, and then saves them into intermediate files. Usually there are m Map tasks in total.
- During the Reduce phase, each `Worker` reads the `<key, value>` pairs from the corresponding intermediate files, applies `Reduce()` to them, and saves the result to an output file. There are r Reduce tasks in total.
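For word count, the user-defined functions can be very small. The sketch below is roughly what the lab ships in its `wc.go` plugin: `Map()` emits a `<word, "1">` pair per occurrence, and `Reduce()` just counts the values collected for a word.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is the pair type produced by Map and consumed by Reduce.
type KeyValue struct {
	Key   string
	Value string
}

// Map splits the file contents into words and emits a <word, "1">
// pair for every occurrence.
func Map(filename string, contents string) []KeyValue {
	notLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, notLetter)
	kva := []KeyValue{}
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every value emitted for one key and returns the count.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kva := Map("pg-grimm.txt", "the quick the")
	fmt.Println(len(kva), kva[0].Key)              // 3 the
	fmt.Println(Reduce("the", []string{"1", "1"})) // 2
}
```

Note that the framework, not the user code, is responsible for grouping all values of the same key before calling `Reduce()`.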
Implementation in the Lab
We are required to implement four major components in the framework: `Task`, `RPC`, `Worker` and `Master`.
Task
This is a Go struct sent back and forth between Master and Worker.
Master -> Worker: start a new task.
Worker -> Master: report the task result.
```go
type MapReduceTask struct {
	// 0: Map, 1: Reduce, 2: Exit, 3: Wait
	TaskType int
	// 0: Unassigned, 1: Assigned, 2: Finished
	Status int
	// Start time
	TimeStamp time.Time
	// Index in the list of tasks
	Index int

	InputFiles []string
	OutputFile []string
}
```
For a Map task, `InputFiles` is expected to hold 1 file name and `OutputFile` nReduce file names.
Similarly, for a Reduce task, `InputFiles` holds nReduce file names and `OutputFile` 1 file name.
RPC
Since Master and Worker run as separate processes, we use Remote Procedure Calls (RPC) to send requests and responses between them.
In this lab we can define two RPCs, one for requesting a new task and one for submitting a finished task:
```go
type RequestTaskArgs struct{}

type RequestTaskReply struct {
	NReduce int
	Task    MapReduceTask
}

type SubmitTaskArgs struct {
	Task MapReduceTask
}

// Empty, since here the Master is always available
type SubmitTaskReply struct{}
```
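Go's `net/rpc` package carries these structs between the processes (the lab skeleton does this over a Unix socket through its `call()` helper). As a self-contained sketch of one round trip, with a made-up reply:

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

type MapReduceTask struct {
	TaskType   int
	Status     int
	Index      int
	InputFiles []string
	OutputFile []string
}

type RequestTaskArgs struct{}

type RequestTaskReply struct {
	NReduce int
	Task    MapReduceTask
}

// Master with a trivial RequestTask handler that always hands out the
// same hypothetical map task.
type Master struct{}

func (m *Master) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	reply.NReduce = 10
	reply.Task = MapReduceTask{TaskType: 0, Index: 0, InputFiles: []string{"pg-grimm.txt"}}
	return nil
}

// roundTrip starts an RPC server on a random local port and performs
// one RequestTask call against it, as a worker would.
func roundTrip() RequestTaskReply {
	server := rpc.NewServer()
	if err := server.Register(new(Master)); err != nil {
		panic(err)
	}
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	go server.Accept(l)

	client, err := rpc.Dial("tcp", l.Addr().String())
	if err != nil {
		panic(err)
	}
	defer client.Close()

	var reply RequestTaskReply
	if err := client.Call("Master.RequestTask", &RequestTaskArgs{}, &reply); err != nil {
		panic(err)
	}
	return reply
}

func main() {
	reply := roundTrip()
	fmt.Println(reply.NReduce, reply.Task.InputFiles[0]) // 10 pg-grimm.txt
}
```

The method name string passed to `Call` must match `<receiver type>.<method>`, which is why the handlers below hang off the `Master` struct.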
Worker
The worker is essentially single-threaded. It keeps requesting a new task, processing it, and reporting the result, and exits when the master sends the exit signal.
```go
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		args := RequestTaskArgs{}
		reply := RequestTaskReply{}
		res := call("Master.RequestTask", &args, &reply)
		if !res {
			break
		}
		switch reply.Task.TaskType {
		case MAP:
			doMap(&reply, mapf)
		case REDUCE:
			doReduce(&reply, reducef)
		case WAIT:
			time.Sleep(1 * time.Second)
		case EXIT:
			os.Exit(0)
		}
	}
}
```
Master
The responsibilities of the master are:
- assigning tasks to different `worker`s. If a `worker` does not report a task back within a certain time (10 seconds here), reassign the task to another `worker`;
- monitoring the progress. Once all the map tasks are done, the master should start assigning reduce tasks. When all the reduce tasks are done, the master needs to tell the workers to exit;
- validating the results. Only confirm a `worker`'s output files as valid when the task is completed and submitted.
So the Master struct should look like this:
```go
type Master struct {
	inputFiles  []string
	nReduce     int
	mapTasks    []MapReduceTask
	reduceTasks []MapReduceTask
	// Incremented when a map task is done. The map phase is done when mapDone == len(inputFiles)
	mapDone int
	// Incremented when a reduce task is done. The reduce phase is done when reduceDone == nReduce
	reduceDone int
	// Allow only one worker at a time to update the state
	mutex sync.Mutex
}
```
and when initializing it:
```go
func MakeMaster(files []string, nReduce int) *Master {
	m := Master{
		inputFiles:  files,
		nReduce:     nReduce,
		mapTasks:    make([]MapReduceTask, len(files)),
		reduceTasks: make([]MapReduceTask, nReduce),
		mapDone:     0,
		reduceDone:  0,
		mutex:       sync.Mutex{},
	}
	// Fill each task in mapTasks with its input file name and initialize the other fields
	// Collect the InputFiles and OutputFile arrays for both task slices
	// ......
	return &m
}
```
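Putting the responsibilities above together, the `RequestTask` handler could be structured like the following sketch. It shows only the map phase with the 10-second reassignment check; the reduce phase is symmetric, and the status constants are my own names for the integer codes in `MapReduceTask`:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Integer codes from the MapReduceTask comments.
const (
	MAP = iota
	REDUCE
	EXIT
	WAIT
)

const (
	UNASSIGNED = iota
	ASSIGNED
	FINISHED
)

type MapReduceTask struct {
	TaskType   int
	Status     int
	TimeStamp  time.Time
	Index      int
	InputFiles []string
	OutputFile []string
}

type RequestTaskArgs struct{}

type RequestTaskReply struct {
	NReduce int
	Task    MapReduceTask
}

type Master struct {
	inputFiles  []string
	nReduce     int
	mapTasks    []MapReduceTask
	reduceTasks []MapReduceTask
	mapDone     int
	reduceDone  int
	mutex       sync.Mutex
}

// RequestTask hands out the first map task that is unassigned, or that
// was assigned more than 10 seconds ago and never reported back. If
// every map task is in flight, the worker is told to wait.
func (m *Master) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	reply.NReduce = m.nReduce
	if m.mapDone < len(m.inputFiles) {
		for i := range m.mapTasks {
			t := &m.mapTasks[i]
			expired := t.Status == ASSIGNED && time.Since(t.TimeStamp) > 10*time.Second
			if t.Status == UNASSIGNED || expired {
				t.Status = ASSIGNED
				t.TimeStamp = time.Now()
				reply.Task = *t
				return nil
			}
		}
		// All map tasks assigned but not all finished yet.
		reply.Task = MapReduceTask{TaskType: WAIT}
		return nil
	}
	// ... hand out reduce tasks the same way, then EXIT when reduceDone == nReduce ...
	reply.Task = MapReduceTask{TaskType: EXIT}
	return nil
}

func main() {
	m := &Master{
		inputFiles: []string{"a.txt", "b.txt"},
		nReduce:    3,
		mapTasks: []MapReduceTask{
			{TaskType: MAP, Index: 0, InputFiles: []string{"a.txt"}},
			{TaskType: MAP, Index: 1, InputFiles: []string{"b.txt"}},
		},
	}
	var reply RequestTaskReply
	m.RequestTask(&RequestTaskArgs{}, &reply)
	fmt.Println(reply.Task.TaskType, reply.Task.Index) // 0 0: the MAP task with Index 0
}
```

The matching `SubmitTask` handler would mark the task `FINISHED`, rename its output files, and bump `mapDone` or `reduceDone`, all while holding the same mutex.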
Tips
A `worker` sometimes fails while processing a task, possibly after writing its result to the output files only halfway. To avoid such garbage output, the `worker` should write to a temp file first; only when the entire task is submitted does the master mark the files as valid output.
```go
// Worker: create a new temp output file
tmpFile, err := ioutil.TempFile("./", "mr")

// Master: rename the temp files when receiving a submit request
for _, file := range reply.Task.OutputFile {
	// validName :=
	//   mapTask:    mr-<mapTask_idx>-<reduceTask_idx>
	//   reduceTask: mr-out-<reduceTask_idx>
	err := os.Rename(tempOut, validName)
	if err != nil {
		panic(err)
	}
}
```
- During the map phase, the `worker` should use `ihash(key) % NReduce` as the reduce task number to group the resulting `<key, value>` pairs into the same bucket.
- All field names in the RPC `Args` and `Reply` structs must begin with a capital letter, or Go will not send them.
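The `ihash` helper is provided by the lab skeleton; it looks roughly like this, and its only job is to assign each key deterministically to one of the NReduce buckets:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash hashes a key to a non-negative int. Masking off the sign bit
// keeps the later % nReduce result non-negative.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	nReduce := 10
	for _, key := range []string{"apple", "banana", "apple"} {
		// The same key always lands in the same bucket, so all of a
		// key's pairs end up in one reduce task's intermediate files.
		fmt.Println(key, ihash(key)%nReduce)
	}
}
```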