每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能
如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出
key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
Combiner仅在Map端进行数据归约, Map之间的数据是无法归约的,因此必须使用Reducer
Combiner的适合场景:求和,最大值,最小值等
Combiner的不适合场景:求平均数
举例
假如有1T的数据,对里面的数据求和,这一个T的数据被分成很多Block,再Map端进行读取之后全部送入Reducer端,这样的话Reducer处理的数据>=1T
但是如果再map端进行Combiner合并之后再传到Reducer之后,那么Reducer端处理的数据就很少了,这样就体现了分布式的优势。(相反不用Combiner就根部体现不了分 布式的优势)
Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。
HashPartitioner是mapreduce的默认partitioner。计算方法是
which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。
(例子以jar形式运行)
来看下默认的HashPartitioner
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
注意:这里的getPartition默认的返回值是0,返回值是分区的编号
如果我们没有自定义分区的话,默认就只有一个分区
适合场景:城市的分区,IP地址的分区,电话号码的分区等等
分组跟排序
分组要实现RaoComparator接口
在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。
分组时也是按照k2进行比较的。
Shuffle
1 每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
2 写磁盘前,要partition,sort。如果有combiner,combine排序后数据。
3 等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
2.1 Reducer通过Http方式得到输出文件的分区。
2.2 TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
2.3 排序阶段合并map输出。然后走Reduce阶段。
只看这个图,不看other maps,other reducers,有4个map任务,3个reducer
Reducer的源码中有Shuffle的定义
MapReduce体系结构及各种算法(2),布布扣,bubuko.com
原文:http://blog.csdn.net/manburen01/article/details/38417907