从计算机诞生到现今,积累了海量的数据,这些海量的数据有结构化、半结构化、非
结构的数据,并且这些海量的数据存储和检索就成为了一大问题。
我们都知道大数据技术难题在于一个数据复杂性、数据量、大规模的数据计算。
Hadoop就是为了解决这些问题而出现的。
Doug Cutting是Lucene的作者,当时Lucene面临和谷歌同样的问题,就是海量的数据存储和检索,于是就诞生了Nutch。
在这之后,谷歌的大牛就为解决这个问题发了三篇论文(GFS、Map-Reduce、BigTable),这三篇论文总体表达的意思就是部署多台廉价的服务器集群,通过分布式的方式将海量数据存储在这个集群上,然后利用集群上的所有机器进行数据计算,这样谷歌就不用买很多很贵的服务器,只需要把普通的机器组合在一起。
Doug Cutting等人就去研究这三篇论文,发现价值巨大,于是Doug Cutting等人在Nutch上实现了GFS和Map-Reduce,使得Nutch的性能飙升。
于是Doug Cutting等人就把这两部分纳入到Hadoop项目中,主要还是为了将Hadoop项目作为一个大数据整体化的解决方案。
所以为什么后面就出现了Hadoop而不是在Nutch上去做整体化大数据解决方案。
这三篇论文对应Hadoop的组件:
GFS -> HDFS 文件系统
Map-Reduce -> MR 计算框架
BigTable -> Hbase 数据库系统
Hadoop是Apache下的一个分布式系统基础架构,主要是为了解决海量数据存储和海量的数据计算问题。
在这个基础之上发展出了的更多的技术,使得Hadoop称为大数据技术生态圈之一。
1、Apache版本最原始的版本
2、Clodera版本,在大型互联网企业中用的比较多,软件免费,通过服务收费。
3、Hortonworks文档比较好
高可靠:维护多个副本,假设计算元素和存储出现故障时,可以对失败节点重新分布处理
高扩展:在集群间分配任务数据,可方便的扩展数以千计的节点
高效性:并行工作
高容错:自动保存多个副本,并且能够对失败任务重新分配
NameNode(nn):存储文件的元数据,如:文件名、文件目录结构等信息
DataNode(dn):在文件系统存储文件块数据,以及数据的校验和,也就是真正存储文件内容的,只是文件大的时候会切割成一小块一小块的。
SecondayNameNode(2nn):用于监控HDFS状态的辅助后台程序,每隔一段时间就获取HDFS的快照,就是备份和监控状态
ResourceManager(rm):处理客户端请求、启动和监控MRAppMaster、监控NodeManager,以及资源分配和调度。
NodeManager(nn):单个节点上的资源管理、处理来自ResourceManager的命令,处理来自MRAppMaster的命令。
MRAppMaster:数据切分、为应用程序申请资源,并分配内部任务、任务监控和容错。
Container:对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关信息(hadoop内部文件操作命令和Liunx差不多)
Map阶段:并行处理数据
Reduce阶段:对Map阶段处理的结果数据进行汇总
Common:支持其他模块的工具模块。
有一个建筑工地的建造时间很紧急,设立了一个支持小组,支援各个小分队(Common),首先1000包水泥,这些水泥要进行存储(HDFS),假设这些水泥有防水的和不防水的,防水的水泥存到仓库1(HDFS-dn),不防水的存储到仓库2(HDFS-dn),那么就要进行记录,哪些水泥存放到哪里了(HDFS-nn),因为赶工期担心水泥可能会因为潮湿那些问题,出现不可用,所以又准备了1000包水泥,并且每天都要对这些水泥进行检查(HDFS-2nn)。
如果一个小分队要领取水泥就要和工地仓储管理人员申请,仓储管理人员同意了,就要向公司申请人员来搬水泥(Yarn-MRAppMaster),开始调动这些人员搬运水泥(Yarn-rm),小分队领取到了水泥之后,开始决定给修外墙的多少包水泥(Yarn-nm)。
修外墙小组就开始拿着水泥干活了(MapReduce-Map),直到整栋楼的外墙修好了(MapReduce-Reduce),第N栋也是如此(MapReduce-Map)。
数据存放在Hadoop,那么Hadoop必然需要对数据进行管理,如果没有一个专门管理数据存储的组件或数据运算的组件,全部都融合在一个东西里面就会显得很臃肿,并且组件之间只需要通过接口进行沟通,那么各自的组件就可以仅仅自身的需求做优化等,那么就不会影响到其他的组件。
各自的组件只需要做好自己的事情,对外提供接口接收相应的数据及返回数据,只要符合我组件规范的就运行,不符合就不运行,而不需要关心其他,专心做自己的事情,也可以使得组件之间可以单独的运行。
bin:程序级命令(hdfs、Yarn等)
etc:配置文件
include:类库等文件
lib:类库等文件
libexec:类库等文件
sbin:hadoop系统命令(关闭、启动等)
share:官方提供的案例等
本地模式:不需要启动单独进程,直接运行,一般测试和开发使用,一台机器就可以运行,如果是在Liunx,跑的是本地,可以直接通过命令运行相应的jar包。
伪分布式模式:等同于分布式,但只有一个节点,具有集群的配置信息和运行,由于伪分布式只有一台机器,可以不启动Yarn,那么也就算是Hadoop的HDFS启动了,直接运行MapReduce程序的话,结果都在HDFS上,不在是在本地,如果需要交由YARN上进行资源调度和分配任务,则需要配置Yarn地址,以及指定数据获取方式。
完全分布式模式:多个节点一起运行,可以指定不同节点干不同的活,比如机器1干NameNode的活,机器2干ResourceManger的活。
注意:启动NameNode时,DataNode会记录NameNode信息(id),当缓存的NameNode记录删除了,这个时候启动就会报错,这个时候就需要将NameNode格式化(删除所有数据),之后在重新启动。
HDFS就是一个分布式文件存储系统,通过目录树来定位文件,由于分布式特点那么集群中的服务器就有各自的角色。
低成本:由于是众多服务器组成的,那么在某服务器挂了,只需要付出一台廉价的服务器。
高容错性:HDFS是由众多服务器实现的分布式存储,每个文件都会有冗余备份,那么如果存储数据的某个服务器挂了,那么还有备份的数据,允许服务器出现故障。
高吞吐量:HDFS是一次写多次读的访问模型,不允许修改文件,并简化了数据的一致性问题。
就近原则:在数据附近执行程序,也体现出来移动计算比移动数据效率高。
可移植性:HDFS可以实现不同平台之间的移植。
一次写入,多次读取,且不支持文件的修改。
适合数据分析场景,不适合网盘应用。
HDFS的文件在物理上是分块存储的,1.x版本的数据块默认大小是64MB,2.x版本的数据块默认块大小是128MB,这个值是可以通过配置参数(dfs.blocksize)进行调整的。
HDFS的块比磁盘的块大,目的就在于要减少寻址的开销(标准:寻址时间只占传输时间的1%),如果块设置的够大,从磁盘传输数据的时间明显就大于定位这个块开始位置所需要的文件,因此传输一个由多个块组成的文件的时间取决于磁盘传输速率。
基本命令:hadoop fs
查看帮助:hadoop fs 或 hadoop fs -help(详情)
创建目录:hadoop fs -mkdir /usr
查看目录信息:hadoop fs -ls /usr
本地剪切,粘贴到集群:hadoop fs -moveFromLocal test.txt /usr/
追加一个文件到已存在文件的末尾:hadoop fs -appendToFile test2.txt /usr/test.txt
显示文件内容:hadoop fs -cat /usr/test.txt
显示一个文件末尾:hadoop fs -tail /usr/ test.txt
以字符形式打印一个文件内容:hadoop fs -text /usr/test.txt
修改文件所属权限(-chgrp、-chomd、chown)(liunx一样用法): hadoop fs -chmod 777 /usr/test.txt
从本地复制到hdfs:hadoop fs -copyFormLocal text.txt /usr/test
hdfs复制到本地:hadoop fs -copyToLocal /usr/ text.txt ./
从hdfs路径拷贝到hdfs另一个路径:hadoop fs -cp /usr/dir1 /usr/dir2
在hdfs目录中移动文件:hadoop fs -mv /usr/test.txt /usr/dir
从hdfs下载文件到本地:hadoop fs -get /usr/test.txt ./
合并下载多个文件:hadoop fs -getmerge /usr /*.txt ./result.txt
上传文件等于copyFormLocal:hadoop fs -put test.txt /usr
删除文件或文件夹:hadoop fs -rmr /usr/test.txt
删除空目录:hadoop fs -rmdir /usr/test3
统计文件系统可用空间信息(-h格式化信息):hadoop fs -df -h
统计文件夹大小信息:hadoop fs -du -h /
统计制定目录下的文件节点数据量(嵌套级,当前文件个数,大小):hadoop fs -count -h /usr
设置文件的副本数:hadoop fs -setrep 3 /usr/test.txt
1、第一次启动namenode格式化后,创建fsimage和edits文件,如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
2、客户端对元数据进行操作请求
3、NameNode记录操作日志,更新滚动日志。
4、NameNode在内存中对数据进行操作
1、Secondary NameNode询问NameNode是否需要checkpoint,直接带回NameNode检查结果。
2、Secondary NameNode请求执行checkpoint
3、NameNode滚动正在写的eits日志
4、将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode
5、Secondary NameNode加载编辑日志和镜像文件到内存并且合并
6、生成新的镜像文件fsimage.chkpoint
7、拷贝fsimage.chkpoint到NameNode
8、NameNode将fsimage.chkpoint重命名为fsimage
Fsimage文件:HDFS文件系统元数据的一个永久检查点,其中包含HDFS文件系统所有目录和文件,以及node序列化信息。
Edits文件:存放HDFS文件系统的所有更新操作,文件系统客户端执行的所有写操作日志都会记录到edits文件。
Secondary NameNode:在主NameNode挂了,可以从Secondary NameNode中恢复数据,但是由于同步的条件限制,会出现数据不一致。
NameNode启动时,受限将镜像文件加载进去内存,并编辑日志文件中的各项操作,一旦内存中成功建立文件系统元数据镜像,则创建一个新的fsimage文件和一个空的编辑日志。
此时的NameNode开始监听DataNode请求,但此刻,NameNode是运行在安全模式,则此时NameNode文件系统对于客户端来说是只可读。
系统中数据块文件并不是由NameNode维护的,而是以块列表的形式存储在DataNode,在系统正常操作期间,NameNode会在内存中保留所有块位置影像信息。
在安全模式下,各个DataNode会向NameNode发送最新的块列表信息,NameNode了解到足够多的块信息之后,即可高效运行文件系统。
如果满足最小复本条件,NameNode会在30秒后就退出安全模式,最小复本条件指的是整个文件系统中99%的块都满足最小复本级别,在启动一个刚刚格式化的HDFS集群时,因为系统中还没有块,所以NameNode不会进入安全模式。
集群启动完成后自动退出安全模式。
银行对账、维护。
在海量数据的处理中,节点之间的数据传输速率是很重要,特别是在带宽很稀缺的情况下,而节点和节点之间的距离越远,那么必然会影响数据的传输。
在成千的服务器集群中,Hadoop是怎么选择副本节点呢?
第一个副本在客户端所处的节点上,但是如果客户端是在集群外,随机选取一个节点
第二个副本和第一个副本位于不同机架的随机节点上,也就是不和第一个副本在相同机架。
第三个副本和第二个副本位于相同机架,节点随机
第一个副本在客户端所处节点上。如果客户端在集群外,随机选一个
第二个副本和第一个副本位于相同机架,随机节点
第三个副本位于不同机架,节点随机
每个文件均按照块存储,每个块的元数据存储在NamNode的内存中(一个文件/目录/文件块一般占有150字节的元数据内存空间),因此Hadoop存储小文件会非常低效,因为大量小文件会耗尽NameNode中大部分内存,但存储小文件所需要的磁盘容量和存储这些文件原始内容所需要的磁盘空间相比也不会增多。
例如:上传一个文件1MB,那么这个文件会在HDFS中的一个块存储着,这个块默认是128MB,那么是不是占用了128MB的磁盘空间呢?
每一个块128MB只是HDFS的逻辑上的划分,所以在磁盘占用空间还是1MB,只有当一个或多个文件在一个块内超过128MB,之后将这个文件进行切割。
HDFS是先把当前这个节点处理完,在去处理副本节点的。
回收站默认是不启用的,在core-site.xml文件中的配置fs.trash.interval默认是为0.
MapReduce是一个分布式运算程序的编程框架,是用户开发基于Hadoop的数据分析应用的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发的运行在一个Hadoop集群上。
由于硬件资源限制,海量数据无法在单机上处理,单机版程序扩展到集群进行分布式运算,增加程序的复杂度和开发难度。
MapReduce框架就是要使得开发人员开源将绝大部分工作集中在业务逻辑的开发上,而分布式运算的复杂性交由MapReduce来处理。
适合数据复杂度运算
不适合算法复杂度运算
不适合实时计算、流式计算
分布式的运算程序最少需要分成两个阶段:
第一个阶段:MapTask并发实例,完全并行运行,互不相干
第二个阶段:ReduceTask并发实例,互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
MapReduce编程模型只能包含一个Map阶段和Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
Map:并行处理任务(运算)。
Reduce:等待相关的所有Map处理完任务,在将任务数据汇总输出。
MRAppMaster:负责整个程序的过程调度和状态协调。
一个完整的MapReduce程序在分布式允许时有三类实例进程:
MRAppMaster:负责整个程序的过程调度和状态协调。
MapTask:负责Map阶段的整个数据处理流程。
ReduceTask:负责Reduce阶段的整个数据处理流程。
序列化就是把内存中的对象转换成字节序列(或其他数据传输协议),以便于存储(持久化)和网络传输。
而序列化就是Map到Reducer的桥梁。
Java序列化是一个重量级的序列化框架(Serializable),使用这个框架进行序列化后会附带很多额外信息(各种校验信息、header等),不便于网络传输,所以Hadoop自己开发了一套序列化机制(Writable),精确、高效。
Java类型 |
Hadoop Writable类型 |
boolean |
BooleanWritable |
byte |
ByteWritable |
int |
IntWritable |
float |
FloatWritable |
long |
LongWritable |
double |
DoubleWritable |
string |
Text |
map |
MapWritable |
array |
ArrayWritable |
备注:自定义的反序列类中的write方法和read方法中DataOutput和DataInput这两个类所提供的方法中,对应Java类型String的方法,分别是writeUTF()和readUTF()。
1、MapReduce程序读取输入目录存放的相应文件。
2、客户端在submit方法执行之前,获取到待处理的数据信息,让后根据急群众参数配置形成一个任务分配规划。
1、建立连接
2、创建提交任务的代理(本地:LocalRunner、远程:YarnRunner)
3、创建给集群提交数据的stag路径
4、获取到任务id,并创建任务路径
5、获取到任务jar包,拷贝jar包到集群(这个jar就是程序运行的业务代码)
6、计算切片,生成切片规划文件
computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize=128MB
7、提交任务,返回提交状态
3、客户端提交job.split、jar包、job.xml等文件给Yarn,Yarn中的resourcemanager启动MRAppMater。
4、MRAppMater启动后根据job的描述信息,计算出需要的MapTask实例数量,然后向集群申请机器,启动相应数量的Map Task进程。
5、MapTask利用客户指定的InputFormat来读取数据,形成KV对。
6、MapTask将输入KV对传递给客户定义的map()方法,做逻辑运算。
7、map()运算完毕后将运算结果的KV对,手机到MapTask缓存。
8、MapTask缓存中的KV对按照K分区排序后不断写到磁盘文件。
9、MRAppMaster监控到所有MapTask进程任务完成后,会根据用户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据分区。
10、ReduceTask进程启动后,根据MRAppMaster告知待处理数据所在位置,从N台MapTask运行所在的机器上获取到N个MapTask输出结果文件,并在本地运行重新归并排序,按照相同Key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
11、ReduceTask运算完毕后,调用客户指定的OuputFormat将结果数据输出(文件)到外部存储。
说明:
切片是逻辑上的切片
规划文件就是里面描述了切多少个片,每个片是怎么样的。
MapTask的并行任务是否越多越好?并行度是如何决定的?MapTask到底开多少个合适?
1、一个job的map()阶段并行度(MapTask开几个),由客户端在提交job时决定。
2、每一个Split切片分配一个MapTask并行实例处理。
3、默认情况下切片大小=块大小(blocksize)
4、切片时不考虑数据集整体,而是针对每一个文件单独切片(这个是逻辑上的划分)
1、获取到数据存储目录
2、找到要便利处理目录下的每一个文件
3、读取第一个文件test.txt(257MB)
1、获取文件大小
2、计算分片大小,每次切片时,都要判断剩下的部分是否大于块大小的1.1倍,大于就在划分一个块切片
切片:
第一块:128MB
第二块:129MB / 128MB = 1.0078125
1.0078125 < 1.1 = 不在切片,反之继续切
源码:computeSliteSize(Math.max(minSize,Math.max(naxSize,blocksize)));
3、将切片信息写到一个切片规划文件(说明文件)中
4、整个切片的核心过程在于getSplit()方法(看submit()源码)中完成,数据切片只是逻辑上对输入数据进行切片,并不会在磁盘上,将文件切分进行存储。
InputSplit只是记录了分片的元数据信息。比如:起始位置、长度、所在的节点列表等。
注意:块是HDFS上物理存储的数据,切片只是逻辑上的划分。
5、提交切片规划文件(说明文件)到Yarn上,Yarn上的MrAppMaster就根据切片规划文件(说明文件)计算开启的MapTask个数(多少个切片就多少个MapTask)。
1、简单按照文件内容长度切片
2、切片大小,默认是块大小
3、切片时不考虑数据集整体性,而是逐个文件的单独切片,循环遍历每一个文件。
MaxSize(切片最大值):如果比块大小还小,则会让切片变小。
MinSize(切片最小值):如果比块大小还大,则会让切片变得比块还大。
假设:块大小128MB
MaxSize设为100MB
切片后的存储占块大小100MB
如果有大量的小文件,而每一个文件都是一个单独的切片,都会各自交给一个MapTask处理,那么需要开启大量的MapTask,则会产生大量的MapTask,导致处理效率低下。
1、在数据处理前端,先把小文件合并成大文件,在上传到HDFS做后续分析
2、如果已经有大量的小文件存在HDFS,使用CombineFileInputFormat进行处理,CombineFileInputFormat的切片逻辑跟TextFileInputFormat不同,他可以将多个小文件逻辑上规划到一个切片中,这样多个小文件就可以交给一个MapTask。
3、优先满足最小切片大小,不超过最大切片大小的前提下。
备注:在运行日志中查找number of就可以看到了
1、在MapReduce中,Map阶段处理的数据如何传递给Reduce阶段额,是MapReduce框架中关机的一个流程,这个流程就叫Shuffle。
2、Shuffle(洗牌、发牌):核心就是数据分区、排序、缓存
3、MapTaks输出处理结果数据,分发给ReduceTask并在分发过程中对数据按照Key进行分区和排序。
Shuffle是MapReduce处理流程中一个过程,每一个步骤都是分散在各个MapTask和ReduceTask节点上。
MapReduce详细运行流程图就是一个流水线一般的作业,从左向右过去,而在开发的过程中,需要使用到什么组件,这些组件会起到什么作用,在哪一个时间起作用,都可以在这个图中详细的描述
reduce数量小于分区数量就会报错。
reduce数量是1,那么则所有结果输出到一个文件内,即便配置了分区也不会去跑分区的代码(执行分区)
reduce数量大于分区数量,输出的其他文件为空
分区数量 = reduce数量,按照分区数量输出结果文件数量
分区就是对map的结果数据进行二次处理,从而再去决定是否影响输出的reduce结果输出。
MapTask和ReduceTask均会对数据(按Key排序)进行排序,这个操作属于Hadoop默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
对于MapTask,他会把处理的结果暂时放到一个缓冲区,当缓冲区使用率达到了阈值就对缓冲区的数据进行排序,并将这些有序的数据写到磁盘上,而当数据处理完后,他会对磁盘上所有文件进行一次合并,将这些文件合并成一个有序的文件。
对于ReduceTask,他从每个MapTask远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中,如果磁盘上文件数目达到一定阈值,则进行一次合并,生成一个更大的文件,如果内存文件大小或数目超过阈值,则进行合并后将数据写出到磁盘上,当所有的数据拷贝完毕后,再统一的对内存核磁盘上的所有文件进行一次合并。
自定义排序
Shullt规定Key是要进行排序的,如果作为Key是必须要实现WritableComparable接口的。
ReducrTask是接收总的MapTask结果,Combiner在每一个MapTask运行的,对每每个MapTask的结果汇总(局部汇总),将MapTask汇总后之后进行压缩传输,可以减少网络传输量。
但是Combiner的前提是不能影响到最终的业务逻辑,如果是累加求和是没有问题的,如果是求平均值就有问题的。
如:
1、在每一个MapTask进行求平均值之后在ReduceTask再求一次平均值,结果是不一样的。
2、将MapTask的数据全部汇总到ReduceTask之后再求平均值。
这两种结果是不一样的。
就是对分区排序好的数据,在进行一次合并分类开来,再一次合并的话,就有个比较标识,如果两个数据标识是一样的,就认为是一组数据,最后过滤去重,最终得到有哪些组。
对小文件的输入进行合并处理。
1、设置文件不可切割
2、读取到整个文件,并且整个文件的数据作为value输出给MapTask(将分片传进去去读取后,将读取到的所有分片数据合并给到MapTask)
3、MapTask在对合并后的数据做操作
获取到ReduceTask的运行结果,自定义要输出的结果数据和文件
Hadoop为每一个作业维护了若干个内置计算器,以描述多项指标,例如:某些计数器记录已处理的字节数等。
说明:计数器的结果在程序运行后的控制台日志中可查看
HDFS根据配置在各个节点存储数据,并且存储相应的副本数据。
MapReduce就是在需要执行无论是MapTask或ReduceTask的时候,会先去ResouceManager去询问,任务要在哪里运行,其实ResourceManger就是看要运行这个任务的输入数据在哪个节点,从而去告知这个节点执行任务,那么就形成了直接移动计算,而不是移动数据的方式。
因为数据可能存储在服务器1或服务器2…服务器,那么不需要移动数据,负责执行任务的服务器,到指定的路径,下载要运算的任务jar包,直接在本地运行,那么当数据非常大的时候就不用去移动数据。
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,在之前说了,可以配置MapReduce在Yarn之上运行,所以MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
1、不需要知道用户提交的程序运行机制,只要符合Yarn规范的资源请求机制即可使用Yarn,Spark、Storm等运算框架都可以整合在Yarn上运行,意味着与用户程序完全解耦。
2、只提供运算资源的调度,程序向Yarn申请资源,Yarn负责分配资源
3、Yarn总的资源调度是ResourceManager,提供运算资源的角色叫NodeManager。
4、Yarn作为一个通用的资源调度平台,企业以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。
1、客户端将MapReduce程序提交到客户端所在的节点。
2、YarnRunner就向RsourceManager申请一个Application。
3、RsourceManager内部运行一下,看看哪个节点离提交申请节点近,以及系统资源等,内部运行完了,就将应用程序资源路径返回给YarnRunner。
4、程序就将运行程序所需要的资源提交到HDFS上。
5、程序资源提交完后,申请运行MRAppMaster。
6、RsourceManager将用户请求转化为一个task(任务),并寻找最适合的NodeManager,并将任务分配给这个NodeManager。
7、NodeManager领取到任务后,创建容器(Container),并产生MRAppMaster。
8、MRAppMaster向RsourceManager申请运行N个MapTask容器(切片文件中有说明)。
9、RsourceManager又寻找了一下,将MapTask分配给另外两个NodeManager,这两个NodeManager领取到任务,并且创建容器(Container)。
10、RsourceManager告知申请运行MapTask容器的NodeManger,向那两个接受到任务的NodeManager发送程序启动脚本,这两个NodeManger就分别启动MapTask,MapTask对数据进行分区排序。
11、MRAppMaster看程序都跑完了,赶紧申请2个容器,运行ReduceTask。
12、ReduceTask的容器向MapTask容器获取相应分区的数据,并执行任务。
13、程序运行完毕后,MapResource会向RsourceManager注销自己。
1、三台机器
2、ssh
3、防火墙
hadoop-env.sh
yarn-env.sh
mapred-env.sh
<!-- 指定HDFS中NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop-senior00-levi.com:8082</value>
</property>
<!-- 指定hadoop运行时产生文件的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp</value>
</property>
<!-- Site specific YARN configuration properties -->
<!-- reducer获取数据的方式 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定YARN的ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop-senior01-levi.com</value>
</property>
<!-- 任务历史服务 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop-senior00-levi.com:19888/jobhistory/logs/</value>
</property>
<!-- 指定seconddaryNameNode地址,主要这个是避免NameNode挂了 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop-senior02-levi.com:50090</value>
</property>
<!-- 指定name.dir,默认就是,但是避免未启用,设置一下 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/opt/module/hadoop-2.5.0-cdh5.3.6/data/tmp/name</value>
</property>
<!-- 指定mr运行在yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- 配置 MapReduce JobHistory Server 地址 ,默认端口10020 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop-senior00-levi.com:10020</value>
</property>
<!-- 配置 MapReduce JobHistory Server web ui 地址, 默认端口19888 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop-senior00-levi.com:19888</value>
</property>
hadoop-senior00-levi.com
hadoop-senior01-levi.com
hadoop-senior02-levi.com
我们都知道NameNode是存储了所有数据的路径,在Hadoop第一个版本是没有HA,单台的NameNode节点挂了,那么整个数据就没办法访问了,
那个的工程师就自己写一个脚本去解决这个问题,定时的拷贝NameNode的fsimage和edits到别的服务器,但是数据量大的时候,拷贝就很慢了,而且工程师半夜正在和周公下棋的时候,NameNode挂了,那就很尴尬了。
虽然可以到第二天早上来恢复,但是数据量那么大的时候,太慢了,满足不了需求。
所以Hadoop为了解决这个问题,在后面的版本继承了HA(高可用)。
Hadoop-HA(高可用)就是在一台服务器挂了,第二台服务器可以马上顶上去。
两个基本问题:
1、第一台服务器和第二台服务器的数据必须要同步。
Hadoop-HA通过edits-log的变化,来将数据写入到JournalNode节点里面去,以分享给其他的NameNode。
2、要解决第一台和第二台服务器同时启用的情况,在这种情况下,子节点怎么提交数据,会提交到两台服务器,但是又会出现抢占资源的情况,(给一个人送东西和给两个人送东西所耗费的体力是不言而喻的),
这个问题在Hadoop-HA中称为脑裂,借助第三方框架(Zookeeper)实现隔离机制来解决脑裂这个问题。
Hadoop详解 - HDFS - MapReduce - YARN - HA
原文:https://www.cnblogs.com/-levi/p/11380644.html