This is the first lab of MIT 6.824, Distributed Systems. It is about implementing MapReduce, a framework introduced by Google that processes huge volumes of data in parallel by splitting a single job into many small tasks and assigning them to a cluster of “cheap” machines. As the paper notes, this framework lets programmers make use of a distributed system without any prior experience with one.

A classic use case of this framework is counting words in large files, which is also what we are going to implement.

Concepts Behind the Scenes

The overview of this framework is illustrated in Figure 1 of the paper. There is one Master and many Workers.

The Master assigns tasks to Workers and monitors their progress. It receives m input files and eventually generates r output files.

A Worker mainly does two things:

  1. During the Map phase, each Worker reads one of the m input files and applies the user-defined Map() to it, which returns a list of <key, value> pairs. It then saves them into intermediate files. There are m Map tasks in total.
  2. During the Reduce phase, each Worker reads the <key, value> pairs from the corresponding intermediate files, applies Reduce() to them, and saves the result to an output file. There are r Reduce tasks in total.

Implementation in the Lab

We are required to implement four major components in the framework: Task, RPC, Worker, and Master.

Task

This is a Go struct sent back and forth between the Master and Workers.

Master -> Worker: start a new task.

Worker -> Master: report the task result.

type MapReduceTask struct {
	// 0: Map, 1: Reduce, 2: Exit, 3: Wait
	TaskType int
	// 0: Unassigned, 1: Assigned, 2: Finished
	Status int
	// Start Time
	TimeStamp time.Time

	// Index in the list of tasks
	Index int

	InputFiles []string
	OutputFile []string
}

For a Map task, it’s expected to have one file name in InputFiles and nReduce file names in OutputFile.

Similarly, for a Reduce task, it’s expected to have m file names in InputFiles (one intermediate file from each Map task) and one file name in OutputFile.
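
The code later in this post refers to the task types and statuses by name. Here is a minimal sketch of those constants, assuming plain int values that match the comments in the struct above:

// Task types, matching the TaskType comment in MapReduceTask.
const (
	MAP    = 0
	REDUCE = 1
	EXIT   = 2
	WAIT   = 3
)

// Task statuses, matching the Status comment in MapReduceTask.
const (
	UNASSIGNED = 0
	ASSIGNED   = 1
	FINISHED   = 2
)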

RPC

Since the Master and Workers are different processes, we are asked to use remote procedure calls (RPC) to send requests and responses between them.

In this lab we can define two RPCs, one for requesting a new task and one for submitting a finished task:

type RequestTaskArgs struct {}

type RequestTaskReply struct {
	NReduce int
	Task    MapReduceTask
}

type SubmitTaskArgs struct {
	Task MapReduceTask
}

// The reply is empty: the Master simply acknowledges the submission
type SubmitTaskReply struct {}
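
The lab skeleton already provides a call() helper that dials the Master over a Unix-domain socket and issues the RPC. It looks roughly like this (masterSock() comes from the provided mr/rpc.go, and the net/rpc, log, and fmt imports are assumed):

// call sends an RPC to the Master and returns false if the call fails.
func call(rpcname string, args interface{}, reply interface{}) bool {
	c, err := rpc.DialHTTP("unix", masterSock())
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	if err := c.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}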

Worker

The Worker is essentially single-threaded. It keeps requesting a new task, processing it, and reporting the result, until the Master signals it to exit.

func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		args := RequestTaskArgs{}
		reply := RequestTaskReply{}

		res := call("Master.RequestTask", &args, &reply)
		if !res {
			break
		}

		switch reply.Task.TaskType {
		case MAP:
			doMap(&reply, mapf)
		case REDUCE:
			doReduce(&reply, reducef)
		case WAIT:
			time.Sleep(1 * time.Second)
		case EXIT:
			os.Exit(0)
		}
	}
}
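
To make the two branches of the loop concrete, here is a minimal sketch of doMap, assuming the lab skeleton’s KeyValue, ihash, and call helpers and the encoding/json, fmt, io/ioutil, log, os, and sort imports; error handling is abbreviated:

// doMap: read one input file, partition the Map output into NReduce
// buckets, write each bucket to a temp file, and report back.
func doMap(reply *RequestTaskReply, mapf func(string, string) []KeyValue) {
	task := reply.Task

	// Read the single input file for this Map task.
	content, err := ioutil.ReadFile(task.InputFiles[0])
	if err != nil {
		log.Fatalf("cannot read %v", task.InputFiles[0])
	}

	// Apply the user-defined Map function.
	kva := mapf(task.InputFiles[0], string(content))

	// Partition the pairs into NReduce buckets with ihash.
	buckets := make([][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		r := ihash(kv.Key) % reply.NReduce
		buckets[r] = append(buckets[r], kv)
	}

	// Write each bucket to a temp file as JSON; the Master renames
	// them to mr-<mapTask_idx>-<reduceTask_idx> on submit (see Tips).
	task.OutputFile = make([]string, reply.NReduce)
	for r, bucket := range buckets {
		tmpFile, err := ioutil.TempFile("./", "mr")
		if err != nil {
			log.Fatal(err)
		}
		enc := json.NewEncoder(tmpFile)
		for _, kv := range bucket {
			enc.Encode(&kv)
		}
		tmpFile.Close()
		task.OutputFile[r] = tmpFile.Name()
	}

	// Report the finished task back to the Master.
	call("Master.SubmitTask", &SubmitTaskArgs{Task: task}, &SubmitTaskReply{})
}

And a corresponding sketch of the Reduce side, which reads every intermediate file for its partition, groups the pairs by key, and applies reducef to each group:

// doReduce: collect this partition's pairs from every intermediate
// file, group them by key, and write one line per key to a temp file.
func doReduce(reply *RequestTaskReply, reducef func(string, []string) string) {
	task := reply.Task

	// Decode the <key, value> pairs from every intermediate file.
	kva := []KeyValue{}
	for _, name := range task.InputFiles {
		f, err := os.Open(name)
		if err != nil {
			log.Fatal(err)
		}
		dec := json.NewDecoder(f)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		f.Close()
	}

	// Sort so that equal keys are adjacent, then reduce each group.
	sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })
	tmpFile, err := ioutil.TempFile("./", "mr")
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < len(kva); {
		j := i
		values := []string{}
		for j < len(kva) && kva[j].Key == kva[i].Key {
			values = append(values, kva[j].Value)
			j++
		}
		fmt.Fprintf(tmpFile, "%v %v\n", kva[i].Key, reducef(kva[i].Key, values))
		i = j
	}
	tmpFile.Close()

	// The Master renames this temp file to mr-out-<reduceTask_idx>.
	task.OutputFile = []string{tmpFile.Name()}
	call("Master.SubmitTask", &SubmitTaskArgs{Task: task}, &SubmitTaskReply{})
}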

Master

The responsibilities of the Master are:

  1. assigning tasks to different Workers. If a Worker does not report a task back within a certain time (10 seconds here), reassign the task to another Worker;

  2. monitoring the progress. Once all the Map tasks are done, the Master starts assigning Reduce tasks. When all the Reduce tasks are done, the Master tells the Workers to exit;

  3. validating the results. The Worker’s output files are only confirmed as valid once the task is completed and submitted.

So the Master should look like this:

type Master struct {
	inputFiles  []string
	nReduce     int

	mapTasks    []MapReduceTask
	reduceTasks []MapReduceTask

	// Incremented when a Map task finishes. The Map phase is done when mapDone == len(inputFiles).
	mapDone     int
	// Incremented when a Reduce task finishes. The Reduce phase is done when reduceDone == nReduce.
	reduceDone  int

	// Allow only one Worker to update the Master's state at a time.
	mutex       sync.Mutex
}

and when initializing it:

func MakeMaster(files []string, nReduce int) *Master {
	m := Master{
		inputFiles:  files,
		nReduce:     nReduce,
		mapTasks:    make([]MapReduceTask, len(files)),
		reduceTasks: make([]MapReduceTask, nReduce),
		mapDone:     0,
		reduceDone:  0,
		mutex:       sync.Mutex{},
	}
	// Fill each task in mapTasks with its input file name and initialize the other fields.
	// Build the InputFiles and OutputFile arrays for both task lists.

	// ......
	return &m
}
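
With that state in place, here is a hedged sketch of the Master’s RequestTask handler: it hands out the first unassigned (or timed-out) task in the current phase, asks the Worker to wait while tasks are still in flight, and tells it to exit once everything is done. The constants are the ones sketched earlier:

func (m *Master) RequestTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	reply.NReduce = m.nReduce

	// Pick the task list for the current phase: Map first, then Reduce.
	tasks := m.mapTasks
	if m.mapDone == len(m.inputFiles) {
		if m.reduceDone == m.nReduce {
			// Everything is done: tell the Worker to exit.
			reply.Task = MapReduceTask{TaskType: EXIT}
			return nil
		}
		tasks = m.reduceTasks
	}

	// Hand out the first unassigned task, or one whose Worker has been
	// silent for more than 10 seconds (assumed to have failed).
	for i := range tasks {
		t := &tasks[i]
		timedOut := t.Status == ASSIGNED && time.Since(t.TimeStamp) > 10*time.Second
		if t.Status == UNASSIGNED || timedOut {
			t.Status = ASSIGNED
			t.TimeStamp = time.Now()
			reply.Task = *t
			return nil
		}
	}

	// All tasks in this phase are in flight: ask the Worker to wait.
	reply.Task = MapReduceTask{TaskType: WAIT}
	return nil
}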

Tips

  1. A Worker sometimes fails while processing a task, and may have written its result to the output files only halfway. To avoid such garbage output, the Worker should write to a temp file, and only when the entire task is submitted does the Master mark the files as valid output (see the sketch after this snippet):
// Worker: create a new temp output file
tmpFile, err := ioutil.TempFile("./", "mr")

// Master: rename the temp files when it receives a submit request
for _, tempOut := range args.Task.OutputFile {
    // validName :=
    // 		mapTask:    mr-<mapTask_idx>-<reduceTask_idx>
    //		reduceTask: mr-out-<reduceTask_idx>
    err := os.Rename(tempOut, validName)
    if err != nil {
        panic(err)
    }
}
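To connect this tip to the Master’s bookkeeping, here is a hedged sketch of the SubmitTask handler; renameOutputs is a hypothetical helper wrapping the rename loop above, and stale submissions from re-assigned tasks are ignored so a slow Worker cannot double-count a task:

func (m *Master) SubmitTask(args *SubmitTaskArgs, reply *SubmitTaskReply) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	task := args.Task
	switch task.TaskType {
	case MAP:
		t := &m.mapTasks[task.Index]
		if t.Status == FINISHED {
			return nil // stale submission from a re-assigned task
		}
		renameOutputs(task) // hypothetical: os.Rename temps to mr-<mapTask_idx>-<reduceTask_idx>
		t.Status = FINISHED
		m.mapDone++
	case REDUCE:
		t := &m.reduceTasks[task.Index]
		if t.Status == FINISHED {
			return nil
		}
		renameOutputs(task) // hypothetical: os.Rename temp to mr-out-<reduceTask_idx>
		t.Status = FINISHED
		m.reduceDone++
	}
	return nil
}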
  2. During the Map phase, the Worker should use ihash(key) % NReduce as the Reduce task number, so that all the resulting <key, value> pairs with the same key are grouped into the same bucket.

  3. All the field names in the RPC Args and Reply structs must begin with a capital letter, or Go's RPC library will not send them.