当前位置: 代码迷 >> 综合 >> 一步一步完成 MIT-6.824-Lab1 : MapReduce 之二
  详细解决方案

一步一步完成 MIT-6.824-Lab1 : MapReduce 之二

热度:55   发布时间:2023-12-13 23:17:29.0

一步一步完成 MIT-6.824-Lab1 : MapReduce 之二

GitHub代码仓库:Mit-6.824-Lab1-MapReduce

接上文
一步一步完成MIT-6.824-Lab1:MapReduce 之一

步骤三

目标

被分配了 map 任务的 worker, 读取对应的 input 中的内容。通过 Map 函数,完成对输入内容的解析。解析的结果是一系列的 key/value 对。这些 key/value 对被称为中间值,被暂存在内存中。

我们的实现

我们目前未完善的
// master.go
Master 的 Struct 还未完全完善
MyCallHandler() 还未完全完善
完善 worker 与 master 的通信数据格式

接下来我们在 rpc.go 完善 worker 与 master 之间的数据通讯格式。

首先,我们列举出可能的消息类型,比如:请求一个任务,完成了一个任务等等。

// rpc.go// Add your RPC definitions here.
const (MsgForTask = iota        // ask a taskMsgForInterFileLoc       // send intermediate files' location to masterMsgForFinishMap          // finish a map taskMsgForFinishReduce       // finish a reduce task
)

如上,类型的用途见注释。
接下来,定义 RPC 发送消息的 struct 和 接受回复的 struct。 按照我们的上面代码中的消息类型, 我们吧发送消息的 struct 写成两类, 一类单独用来传中间文件的位置给master。

// Add your RPC definitions here.type MyArgs struct {
    MessageType     intMessageCnt      string
}// send intermediate files' filename to master
type MyIntermediateFile struct {
    MessageType     intMessageCnt      stringNReduceType     int
}

MyIntermediateFile 专用于发送中间文件的位置到master。即消息类型为MsgForInterFileLoc的使用 MyIntermediateFile。其他消息类型使用 MyArgs。注意:其中NReduceType字段是值经过我们自定义的分割函数后,得到了分割后的intermediate 文件交由哪类 reduce 任务的编号。关于该partation函数后文介绍。
然后定义 reply 的struct:

type MyReply struct {
    Filename           string         // get a filenameMapNumAllocated    intNReduce            intReduceNumAllocated int TaskType           string         // refer a task type : "map" or "reduce"ReduceFileList     []string       // File list about 
}

所有 RPC 请求的 reply 均使用该类型的 reply。
struct 中包括: 被分配的任务的类型,map 任务的话,FIlename字段装载input文件名, reduce 任务的话, ReduceFileList 字段装载文件名的list。 MapNumAllocated 代表 map 任务被分配的任务的编号。 ReduceNumAllocated 代表 reduce 任务被分配的任务的编号。NReduce 字段代表总 reduce 任务数。

完善 master 结构体和 MyCallHandler

定义了 worker 与 master 之间的消息格式, 接下来,我们需要再完善一下 Master 结构体和 MyCallHandler。
我们注意到,我们的消息中包括中间值的存放位置,这也是 worker 发送给 master 的。所以,我们需要在 master 中对这些位置做记录:

// master.go ---- Master struct
// Master struct
type Master struct {
    // Your definitions here.AllFilesName        map[string]intMapTaskNumCount     intNReduce             int               // n reduce task// InterFIlename       [][]string        // store location of intermediate filesMapFinished         boolReduceTaskStatus    map[int]int      // about reduce tasks' statusReduceFinished      bool              // Finish the reduce taskRWLock              *sync.RWMutex
}

我们在 MakeMaster 中初始化。

// master.go ----- func MakeMaster()// Your code here
m.InterFIlename = make([][]string, m.NReduce)

然后,完善 MyCallHandler()。我们的消息类型还有 MsgForFinishMap; MsgForFinishReduce; MsgForInterFileLoc这些,我们也需要对这些进行处理:

// master.go ----- func MyCallHandler()switch(msgType) {
    
case MsgForTask:select {
    case filename := <- maptasks:// allocate map taskreply.Filename = filenamereply.MapNumAllocated = m.MapTaskNumCountreply.NReduce = m.NReducereply.TaskType = "map"m.RWLock.Lock()m.AllFilesName[filename] = Allocatedm.MapTaskNumCount++m.RWLock.Unlock()go m.timerForWorker("map",filename)return nilcase reduceNum := <- reducetasks:// allocate reduce taskreply.TaskType = "reduce"reply.ReduceFileList = m.InterFIlename[reduceNum]reply.NReduce = m.NReducereply.ReduceNumAllocated = reduceNumm.RWLock.Lock()m.ReduceTaskStatus[reduceNum] = Allocatedm.RWLock.Unlock()go m.timerForWorker("reduce", strconv.Itoa(reduceNum))return nil}
case MsgForFinishMap:// finish a map taskm.RWLock.Lock()defer m.RWLock.Unlock()m.AllFilesName[args.MessageCnt] = Finished   // set status as finish
case MsgForFinishReduce:// finish a reduce task index, _ := strconv.Atoi(args.MessageCnt)m.RWLock.Lock()defer m.RWLock.Unlock()m.ReduceTaskStatus[index] = Finished        // set status as finish
}

