当前位置: 代码迷 >> 综合 >> shuffle(partitioner+combiner+sort)
  详细解决方案

shuffle(partitioner+combiner+sort)

热度:30   发布时间:2023-12-25 20:42:14.0

shuffle(partitioner+combiner+sort)

  1. 每一个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件
  2. 写磁盘前,要partition,sort。如果有combiner,combiner排序后数据。
  3. 等最后距离写完,合并全部溢出写文件为一个分区且排序的文件。
  4. reducer通过http方式得到输出文件的分区。
  5. TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完毕,Reduce就开始复制输出。
  6. 排序阶段合并map输出。然后走Reduce阶段。

MR过程各个角色的作用

  1. jobClient:提交作业
  2. JobTracker:初始化作业,分配作业,TaskTracker与其进行通信,协调监控整个作业
  3. TaskTracker:定期与JobTracker通信,执行Map和Reduce任务
  4. HDFS:保留作业的数据、配置、jar包、结果

作业提交

  1. 提交作业之前,需要对作业进行配置   编写自己的MR程序   配置作业,包括输入输出路径等。。。
  2. 提交作业   配置完毕后,通过JobClient提交
  3. 具体功能:
  4. 与JobTracker通信得到一个jar的存储路径和Jobld
  5. 输入输出路径检查
  6. 将jobjar拷贝到HDFS
  7. 计算输入分片,将分片信息写到job.split中
  8. 写job.xml
  9. 真正提交作业

作业初始化

  1. 客户端提交作业后,JobTracker会将作业加入到队列,然后进行调度,默认是FIFO方式
  2. 具体功能:
  3. 作业初始化只要是值JobInProgress中完成的
  4. 读取分片信息
  5. 任务创建task包扣Map和reduce任务
  6. 创建TaskInProgress执行task

任务分配

  1. tasktracker与jobtracker之间的通信和任务分配是通过心跳机制实现的
  2. tasktracker会主动定期向jobtracker发送心跳信息,询问是否有任务要做,如果有,就会申请到任务
  3. 心跳  定期   任务完成--领

任务执行

  1. 如果tasktracker拿到任务,会将所有的信息拷贝到本地,包扣代码、配置、分片信息等
  2. tasktracker中的localizeJob()方法会被条用进行本地化,拷贝job .jar,jobconf,job.xml到本地
  3. tasktracker调用launchTaskForJob()方法加载启动任务
  4. MapTaskRunner和ReduceTaskRunner分别启动javachild进程来执行相应的任务

状态更新

  1. Task会定期向TaskTraker回报执行情况
  2. TaskTracker会定期收集所在集群上的所有Task的信息,并向JobTracker回报
  3. JobTracker会根据所有TaskTracker回报上来的信息进行汇总

作业完成

  1. JobTracker是在接收到最后一个任务完成后,才将任务标记为成功
  2. 将纾解写入到HDFS中

错误处理

  1. JobTracker失败  存在单点故障,hadoop2.0解决了这个问题
  2. TaskTracker失败  tasktracker崩溃了会停止向Jobtracker发送心跳信息,jobtracker会将tasktracker从等待的任务池中移除,并将该任务转移到其他的地方执行,jobtracker将tasktracker加入到黑名单中
  3. Task失败  任务失败,会向TaskTracker抛出异常   任务挂起

JobTracker

  1. 负责接收用户提交的作业,负责启动、跟踪任务执行
  2. JobSubmissionProtocol是JobClient与JobTracker通信的接口
  3. InterTrackerProtocol是TaskTracker与JobTracker通信的接口

TaskTracker

  1. 负责执行任务

JobClient

  1. 是用户作业与JobTracker交互的主要接口
  2. 负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。

Partitioner编程

  1. partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类
  2. HashPartitioner是mapreduce的默认partitioner。计算方法是which reduce=(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks,得到当前的目的reducer
  3. mapper ----获取数据
  4. partitioner  ----分区  属于shuffle
  5. reduce   ---计算
public class TCPartitioner extends Partitioner<Text, TelBean>{@Overridepublic int getPartition(Text key, TelBean bean, int arg2) {// TODO Auto-generated method stub}
}

把partitioner加入到job里面

  1. 把partitioner添加到job里面
  2. job.setPartitionerClass(TCPartitioner.class);
  3. 设置reduceTasks的数量  有几个分区设置几个任务
  4. job.setNumReduceTasks(2);

sort编程

继承WritableComparable<> -序列化并且排序

public class Bean implements WritableComparable<Bean>{//反序列化@Overridepublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stub}//序列化@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stub}@Overridepublic int compareTo(Bean bean) {// TODO Auto-generated method stub}
}

combiner编程

  1. 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量
  2. combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能
  3. 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度
  4. 注意:combiner的输出是reduce的输入,如果combiner是可插拔的,添加combiner绝对不能改变最终的计算结果。所以combiner只应该用于那种reduce的输入kry/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。。。
  5. combiner就是map端的educer
  6. job.setCombinerClass();

  相关解决方案