发布时间:2023-12-13 23:17:29.0

一步一步完成MIT-6.824-Lab1:MapReduce 之一



被分配了 map 任务的 worker, 读取对应的 input 中的内容。通过 Map 函数,完成对输入内容的解析。解析的结果是一系列的 key/value 对。这些 key/value 对被称为中间值,被暂存在内存中。


// master.go
Master 的 Struct 还未完全完善
MyCallHandler() 还未完全完善
完善 worker 与 master 的通信数据格式

接下来我们在 rpc.go 完善 worker 与 master 之间的数据通讯格式。


// rpc.go// Add your RPC definitions here.
const (MsgForTask = iota        // ask a taskMsgForInterFileLoc       // send intermediate files' location to masterMsgForFinishMap          // finish a map taskMsgForFinishReduce       // finish a reduce task

接下来,定义 RPC 发送消息的 struct 和 接受回复的 struct。 按照我们的上面代码中的消息类型, 我们吧发送消息的 struct 写成两类, 一类单独用来传中间文件的位置给master。

// Add your RPC definitions here.type MyArgs struct {
    MessageType     intMessageCnt      string
}// send intermediate files' filename to master
type MyIntermediateFile struct {
    MessageType     intMessageCnt      stringNReduceType     int

MyIntermediateFile 专用于发送中间文件的位置到master。即消息类型为MsgForInterFileLoc的使用 MyIntermediateFile。其他消息类型使用 MyArgs。注意:其中NReduceType字段是值经过我们自定义的分割函数后,得到了分割后的intermediate 文件交由哪类 reduce 任务的编号。关于该partation函数后文介绍。
然后定义 reply 的struct:

type MyReply struct {
    Filename           string         // get a filenameMapNumAllocated    intNReduce            intReduceNumAllocated int TaskType           string         // refer a task type : "map" or "reduce"ReduceFileList     []string       // File list about 

所有 RPC 请求的 reply 均使用该类型的 reply。
struct 中包括: 被分配的任务的类型,map 任务的话,FIlename字段装载input文件名, reduce 任务的话, ReduceFileList 字段装载文件名的list。 MapNumAllocated 代表 map 任务被分配的任务的编号。 ReduceNumAllocated 代表 reduce 任务被分配的任务的编号。NReduce 字段代表总 reduce 任务数。

完善 master 结构体和 MyCallHandler

定义了 worker 与 master 之间的消息格式, 接下来,我们需要再完善一下 Master 结构体和 MyCallHandler。
我们注意到,我们的消息中包括中间值的存放位置,这也是 worker 发送给 master 的。所以,我们需要在 master 中对这些位置做记录:

// master.go ---- Master struct
// Master struct
type Master struct {
    // Your definitions here.AllFilesName        map[string]intMapTaskNumCount     intNReduce             int               // n reduce task// InterFIlename       [][]string        // store location of intermediate filesMapFinished         boolReduceTaskStatus    map[int]int      // about reduce tasks' statusReduceFinished      bool              // Finish the reduce taskRWLock              *sync.RWMutex

我们在 MakeMaster 中初始化。

// master.go ----- func MakeMaster()// Your code here
m.InterFIlename = make([][]string, m.NReduce)

然后,完善 MyCallHandler()。我们的消息类型还有 MsgForFinishMap; MsgForFinishReduce; MsgForInterFileLoc这些,我们也需要对这些进行处理:

// master.go ----- func MyCallHandler()switch(msgType) {
case MsgForTask:select {
    case filename := <- maptasks:// allocate map taskreply.Filename = filenamereply.MapNumAllocated = m.MapTaskNumCountreply.NReduce = m.NReducereply.TaskType = "map"m.RWLock.Lock()m.AllFilesName[filename] = Allocatedm.MapTaskNumCount++m.RWLock.Unlock()go m.timerForWorker("map",filename)return nilcase reduceNum := <- reducetasks:// allocate reduce taskreply.TaskType = "reduce"reply.ReduceFileList = m.InterFIlename[reduceNum]reply.NReduce = m.NReducereply.ReduceNumAllocated = reduceNumm.RWLock.Lock()m.ReduceTaskStatus[reduceNum] = Allocatedm.RWLock.Unlock()go m.timerForWorker("reduce", strconv.Itoa(reduceNum))return nil}
case MsgForFinishMap:// finish a map taskm.RWLock.Lock()defer m.RWLock.Unlock()m.AllFilesName[args.MessageCnt] = Finished   // set status as finish
case MsgForFinishReduce:// finish a reduce task index, _ := strconv.Atoi(args.MessageCnt)m.RWLock.Lock()defer m.RWLock.Unlock()m.ReduceTaskStatus[index] = Finished        // set status as finish

如上, master 里判断 args里的消息类型。 如果是 MsgForTask 的话,就向 worker 传一个 task。该 task 也是由 master 生产的。很明显,在map执行完之前,reduce任务是不会执行的。这个从我们之前的代码 generateTask 中可以看出。
如果消息类型是 MsgForFinishMapMsgForFinishReduce 的话,将对应的 task 的状态设置为 Finished。
如果消息类型是 MsgForInterFileLoc 的话, 我们这里另外写一个函数,供 worker 调用,处理该消息类型:

// master.go ----- func MyInnerFileHandler// MyInnerFileHandler : intermediate files' handler
func (m *Master) MyInnerFileHandler(args *MyIntermediateFile, reply *MyReply) error {
    nReduceNUm := args.NReduceType;filename := args.MessageCnt;// store themmm.InterFIlename[nReduceNUm] = append(m.InterFIlename[nReduceNUm], filename)return nil

通过读取参数中的 NReduceType 字段,获取该文件应该由哪个编号的 reduce 任务处理,存放在相应的位置。

另外,MyCallHandler 代码中的 go m.timerForWorker()后文再做介绍。

接下来完成 worker 的 map 部分

worker 从 master 那里获取到 map 任务后,即开始自己的 map 任务。
首先,关于 worker 向 master 请求任务的细节:
我们观察 /main 下的 mrworker.go, 按照官网方法运行一个 mrworker.go, 只会产生一个 worker。最开始, Worker 里的代码是这样:

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {
    // Your worker implementation here.// uncomment to send the Example RPC to the master.//CallExample()// RPC callCallForTask()

另外,我们需要写好我们自己定义的与 master 交流了的函数,即发送 MsgForTask, MsgForInnerFileLoc 等等。

// CallForTask : my RPC call function
func CallForTask(msgType int,msgCnt string) MyReply {
    args := MyArgs{
    }args.MessageType = msgTypeargs.MessageCnt = msgCntreply := MyReply{
    }// callres := call("Master.MyCallHandler", &args, &reply)if res {
    fmt.Printf("reply.type %v\n",reply.TaskType)} else {
    return MyReply{
    TaskType:""}}return reply
}// SendInterFiles : send intermediate files' location (filenames here) to master
func SendInterFiles(msgType int, msgCnt string, nReduceType int) MyReply {
    args := MyIntermediateFile{
    }args.MessageType = msgTypeargs.MessageCnt = msgCntargs.NReduceType = nReduceTyperepley := MyReply{
    }res := call("Master.MyInnerFileHandler", &args, &repley)if !res {
    fmt.Println("error sending intermediate files' location")}return repley

上述两个函数,即代表 worker 向 master 交流的函数。 CallForTask 是 MsgForTask,MsgForFinishMapMsgForFinishReduce使用。 SendInterFiles 是 MsgForInnerFileLoc使用。两个函数中的call函数原有代码已经给出了。 CallForTask 的对于请求任务的消息的reply中会包含任务相关的信息,对于告知master 任务完成的reply消息中不会有内容,而且我们此时不会用到这个reply。 SendInterFiles 的 reply 也是没有内容的,因为我们 master 不需要此处不需要 reply。
我们这里先完成对 map 任务的处理:

// worker.go// mapInWorker : workers do the map phase
func mapInWorker(reply *MyReply,mapf func(string, string) []KeyValue) {
    file, err := os.Open(reply.Filename)defer file.Close()if err != nil {
    log.Fatalf("cannot open %v", reply.Filename)}content, err := ioutil.ReadAll(file)if err != nil {
    log.Fatalf("cannot read %v", reply.Filename)}// map function, get intermediate keyvalue pairskva := mapf(reply.Filename, string(content))// partition function. finish the partition taskkvas := Partition(kva, reply.NReduce)// write to temp local filefor i := 0; i<reply.NReduce; i++ {
    filename := WriteToJSONFile(kvas[i], reply.MapNumAllocated, i)_ = SendInterFiles(MsgForInterFileLoc, filename, i)}_ = CallForTask(MsgForFinishMap, reply.Filename)

以上则是 map 任务的处理函数。读取文件内容, 然后执行事先制定好了的 plugin 中的 map 函数,生成中间值对。 此时,就需要用到我们的分割函数了,将中间值经过分割函数分割,得到用于不同 reduce 任务的中间值。然后写入到本地磁盘中,并将文件location 发送给 master .
关于分割函数,发送location 这些内容,我们接下来在步骤4讨论。