如上, master 里判断 args里的消息类型。 如果是 MsgForTask 的话,就向 worker 传一个 task。该 task 也是由 master 生产的。很明显,在map执行完之前,reduce任务是不会执行的。这个从我们之前的代码 generateTask 中可以看出。
如果消息类型是 MsgForFinishMapMsgForFinishReduce 的话,将对应的 task 的状态设置为 Finished。
如果消息类型是 MsgForInterFileLoc 的话, 我们这里另外写一个函数,供 worker 调用,处理该消息类型:

// master.go ----- func MyInnerFileHandler// MyInnerFileHandler : intermediate files' handler
func (m *Master) MyInnerFileHandler(args *MyIntermediateFile, reply *MyReply) error {
    nReduceNUm := args.NReduceType;filename := args.MessageCnt;// store themmm.InterFIlename[nReduceNUm] = append(m.InterFIlename[nReduceNUm], filename)return nil
}

通过读取参数中的 NReduceType 字段,获取该文件应该由哪个编号的 reduce 任务处理,存放在相应的位置。

另外,MyCallHandler 代码中的 go m.timerForWorker()后文再做介绍。

接下来完成 worker 的 map 部分

worker 从 master 那里获取到 map 任务后,即开始自己的 map 任务。
首先,关于 worker 向 master 请求任务的细节:
我们观察 /main 下的 mrworker.go, 按照官网方法运行一个 mrworker.go, 只会产生一个 worker。最开始, Worker 里的代码是这样:

// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {
    // Your worker implementation here.// uncomment to send the Example RPC to the master.//CallExample()// RPC callCallForTask()
}

很明显,这样的worker会在一次call后就退出,这样肯定不对,worker应该一直运行,向master获取任务,然后执行,然后再获取,直到所有任务执行完,或者master断开。Worker的完善我们将在后面的步骤给出
另外,我们需要写好我们自己定义的与 master 交流了的函数,即发送 MsgForTask, MsgForInnerFileLoc 等等。

// CallForTask : my RPC call function
func CallForTask(msgType int,msgCnt string) MyReply {
    args := MyArgs{
    }args.MessageType = msgTypeargs.MessageCnt = msgCntreply := MyReply{
    }// callres := call("Master.MyCallHandler", &args, &reply)if res {
    fmt.Printf("reply.type %v\n",reply.TaskType)} else {
    return MyReply{
    TaskType:""}}return reply
}// SendInterFiles : send intermediate files' location (filenames here) to master
func SendInterFiles(msgType int, msgCnt string, nReduceType int) MyReply {
    args := MyIntermediateFile{
    }args.MessageType = msgTypeargs.MessageCnt = msgCntargs.NReduceType = nReduceTyperepley := MyReply{
    }res := call("Master.MyInnerFileHandler", &args, &repley)if !res {
    fmt.Println("error sending intermediate files' location")}return repley
}

上述两个函数,即代表 worker 向 master 交流的函数。 CallForTask 是 MsgForTask,MsgForFinishMapMsgForFinishReduce使用。 SendInterFiles 是 MsgForInnerFileLoc使用。两个函数中的call函数原有代码已经给出了。 CallForTask 的对于请求任务的消息的reply中会包含任务相关的信息,对于告知master 任务完成的reply消息中不会有内容,而且我们此时不会用到这个reply。 SendInterFiles 的 reply 也是没有内容的,因为我们 master 不需要此处不需要 reply。
接下来就可以进行对任务的处理了。
我们这里先完成对 map 任务的处理:

// worker.go// mapInWorker : workers do the map phase
func mapInWorker(reply *MyReply,mapf func(string, string) []KeyValue) {
    file, err := os.Open(reply.Filename)defer file.Close()if err != nil {
    log.Fatalf("cannot open %v", reply.Filename)}content, err := ioutil.ReadAll(file)if err != nil {
    log.Fatalf("cannot read %v", reply.Filename)}// map function, get intermediate keyvalue pairskva := mapf(reply.Filename, string(content))// partition function. finish the partition taskkvas := Partition(kva, reply.NReduce)// write to temp local filefor i := 0; i<reply.NReduce; i++ {
    filename := WriteToJSONFile(kvas[i], reply.MapNumAllocated, i)_ = SendInterFiles(MsgForInterFileLoc, filename, i)}_ = CallForTask(MsgForFinishMap, reply.Filename)
}

以上则是 map 任务的处理函数。读取文件内容, 然后执行事先制定好了的 plugin 中的 map 函数,生成中间值对。 此时,就需要用到我们的分割函数了,将中间值经过分割函数分割,得到用于不同 reduce 任务的中间值。然后写入到本地磁盘中,并将文件location 发送给 master .
关于分割函数,发送location 这些内容,我们接下来在步骤4讨论。

步骤4

一步一步完成MIT-6.824-Lab1:MapReduce 之三

  相关解决方案