参考:
一、MapReduce数据处理流程
关于上图,可以做出以下逐步分析:
二、MapReduce在Hadoop上的具体实现
这个实现机制就是MapReduce1,在Hadoop2.x的时候实现机制变成了YARN。了解MapReduce1对于我们理解Hadoop非常有帮助,晚些时候会写一篇专门关于YARN的文章。
如果细看,可以发现,MapReduce1实现图其实与一开始的MapReduce的工作流程总体是一致的,只不过多了JobTracker、TaskTracker以及Client这几个角色。map和reduce任务分配给了多个TaskTracker来执行。这几个角色非常重要,有必要详细了解。
客户端(client):这个是程序员主要工作的部分,工作分别是编写mapreduce程序,配置相应的文件信息,提交作业。如果出现错误了,需要找出错误,修改程序,直到完美运行。
JobTracker与TaskTracker之间服从的是主从结构。从图中可以看到:主节点JobTracker只有一个,而从节点TaskTracker有很多个。
JobTracker与TaskTrackers之间的关系就是项目经理与开发人员的关系。项目经理接到用户的需求清单,那么将用户的需求分配给开发人员来完成。
三、具体实现机制
了解了如何从MapReduce迁移到JobTracker TaskTrackers之后,我们来详细讲讲其中的实现机制(下面讲到的每一点都对应图上的相应数字):
1.写好的一个MapReduce程序就是一个job。点击运行。此时会生成一个JobClient,它会做一系列的准备工作,当准备工作做好了之后,才会向JobTracker提交任务。
2.JobClient向JobTracker请求一个新的job ID。与此同时,JobClient会先做如下检查:
3.JobClient将运行作业所需要的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到JobTracker的文件系统中以job ID命名的目录下(即HDFS中)。值得注意的是,作业jar副本较多(默认mapred.submit.replication = 10)。
4.上面的准备工作做好了之后,它会给JobTracker提交任务(它会告知JobTracker:大哥,我们这边准备好了,随时可以战斗。哎呀呀,逗比了)。
5.JobTracker接收到作业提交信息后,将其放入内部队列,交由job scheduler进行调度,并对其进行初始化(初始化就是创建一个正在运行的job对象(封装任务和记录信息),以便JobTracker跟踪job的状态和进程)。
6.初始化完毕后,作业调度器会获取输入分片信息(input split),每个分片创建一个map任务。关于分片的数量问题(即map数量),前面已经有提及。关于reduce数量,则是由用户在配置文件里指定的。
除了map和reduce任务,还有setupJob和cleanupJob需要建立:由每一个TaskTrackers在所有map开始前和所有reduce结束后分别执行。setupJob()创建输出目录和任务的临时工作目录,cleanupJob()删除临时工作目录。
7.每个TaskTracker定期发送心跳给JobTracker,告知自己还活着,并附带消息说明自己是否已准备好接受新任务。JobTracker以此来分配任务,并使用心跳的返回值与TaskTracker通信。JobTracker利用调度算法先选择一个job然后再选此job的一个task分配给TaskTracker.
每个TaskTracker会有固定数量的map和reduce任务槽,数量有TaskTracker核的数量和内存大小来决定。JobTracker会先将TaskTracker的所有的map槽填满,然后才填此TaskTracker的reduce任务槽。
JobTracker分配map任务时会选取与输入分片最近的TaskTracker,即数据TaskTracker优化。在分配reduce任务用不着考虑数据TaskTracker。
8.TaskTracker分配到一个任务后,首先从HDFS中把作业的jar文件及运行所需要的全部文件(DistributedCache设置的)复制到TaskTracker**本地**。接下来TaskTracker为任务新建一个本地工作目录,并把jar文件的内容解压到这个文件夹下(此时需要用到的就是前面提及的setupJob(),其作用是创建输出目录和任务的临时工作目录)。
9.TaskTracker新建一个taskRunner实例来运行该任务。
10.TaskRunner启动一个新的JVM来运行每个任务。
此时图中显示的所有动作都已经写下来了。只不过还有一些细节需要把握。请看下面:
Child JVM有独立的线程每隔3秒检查任务更新标志,如果有更新就会报告给此TaskTracker;
TaskTracker每隔5秒给JobTracker发心跳;(当然这个时间可以设置)
job tracker合并这些更新,产生一个表明所有运行作业及其任务状态的全局视图。
JobClient.monitorAndPrintJob()每秒查询这些信息。
当JobTracker收到最后一个任务(this will be the special job cleanup task)的完成报告后,便把job状态设置为successful。Job得到完成信息便从waitForCompletion()返回。
最后,JobTracker清空作业的工作状态,并指示TaskTracker也清空作业的工作状态(如删除中间输出)。
分布式计算过程中节点失败是很常见的。作为一个成熟的实现机制,应该有一套完善的失败处理机制。
在Hadoop的MapReduce1架构中常见失败有三种:任务失败、TaskTracker失败、JobTracker失败。
值得注意的是:
1)任务失败有重试机制,重试次数map任务设置是mapred.map.max.attempts属性控制,reduce是mapred.reduce.max.attempts属性控制。
2)一些job可以完成任务总体的一部分就能够接受,这个百分比由mapred.map.failures.precent和mapred.reduce.failures.precent参数控制。
3)任务尝试(task attempt)是可以中止(killed)的。
作业运行期间,TaskTracker会通过心跳机制不断与系统JobTracker通信,如果某个TaskTracker运行缓慢、失败或者出现故障,TaskTracker就会停止或者很少向JobTracker发送心跳,JobTracker会注意到此TaskTracker发送心跳的情况,从而将此TaskTracker从等待任务调度的TaskTracker池中移除。
由于TaskTracker中包含有一定数量的map和reduce子任务,这个时候这些子任务怎么处理呢?
1) 如果是map并且成功完成的话, JobTracker会安排此TaskTracker上一成功运行的map任务返回。
2) 如果是reduce并且成功的话,数据直接使用,因为reduce只要执行完了的就会把输出写到HDFS上。
3) 如果他们属于未完成的作业的话,reduce阶段无法获取该TaskTracker上的本地map输出文件,任何任务都需要重新调度。
另外,即使TaskTracker没有失败,如果其上的失败子任务远远高于集群的平均失败子任务数,也会被列入黑名单。可以通过重启从JobTracker的黑名单移除。
###jobtracker失败
jobtracker失败应该说是最严重的一种失败方式了,而且在Hadoop中存在单点故障的情况下是相当严重的,因为在这种情况下作业会最终失败,尽管这种故障的概率极小。
原文:https://www.cnblogs.com/hugh-tan/p/8973038.html