当前位置: 代码迷 >> 综合 >> 一步一步完成 MIT-6.824-Lab1 : MapReduce 之一
  详细解决方案

一步一步完成 MIT-6.824-Lab1 : MapReduce 之一

热度:10   发布时间:2023-12-13 23:17:44.0

一步一步完成 MIT-6.824-Lab1 : MapReduce 之一

GitHub代码仓库:Mit-6.824-Lab1-MapReduce

回顾上一篇博文中提到了 MapReduce 论文, 本次的 MIT 的 Lab1-MapReduce, 可以根据论文中提到的完成一个 MapReduce 系统的步骤来完成,此处大致列下步骤,当然,当然,没有列出细节部分。本次实验就根据这个步骤来一步一步的完成:

  1. MapReduce 库将用户成据的输入分割成 M 快,每块通常 16-64 MB。
  2. 存在一个 Master, 和多个 Worker。存在 M 个 map 任务和 N 个 reduce 任务等待分配。 Master 从worker中选择空闲的worker来完成任务。
  3. 被分配了 map 任务的 worker, 读取对应的 input 中的内容。通过 Map 函数,完成对输入内容的解析。解析的结果是一系列的 key/value 对。这些 key/value 对被称为中间值,被暂存在内存中。
  4. 定期的,这些内存中的中间值会经过一个用户自定义的 Pratition 分割函数,分成 N 份,(即 reduce task 的数量)。然后写到本地的磁盘中。这些文件的存放位置需要发送给 Master, 以保证能够被正确找到,进行 reduce 任务。
  5. 当一个 worker 被分配了 reduce 任务后,通过远程程序调用,读取 map worker 存放在其本地的中间文件。当读取了所有的中间值后,reduce worker 对中间值按照键值对的 key 进行排序。 如果中间值太大以至于内存容纳不下,那么,一个可能就需要一个外部的排序。
  6. reduce worker 重复迭代排序后的中间值。reduce worker接下来对中间值经过用户自定义的 Reduce 来处理。得到一个 output 文件。
  7. 当所有的 Map 和 Reduce 完成后,程序正常退出。

接下来,我们按照上述的7个步骤一步一步地完成 MIT-6.824 Lab1:MapReduce。
works on ubuntu

步骤一

目标

MapReduce 库将用户成据的输入分割成 M 快,每块通常 16-64 MB。

我们的实现

通过查看实验文件,我们可以的得知,本次 MapReduce 实验的待处理输入文件是 /main 下的一系列以 pg-*.txt。阅读官网上的实验要求:

The pg-*.txt arguments to mrmaster.go are the input files; each file corresponds to one "split", and is the input to one Map task.

每一个 pg-*.txt 都已经是一个 "split" 后的,传给单个 Map task 的输入。所以说,此时我们的目标已经达成。文件中一共有8个txt待处理,所以本实验,正常情况存在 8 个 Map 任务。

步骤2

目标

存在一个 Master, 和多个 Worker。存在 M 个 map 任务和 N 个 reduce 任务等待分配。 Master 从worker中选择空闲的worker来完成任务。

我们的实现

查看项目文件,在 /main 下存在 mrmaster.go ,该 go 文件即会创建一个 Master。阅读官网说明,运行 Master的方法是:

go run mrmaster.go pg-*.txt

可以看出,mrmaster.go 接受一系列 pg-*.txt 文件作为输入。查看 mrmaster.go 内部:

m := mr.MakeMaster(os.Args[1:], 10)

在查看 package mr 下的 MakeMaster :

// create a Master.
// main/mrmaster.go calls this function.
//
func MakeMaster(files []string, nReduce int) *Master {
    m := Master{
    }// Your code here.m.server()return &m
}
创建 Master

可以看出,我们运行一个 master 的时候,传递的两个参数,第一个:os.Arg[1:]表示split 了的input文件。 第二个 10 代表 nReduce ,也就是值 Reduce 任务的数量。所以,我们此时创建了一个 master,并要求 reduce 的任务数量为 10。
接下来,我们完成 Masrer 需要的数据。

//在 master.go 文件中,完善现在这个阶段 Master struct// Master struct
type Master struct {
    // Your definitions here.AllFilesName        map[string]int   // splited filesMapTaskNumCount     int              // curr map task numberNReduce             int              // n reduce taskMapFinished         boolReduceTaskStatus    map[int]int      // about reduce tasks' statusReduceFinished      boolRWLock              *sync.RWMutex             
}

以上是现在这个阶段 Master 需要的一些数据。 用一个 map[string]int 类型的数据来存放 input 文件。其中,map的 key 就代表 inuput 文件名,int类型的 value 代表目前这个 input 文件的状态。同样,我们使用一个 map[int]int 的数据来存放 reduce 任务。 int 类型的 key 代表 reduce 任务的编号, int 类型的 value 代表目前这个编号的 reduce 任务的状态。 另外用 int 类型的数据存放 reduce 任务数 和当前 Map 任务编号。用 bool 类型的数据存放是否完成整个 Map 和 Reduce。
目前这个Master的结构还不完善,我们在后续的步骤中继续补充。

