This is the first lab of MIT 6.824, Distributed Systems. It is about implementing MapReduce, a framework introduced by Google that processes huge volumes of data in parallel by splitting a single job into many small tasks and assigning them to a cluster of “cheap” machines. As the paper also points out, the framework lets programmers take advantage of a distributed system without any prior experience with one.
A very classic use case of this framework is counting the words in a large file, which is also what we are going to implement.
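As a concrete reference, the word-count application boils down to two small user-defined functions. Below is a minimal sketch: `KeyValue` mirrors the intermediate pair type from the lab skeleton, and `wcMap` / `wcReduce` are illustrative names (the lab ships its own word-count plugin).

```go
import (
    "strconv"
    "strings"
    "unicode"
)

// KeyValue mirrors the intermediate pair type from the lab skeleton.
type KeyValue struct {
    Key   string
    Value string
}

// wcMap is called once per input file and emits a <word, "1"> pair
// for every word found in the file contents.
func wcMap(filename string, contents string) []KeyValue {
    words := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    kva := []KeyValue{}
    for _, w := range words {
        kva = append(kva, KeyValue{Key: w, Value: "1"})
    }
    return kva
}

// wcReduce is called once per distinct word with all the values emitted
// for it, and returns the total count as a string.
func wcReduce(key string, values []string) string {
    return strconv.Itoa(len(values))
}
```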
Concept Behind the Scenes
The overview of this framework is illustrated in Figure 1 of the paper. There is one `Master` and many `Worker`s. The `Master` assigns tasks to the `Worker`s to execute and monitors their progress. It receives `m` input files and generates `r` output files.
`Worker` mainly works on two things:

- During the `Map` phase, each `Worker` reads one of the `m` input files and applies the user-defined `Map()` to it, which returns a list of `<key, value>` pairs. It then saves those pairs into intermediate files (the file format is sketched right after this list). Usually there are `m` Map tasks in total.
- During the `Reduce` phase, each `Worker` reads the `<key, value>` pairs from the corresponding intermediate files, applies `Reduce()` to them, and saves the result to an output file. There are `r` Reduce tasks in total.
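The lab does not dictate the intermediate file format. A common choice (the lab hints suggest `encoding/json`) is to store each `<key, value>` pair as one JSON object, which can later be read back like this (a sketch, reusing the `KeyValue` type from the word-count example above; `readIntermediate` is an illustrative helper, not part of the lab skeleton):

```go
import (
    "encoding/json"
    "os"
)

// readIntermediate reads all key/value pairs back from one intermediate
// file, assuming they were written one JSON object at a time.
func readIntermediate(filename string) ([]KeyValue, error) {
    f, err := os.Open(filename)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    kva := []KeyValue{}
    dec := json.NewDecoder(f)
    for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
            break // stop at EOF
        }
        kva = append(kva, kv)
    }
    return kva, nil
}
```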
Implementation in the Lab
We are required to implement four major components in the framework: `Task`, `RPC`, `Worker`, and `Master`.
Task
This is a Go struct sent back and forth between `Master` and `Worker`:

- `Master` -> `Worker`: start a new task.
- `Worker` -> `Master`: report the task result.
type MapReduceTask struct {
    // 0: Map, 1: Reduce, 2: Exit, 3: Wait
    TaskType int
    // 0: Unassigned, 1: Assigned, 2: Finished
    Status int
    // Start time of the task, used to detect timeouts
    TimeStamp time.Time
    // Index in the list of tasks
    Index int
    // Files the task reads from
    InputFiles []string
    // Files the task writes to
    OutputFile []string
}
For a `Map` task, we expect 1 file name in `InputFiles` and `nReduce` file names in `OutputFile`. Similarly, for a `Reduce` task, we expect one intermediate file name per Map task in `InputFiles` (so `m` of them) and 1 file name in `OutputFile`.
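For illustration, the two name lists can be generated with helpers along these lines (`makeMapOutputNames` and `makeReduceInputNames` are made-up names; the `mr-X-Y` convention matches the tips at the end of this post):

```go
import "fmt"

// makeMapOutputNames lists the intermediate files written by map task
// mapIdx, one per reduce task: mr-<mapIdx>-<reduceIdx>.
func makeMapOutputNames(mapIdx, nReduce int) []string {
    names := make([]string, 0, nReduce)
    for r := 0; r < nReduce; r++ {
        names = append(names, fmt.Sprintf("mr-%d-%d", mapIdx, r))
    }
    return names
}

// makeReduceInputNames lists the intermediate files read by reduce task
// reduceIdx, one per map task: mr-<mapIdx>-<reduceIdx>.
func makeReduceInputNames(reduceIdx, nMap int) []string {
    names := make([]string, 0, nMap)
    for x := 0; x < nMap; x++ {
        names = append(names, fmt.Sprintf("mr-%d-%d", x, reduceIdx))
    }
    return names
}
```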
RPC
Since `Master` and `Worker` run as different processes, we are asked to use remote procedure calls (RPC) to send requests and responses between them.

In this lab we can define two RPCs, one for requesting a new task and one for submitting a finished task:
type RequestTaskArgs struct{}

type RequestTaskReply struct {
    NReduce int
    Task    MapReduceTask
}

type SubmitTaskArgs struct {
    Task MapReduceTask
}

// Empty reply: the Master is assumed to always be available, so there is
// nothing to report back.
type SubmitTaskReply struct{}
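As a usage sketch, the worker side of the second RPC could be wrapped like this (assuming the `call()` helper from the lab skeleton, and that the handler is registered as `Master.SubmitTask`, mirroring the `Master.RequestTask` call used in the worker loop below):

```go
// submitTask reports a finished task back to the Master.
// Illustrative wrapper; call() comes from the lab skeleton and returns
// false if the Master could not be reached.
func submitTask(task MapReduceTask) bool {
    args := SubmitTaskArgs{Task: task}
    reply := SubmitTaskReply{}
    return call("Master.SubmitTask", &args, &reply)
}
```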
Worker
The `worker` is essentially single-threaded. It keeps requesting a new task, processing it, and reporting the result, and it exits when the `master` sends the exit signal.
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
    for {
        // Ask the master for a new task.
        args := RequestTaskArgs{}
        reply := RequestTaskReply{}
        res := call("Master.RequestTask", &args, &reply)
        if !res {
            // The master is unreachable; assume the job is over and quit.
            break
        }
        switch reply.Task.TaskType {
        case MAP:
            doMap(&reply, mapf)
        case REDUCE:
            doReduce(&reply, reducef)
        case WAIT:
            // Nothing to do right now; back off and ask again.
            time.Sleep(1 * time.Second)
        case EXIT:
            os.Exit(0)
        }
    }
}
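For reference, `doMap` could look roughly like the sketch below. This is only one possible shape under the assumptions used in this post: the task carries one input file, pairs are partitioned with `ihash(key) % NReduce` (the `ihash` helper comes with the lab skeleton), intermediate pairs are JSON-encoded, results go to temp files first, and the finished task is reported through the `submitTask` wrapper sketched in the RPC section; the master then renames the temp files (see the tips below). `doReduce` is symmetric: read the pairs back, group them by key, call `reducef` once per key, and submit.

```go
// doMap handles one Map task. Imports needed: encoding/json, io/ioutil, log.
func doMap(reply *RequestTaskReply, mapf func(string, string) []KeyValue) {
    task := reply.Task

    // Read the single input file and apply the user-defined Map().
    content, err := ioutil.ReadFile(task.InputFiles[0])
    if err != nil {
        log.Fatalf("cannot read %v: %v", task.InputFiles[0], err)
    }
    kva := mapf(task.InputFiles[0], string(content))

    // Partition the pairs into NReduce buckets with ihash (see the tips).
    buckets := make([][]KeyValue, reply.NReduce)
    for _, kv := range kva {
        r := ihash(kv.Key) % reply.NReduce
        buckets[r] = append(buckets[r], kv)
    }

    // Write each bucket to a temp file, one JSON object per pair.
    tempNames := make([]string, reply.NReduce)
    for r, bucket := range buckets {
        tmp, err := ioutil.TempFile("./", "mr")
        if err != nil {
            log.Fatalf("cannot create temp file: %v", err)
        }
        enc := json.NewEncoder(tmp)
        for _, kv := range bucket {
            if err := enc.Encode(&kv); err != nil {
                log.Fatalf("cannot encode %v: %v", kv, err)
            }
        }
        tmp.Close()
        tempNames[r] = tmp.Name()
    }

    // Hand the temp file names to the master via the submitted task; the
    // master renames them to the final mr-X-Y names when it accepts the
    // submission (see the tips below).
    task.OutputFile = tempNames
    submitTask(task)
}
```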
Master
The responsibilities of the `master` are:

- assigning tasks to different `worker`s, and if some `worker` does not report a task back within a certain time (10 seconds here), reassigning the task to another `worker`;
- monitoring the progress: once all the map tasks are done, the `master` should start assigning reduce tasks, and once all the reduce tasks are done, the `master` needs to tell the `worker`s to exit;
- validating the result: only confirm a `worker`'s output files as valid once the task is completed and submitted.

A sketch of the corresponding `RequestTask` handler follows `MakeMaster` below.
So the `Master` struct should look like below:
type Master struct {
    inputFiles []string
    nReduce    int

    mapTasks    []MapReduceTask
    reduceTasks []MapReduceTask

    // Incremented when a map task is done. The map phase is done when
    // mapDone == len(inputFiles).
    mapDone int
    // Incremented when a reduce task is done. The reduce phase is done when
    // reduceDone == nReduce.
    reduceDone int

    // Allow only one worker to update the Master at a time.
    mutex sync.Mutex
}
and when initializing it:
func MakeMaster(files []string, nReduce int) *Master {
    m := Master{
        inputFiles:  files,
        nReduce:     nReduce,
        mapTasks:    make([]MapReduceTask, len(files)),
        reduceTasks: make([]MapReduceTask, nReduce),
        mapDone:     0,
        reduceDone:  0,
        mutex:       sync.Mutex{},
    }
    // Fill each task in mapTasks with its input file name and initialize the other fields.
    // Build the InputFiles and OutputFile lists for both task arrays.
    // ......
    return &m
}
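A sketch of the `RequestTask` handler is shown below. It is only one possible shape under this post's assumptions: `MAP`, `REDUCE`, `WAIT`, and `EXIT` are constants matching the `TaskType` comments above, statuses use 0/1/2 for Unassigned/Assigned/Finished, and a task assigned more than 10 seconds ago without a submission is treated as lost and handed to another worker.

```go
// RequestTask hands out the next available task, preferring map tasks until
// the map phase is complete.
func (m *Master) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    reply.NReduce = m.nReduce

    // Pick the list to scan: map tasks first, then reduce tasks.
    var tasks []MapReduceTask
    switch {
    case m.mapDone < len(m.inputFiles):
        tasks = m.mapTasks
    case m.reduceDone < m.nReduce:
        tasks = m.reduceTasks
    default:
        // Everything is finished; tell the worker to exit.
        reply.Task = MapReduceTask{TaskType: EXIT}
        return nil
    }

    for i := range tasks {
        t := &tasks[i]
        unassigned := t.Status == 0
        // An assigned task not reported back within 10 seconds is
        // considered failed and gets reassigned.
        timedOut := t.Status == 1 && time.Since(t.TimeStamp) > 10*time.Second
        if unassigned || timedOut {
            t.Status = 1
            t.TimeStamp = time.Now()
            reply.Task = *t
            return nil
        }
    }

    // All remaining tasks are assigned but not yet finished; ask the worker
    // to wait and retry.
    reply.Task = MapReduceTask{TaskType: WAIT}
    return nil
}
```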
Tips
- A `worker` sometimes fails while processing a task, and it may have written its result to the output files only halfway. To avoid such garbage output, the `worker` should write to a temp file, and the `master` should only rename it to a valid output file once the entire task has been submitted (a fuller sketch of the submit handler follows this tips list).
// Worker: create a new temp output file
tmpFile, err := ioutil.TempFile("./", "mr")

// Master: rename the temp files when it receives a submit request
for _, tempOut := range args.Task.OutputFile {
    // validName :=
    //   map task:    mr-<mapTask_idx>-<reduceTask_idx>
    //   reduce task: mr-out-<reduceTask_idx>
    err := os.Rename(tempOut, validName)
    if err != nil {
        panic(err)
    }
}
- During the map phase, the `worker` should use `ihash(key) % NReduce` as the reduce task number to group the resulting `<key, value>` pairs into the same bucket (the `ihash` helper is provided by the lab skeleton).
- All the field names in the RPC `Args` and `Reply` structs must begin with a capital letter, or Go will not send them.
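To round out the first tip, here is one possible shape of the master's submit handler. It is a sketch under this post's assumptions: the worker passes its temp file names in `Task.OutputFile`, statuses use 0/1/2 for Unassigned/Assigned/Finished, and `validOutputName` is a made-up helper that computes the final `mr-X-Y` / `mr-out-Y` name from the task type and index.

```go
// validOutputName computes the final name of the i-th output file of a
// submitted task. Illustrative helper, not part of the lab skeleton.
func validOutputName(task MapReduceTask, i int) string {
    if task.TaskType == MAP {
        return fmt.Sprintf("mr-%d-%d", task.Index, i)
    }
    return fmt.Sprintf("mr-out-%d", task.Index)
}

// SubmitTask accepts a finished task, renames its temp files to their valid
// output names, and advances the phase counters.
func (m *Master) SubmitTask(args *SubmitTaskArgs, reply *SubmitTaskReply) error {
    m.mutex.Lock()
    defer m.mutex.Unlock()

    task := args.Task

    // Locate the master's copy of the task and ignore stale submissions,
    // e.g. from a slow worker whose task was already finished by another.
    var stored *MapReduceTask
    if task.TaskType == MAP {
        stored = &m.mapTasks[task.Index]
    } else {
        stored = &m.reduceTasks[task.Index]
    }
    if stored.Status == 2 {
        return nil
    }

    // Rename the temp files so partially written output is never visible.
    for i, tempOut := range task.OutputFile {
        if err := os.Rename(tempOut, validOutputName(task, i)); err != nil {
            panic(err)
        }
    }

    // Mark the task as finished and advance the corresponding counter.
    stored.Status = 2
    if task.TaskType == MAP {
        m.mapDone++
    } else {
        m.reduceDone++
    }
    return nil
}
```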