shuffle(partitioner+combiner+sort)
- 每一个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件
- 写磁盘前,要partition,sort。如果有combiner,combiner排序后数据。
- 等最后距离写完,合并全部溢出写文件为一个分区且排序的文件。
- reducer通过http方式得到输出文件的分区。
- TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完毕,Reduce就开始复制输出。
- 排序阶段合并map输出。然后走Reduce阶段。
MR过程各个角色的作用
- jobClient:提交作业
- JobTracker:初始化作业,分配作业,TaskTracker与其进行通信,协调监控整个作业
- TaskTracker:定期与JobTracker通信,执行Map和Reduce任务
- HDFS:保留作业的数据、配置、jar包、结果
作业提交
- 提交作业之前,需要对作业进行配置 编写自己的MR程序 配置作业,包括输入输出路径等。。。
- 提交作业 配置完毕后,通过JobClient提交
- 具体功能:
- 与JobTracker通信得到一个jar的存储路径和Jobld
- 输入输出路径检查
- 将jobjar拷贝到HDFS
- 计算输入分片,将分片信息写到job.split中
- 写job.xml
- 真正提交作业
作业初始化
- 客户端提交作业后,JobTracker会将作业加入到队列,然后进行调度,默认是FIFO方式
- 具体功能:
- 作业初始化只要是值JobInProgress中完成的
- 读取分片信息
- 任务创建task包扣Map和reduce任务
- 创建TaskInProgress执行task
任务分配
- tasktracker与jobtracker之间的通信和任务分配是通过心跳机制实现的
- tasktracker会主动定期向jobtracker发送心跳信息,询问是否有任务要做,如果有,就会申请到任务
- 心跳 定期 任务完成--领
任务执行
- 如果tasktracker拿到任务,会将所有的信息拷贝到本地,包扣代码、配置、分片信息等
- tasktracker中的localizeJob()方法会被条用进行本地化,拷贝job .jar,jobconf,job.xml到本地
- tasktracker调用launchTaskForJob()方法加载启动任务
- MapTaskRunner和ReduceTaskRunner分别启动javachild进程来执行相应的任务
状态更新
- Task会定期向TaskTraker回报执行情况
- TaskTracker会定期收集所在集群上的所有Task的信息,并向JobTracker回报
- JobTracker会根据所有TaskTracker回报上来的信息进行汇总
作业完成
- JobTracker是在接收到最后一个任务完成后,才将任务标记为成功
- 将纾解写入到HDFS中
错误处理
- JobTracker失败 存在单点故障,hadoop2.0解决了这个问题
- TaskTracker失败 tasktracker崩溃了会停止向Jobtracker发送心跳信息,jobtracker会将tasktracker从等待的任务池中移除,并将该任务转移到其他的地方执行,jobtracker将tasktracker加入到黑名单中
- Task失败 任务失败,会向TaskTracker抛出异常 任务挂起
JobTracker
- 负责接收用户提交的作业,负责启动、跟踪任务执行
- JobSubmissionProtocol是JobClient与JobTracker通信的接口
- InterTrackerProtocol是TaskTracker与JobTracker通信的接口
TaskTracker
- 负责执行任务
JobClient
- 是用户作业与JobTracker交互的主要接口
- 负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。
Partitioner编程
- partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类
- HashPartitioner是mapreduce的默认partitioner。计算方法是which reduce=(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks,得到当前的目的reducer
- mapper ----获取数据
- partitioner ----分区 属于shuffle
- reduce ---计算
public class TCPartitioner extends Partitioner<Text, TelBean>{@Overridepublic int getPartition(Text key, TelBean bean, int arg2) {// TODO Auto-generated method stub}
}
把partitioner加入到job里面
- 把partitioner添加到job里面
- job.setPartitionerClass(TCPartitioner.class);
- 设置reduceTasks的数量 有几个分区设置几个任务
- job.setNumReduceTasks(2);
sort编程
继承WritableComparable<> -序列化并且排序
public class Bean implements WritableComparable<Bean>{//反序列化@Overridepublic void readFields(DataInput in) throws IOException {// TODO Auto-generated method stub}//序列化@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stub}@Overridepublic int compareTo(Bean bean) {// TODO Auto-generated method stub}
}
combiner编程
- 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量
- combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能
- 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度
- 注意:combiner的输出是reduce的输入,如果combiner是可插拔的,添加combiner绝对不能改变最终的计算结果。所以combiner只应该用于那种reduce的输入kry/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。。。
- combiner就是map端的educer
- job.setCombinerClass();