接下来,关于Map 和 Reduce 任务的状态, 我们用 int 类型来表示
// master.go// tasks' status
// UnAllocated ----> UnAllocated to any worker
// Allocated ----> be allocated to a worker
// Finished ----> worker finish the map task
const (UnAllocated = iotaAllocatedFinished
)

接下来,在函数 MakeMaster 中完成初始化:

// master.go -----func MakeMaster()// your code here
m.AllFilesName = make(map[string]int)
m.MapTaskNumCount = 0
m.NReduce = nReduce
m.MapFinished = false
m.ReduceFinished = false
m.ReduceTaskStatus = make(map[int]int)
m.RWLock = new(sync.RWMutex)
for _,v := range files {
    m.AllFilesName[v] = UnAllocated
}for i := 0; i<nReduce; i++ {
    m.ReduceTaskStatus[i] = UnAllocated
}

到此,我们完善了一个基本的 Master 的结构。

master 生成 Map 任务

接下来, 我们让 master 来成生 map 与 reduce 任务。
为了提高任务生产 worker 领取任务的效率,我们此时使用两个 channel 适用与 map与reduce 任务。并让一个函数与主进程并行执行,该进程用来生成任务。

// master.go// global
var maptasks chan string          // chan for map task
var reducetasks chan int          // chan for reduce task// generateTask : create tasks
func (m *Master) generateTask() {
    for k,v := range m.AllFilesName {
    if v == UnAllocated {
    maptasks <- k          // add task to channel}}ok := falsefor !ok {
    ok = checkAllMapTask(m)    // check if all map tasks have finished}m.MapFinished = truefor k,v := range m.ReduceTaskStatus {
    if v == UnAllocated {
    reducetasks <- k}}ok = falsefor !ok {
    ok = checkAllReduceTask(m)}m.ReduceFinished = true
}// checkAllMapTask : check if all map tasks are finished
func checkAllMapTask(m *Master) bool {
    m.RWLock.RLock()defer m.RWLock.RUnlock()for _,v := range m.AllFilesName {
    if v != Finished {
    return false}}return true
}func checkAllReduceTask(m *Master) bool {
    m.RWLock.RLock()defer m.RWLock.RUnlock()for _, v := range m.ReduceTaskStatus {
    if v != Finished {
    return false}}return true
}

以上则是 master 用来生成 map 和 reduce 任务的相关代码。将生成的任务,写入到相应的channel中去。worker 在向 master 请求任务的时候, master 从channel中获取到任务,发送给 worker , worker执行。代码中还会一直监视任务的状态,来判断是否任务都完成了。

当然,我们还要初始化这两个 channel, 以及注册 RPC 服务, 并行运行 generateTask():

// master.go --- func server()// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
    // init channels maptasks = make(chan string, 5)reducetasks = make(chan int, 5)rpc.Register(m)rpc.HandleHTTP()// parallel run generateTask()go m.generateTask()//l, e := net.Listen("tcp", ":1234")sockname := masterSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {
    log.Fatal("listen error:", e)}go http.Serve(l, nil)
}

到现在,我们已经成功将 master 完成了 RPC 的注册,并完成了任务生成进程的运行。

接下来,初步完善任务分配部分。

完成本步骤最后的要求,对 worker 寻求任务的要求作出响应。我们通过创建一个RPC handler 来实现。 worker call这个handler, 获取任务。

// master.go// MyCallHandler func
// Your code here -- RPC handlers for the worker to call.
func (m *Master) MyCallHandler(args *MyArgs, reply *MyReply) error {
    msgType := args.MessageType          // worker 发送的消息的类型switch(msgType) {
    case MsgForTask:select {
    case filename := <- maptasks:// allocate map taskreply.Filename = filenamereply.MapNumAllocated = m.MapTaskNumCountreply.NReduce = m.NReducereply.TaskType = "map"m.RWLock.Lock()m.AllFilesName[filename] = Allocatedm.MapTaskNumCount++m.RWLock.Unlock()go m.timerForWorker("map",filename)  // 定时函数,后面步骤解释return nilcase reduceNum := <- reducetasks:// allocate reduce taskreply.TaskType = "reduce"reply.ReduceFileList = m.InterFIlename[reduceNum]reply.NReduce = m.NReducereply.ReduceNumAllocated = reduceNumm.RWLock.Lock()m.ReduceTaskStatus[reduceNum] = Allocatedm.RWLock.Unlock()go m.timerForWorker("reduce", strconv.Itoa(reduceNum))return nil}return nil
}

本函数目前也不完善,后续步骤中一步一步完善函数内容。本 handler 通过处理我们自己定义的 worker 与 master 之间的消息类型, 作出不同的处理。 此处, master 检查 worker 发送的消息是否是 MsgForTask(这是一个int类型数据,后文会提到),然后向 worker 发送任务。

接下来是步骤三

一步一步完成MIT-6.824-Lab1:MapReduce 之二

  相关解决方案