1、将原始输入的数据分成N份(每份默认为64M),交给N个Map节点任务
2、Map收到分片的数据后调用用户自定义的Map函数,生成多个KeyValue数据对
3、MapReduce提供了Partition接口,Partition会根据Reduce的个数来决定当前当前这个KeyValue数据对应该由哪个Reduce处理。(默认的Partition是采用Key的hash值取Reduce的模)
4、输出的KeyValue数据对写入一个内存缓存区(默认为100M,它的作用是为了减少磁盘的IO,由io.sort.mb属性控制)。
5、当多个输出的KeyValue数据对超过内存缓存区设置的大小的Spill比例(spill.percent这个比例默认为0.8,由io.sort.spill.percent属性控制)时,会执行Spill将内存缓存区的内容写入磁盘。Spill的过程由另一个线程完成,不会影响KeyValue数据对写入内存缓存区。
6、当Spill线程启动后,在写入磁盘前,需要将要写入到磁盘的大量KeyValue数据对进行Sort,如果设置了Combiner,在写入磁盘前将有相同Key的KeyValue对的Value做Combiner(其实也是一个类似Reduce过程),这样做的目的是为了减少Spill写入磁盘的数据量,还可以将文件进行压缩减少网络传输(mapred.compress.map.out设置为true)。注:使用Combiner虽然会提高效率,但是有些操作是会影响最终Reduce的结果的,如:求平均值。
7、每一次Spill都会在Map端生成一个文件,如果Map输出的KeyValue数据对特别大时会生成多个Spill文件,在当前Map节点的执行最后阶段,会将生成的多个Spill文件Merge为一个Spill文件,Merge的过程和Spill的过程实质上一样,也会执行Combiner。Map任务彻底结束。Map节点在发送心跳时将任务完成的信息发送到JobTracker。
8、Reduce阶段先进行拷贝,每个ReduceTask通过RPC从JobTracker中获取Map的完成情况,将Map完成的结果文件中,拷贝Partition分给当前Reduce节点的相关KeyValue数据对到Reduce节点的内存缓存区中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,此属性表示Heap空间的百分比),如果内存缓存区超过比例(由mapred.job.shuffle.merge.percent决定)会写入到Reduce节点的本地磁盘中,同Map阶段类似
9、再进行Merge,从多个Map节点获取的结果信息,不断的做Merge,最终形成ReduceTask的输入文件
10、合并后的Reduce的输入文件,默认是保存在磁盘的,然后执行用户自定义的Reduce函数。
11、由于HDFS是不允许多个线程操作同一文件的,所以多个Reduce节点将会生成多个输出文件
原文:http://chengyanbin.blog.51cto.com/3900113/1388629