l 分表
l 合理利用中间结果集,避免查过就丢的资源浪费,减低Hadoop的IO负载
l 常用复杂或低效函数尽量不用或拆分成其他实现方式,如count(distinct)
l 合理设计表分区,静态分区和动态分区
l 优化时一定要把握整体,单个作业最优不如整个作业最优。
l 文件存储格式和压缩方式
l Hadoop本身的优化
l 有些逻辑,使用系统函数可能比较复杂,可能涉及多层嵌套,建议使用自定义函数实现。
架构层面优化,我这里不做过多介绍了,写HQL时要时常考虑按照map-reduce执行方式来写,平时多注意一下,很多问题都可以避免的。下面的介绍的优化中,或多或少对架构层面的优化都有涉及。
l 合并小文件
Mappers过多情况下:
l Map阶段输出文件太小,产生大量小文件
l 初始化和创建Mappers进程的开销很大
Mappers太少情况下:
l 文件处理或查询并发度小,Job执行时间过长
l 大量作业时,容易堵塞集群
通常情况下,Job会通过输入文件产生一个或多个mapper数,
主要的决定因素有两个:输入的文件数,输入的文件大小。
举例:
a) 假设输入只有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个block(6个128M的block和1个12M的block,dfs.block.size是128M),从而产生7个mappers数。
b) 假设输入有3个文件a、b、c,大小分别为10m,20m,130m,那么hadoop会将其分隔为4个block(10m,20m,128m,2m),从而产生4个mappers数。
注释:以上两种情况均不考虑文件合并的情况。
两种方式控制Mapper数:即减少mapper数和增加mapper数
l 减少mapper数可以通过合并小文件来实现,这点是对文件源处理。
l 增加mapper数可以通过控制上一个job的reducer数来控制(比如:一个sql中多表join会产生多个Map-Reduce任务)。
比如增大mapred.reduce.tasks数值。
下面介绍map端的几个控制参数:
l set mapred.map.tasks=10;
此参数直接设置,有时并不生效,其实它是hadoop的参考数值。
下面我说一下直接设置不生效的原因:
默认mapper个数计算为:
# total_size为输入文件总大小,dfs_block_size为HDFS设置的数据块大小(一般为128MB)
default_mapper_num=total_size/dfs_block_size;
我们通过参数直接设置的期望mapper个数为:
# setmapred.map.tasks=10;
#这个参数设置只有在大于default_mapper_num的时候,才会生效
goal_mapper_num=mapred.map.tasks;
下面我们来计算一下,经过map端split处理的文件大小和个数:
#mapred.min.split.size(数据的最小分割单元大小)
#mapred.min.split.size 设置每个task处理的文件大小,只有在大于dfs_block_size值时才会生效
split_size=max(mapred.min.split.size,dfs_block_size);
split_num=total_size/split_size;
最终计算的mapper个数:
compute_mapper_num=min(split_num,max(default_mapper_num,goal_mapper_num))
总结:
其实根据我自己的实践,调整mapper数之前,我们一定要确定处理的文件大概大小以及文件的存在形式(很多小文件,还是单个大文件以及其他形式),然后合理地调整mapred.min.split.size和mapred.max.split.size的值。
比如,如果想减少mapper个数,则需要增大mapred.min.split.size的值(因为dfs_block_size一般不变)。
示例:
情况1:输入文件很大,但不是小文件组成的
增大mapred.min.split.size的值。
情况2:输入文件数量很多,且都是小文件,同时每个文件都小于dfs_block_size。
这种情况下通过增大mapred.min.split.size不可行。
原因:增大mapred.min.split.size会造成小文件在网络上来回传输,造成网络负载很大。
解决办法:需要设置下面参数,使用合并小文件方法,将多个输入文件合并后送给mapper处理,从而减少mapper的数量。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
(下个小章节会介绍小文件合并的优化)
l Map端的聚合,减少Reduce处理负担:
sethive.map.aggr=true;
l 推测执行:
set mapred.map.tasks.speculative.execution=true;
(reduce端也有类似的参数:mapred.reduce.tasks.speculative.execution)
所谓的推测执行,就是当所有的task都开始运行之后,Job Tracker会统计所有任务的平均进度,如果某个task所在的task node节点配置比较低或者CPU负载很大,导致任务执行比总体任务的平均执行要慢,此时Job Tracker就会在其他节点启动一个新的任务(duplicatetask),原有任务和新任务哪个先执行完就把其他节点的另外一个任务kill掉。这也是我们经常在Job Tracker页面看到,虽然任务执行成功了,但是发现一些任务被kill掉了,就是这个原因。
l Reducers数过多的情况:
生成了很多个小文件(最终输出文件由reducer决定,一个reducer输出一个文件),那么如果这些小文件作为下一个Job输入,则会出现小文件过多需要进行合并的问题。而且启动和初始化reducer需要耗费时间和资源。
l Reducers数过少:
执行耗时,并且可能出现数据倾斜
l Reducer个数的决定:
默认情况下,Hive分配reducer个数由下列参数决定:
参数1:hive.exec.reducers.bytes.per.reducer(默认为1G)
参数2:hive.exec.reducers.max(默认为999)
计算reducer数的公式:
N=min(参数2,总输入数据量/参数1)
即默认一个reduce处理1G数据量。
注意:与mapred.map.tasks参数不同,如果设置了setmapred.reduce.tasks参数的数值,忽略上述计算,reducer个数可以由mapred.reduce.tasks直接指定。
l 以下情况只有一个reducer:
某些情况下我们会发现任务中不管数据量多大,不管怎么调整reducer相关的的参数,任务中一直都只有一个reducer任务:
1、 除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外
2、 用了group by的汇总
3、 用了order by
l Reduce数决定中间或落地文件数,文件大小和HDFS的block大小无关。
l 使用场景描述:
当某个任务有多个Job时,其中某个Job的结果被后面Job多次引用时,设大该参数,以便增加后面访问的Mapper数。
比如,如果一个Job的输出被另外多个Job调用,假如最前面的Job只生成1G的一个文件,那么后面Job也只会有一个Map来处理,效率明显低很多。
l 推测执行
setmapred.reduce.tasks.speculative.execution =true;
sethive.mapred.reduce.tasks.speculative.execution =true;
可以看到除了Map-Reduce侧提供推测执行参数,hive侧也提供了推测执行的参数。
l Map阶段Hive自动对小文件进行合并
参数控制:
#Map任务结束时就会合并小文件(Map-Only)
set hive.merge.mapfiles=true;
#在Map-Reduce的任务结束时合并小文件
set hive.merge.mapredfiles=true;
#合并文件的大小(256MB)
set hive.merge.size.per.task=256000000;
#每个mapper最大分隔大小(输入大小)
#结合上面块大小(dfs.block.size=128MB),决定拆分几个mapper数
set mapred.max.split.size=256000000;
#一个节点上split至少的大小
set mapred.min.split.size.per.node=100000000;
#执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
在开启了org.apache.hadoop.hive.ql.io.CombineHiveInputFormat之后,一个datanode节点上多个小文件会进行合并,合并文件数由mapred.max.split.size限制的大小决定。
mapred.min.split.size.per.node决定多个datanode上的文件是否需要合并,即多个节点上的文件也可以合并,大小由此决定。
l Job合并输入小文件
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
多个小文件由一个map执行。
合并文件数由mapred.max.split.size限制的大小决定。
l Job合并输出小文件
sethive.merge.smallfiles.avgsize=256000000;
当输出文件平均大小小于该值,启动新job用于合并文件。
对于多个job时,前一个job输出很多大小不均匀的数据文件,对后续的job处理会造成数据倾斜的问题。
如果输出文件大小均匀,则后续处理的mapper数比较合理。
sethive.merge.size.per.task=64000000;
合并之后的文件大小。
案例(那我们自己的开发环境来测试,我们的环境dfs.block.size为64MB):
环境如下(默认配置):
set dfs.block.size=134217728;
sethive.merge.mapfiles=true;
sethive.merge.mapredfiles=false;
sethive.merge.size.per.task=256000000;
setmapred.max.split.size=256000000;
setmapred.min.split.size.per.node=256000000;
setmapred.min.split.size.per.rack=256000000;
sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
我构造的表如下:
hive (annuity_safe)> desc formattedtest_data;
OK
col_name data_type comment
# col_name data_type comment
deptno string
polno string
certno string
brno decimal(2,0)
………………………………
set_of_books_id string
is_return string
pk_serial string
op_month string
# Detailed Table Information
Database: annuity_safe
Owner: hduser0103
CreateTime: Tue Jul 28 11:41:20 CST 2015
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://dev-l002781.app.paic.com.cn:9000/user/hive/warehouse/annuity_safe.db/test_data
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE true
numFiles 5
numRows 0
rawDataSize 0
totalSize 279752108
transient_lastDdlTime 1438055637
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.083 seconds, Fetched: 95row(s)
构造的表对应的文件路径下有5个文件,文件大小除了xae文件为11MB,其他都大概为64MB。
hive (annuity_safe)> ! hadoop fs -lsrhdfs://dev-l002781.app.paic.com.cn:9000/user/hive/warehouse/annuity_safe.db/test_data;
-rw-r----- 3hduser0103 hduser0103 671088652015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xaa
-rw-r----- 3hduser0103 hduser0103 671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xab
-rw-r----- 3hduser0103 hduser0103 671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xac
-rw-r----- 3hduser0103 hduser0103 671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xad
-rw-r----- 3hduser0103 hduser0103 113166512015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xae
执行查询(未优化参数情况下):
hive (annuity_safe)> selectdeptno,count(1),min(fcd),max(fcd) from test_data group by deptno;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified.Estimated from input data size: 1
In order to change the average load for areducer (in bytes):
sethive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number ofreducers:
sethive.exec.reducers.max=<number>
In order to set a constant number ofreducers:
setmapred.reduce.tasks=<number>
Starting Job = job_201507191627_38369,Tracking URL =http://dev-l002781.app.paic.com.cn:50030/jobdetails.jsp?user.name=hadoop&jobid=job_201507191627_38369
Kill Command =/appcom/HadoopInstall/hadoop-1.2.1/libexec/../bin/hadoop job -kill job_201507191627_38369
Hadoop job information for Stage-1: number of mappers:4; number of reducers: 1
2015-07-28 13:44:01,643 Stage-1 map =0%, reduce = 0%
2015-07-28 13:44:13,786 Stage-1 map =50%, reduce = 0%, Cumulative CPU 12.15sec
2015-07-28 13:44:14,800 Stage-1 map = 75%, reduce = 0%, Cumulative CPU 13.89 sec
2015-07-28 13:44:37,214 Stage-1 map =100%, reduce = 0%, Cumulative CPU 18.61sec
2015-07-28 13:44:38,228 Stage-1 map =100%, reduce = 33%, Cumulative CPU 18.61sec
2015-07-28 13:44:40,254 Stage-1 map =100%, reduce = 100%, Cumulative CPU22.29 sec
MapReduce Total cumulative CPU time: 22seconds 290 msec
Ended Job = job_201507191627_38369
MapReduce Jobs Launched:
Job 0: Map: 4 Reduce: 1 Cumulative CPU: 22.29 sec HDFSRead: 279753293 HDFS Write: 6927 SUCCESS
Total MapReduce CPU Time Spent: 22 seconds290 msec
OK
deptno _c1 _c2 _c3
-08 01:20:15.0 1 NULL NULL
0000 1 NULL NULL
01000592864 1 G000000000 G000000000
71679 1 G000000000 G000000000
G01 1 2001-07-19 15:46:42.0 2001-07-19 15:46:42.0
G010103 9 2002-09-17 14:08:45.0 2002-09-17 14:10:44.0
G0123 6161 2006-08-16 14:59:06.0 2010-01-18 15:06:14.0
G014205 51893 2010-10-08 01:05:01.0 2010-10-08 01:23:55.0
G014302 4 2011-09-26 19:12:36.0 2011-09-26 19:12:36.0
G02 32 2000-07-18 09:53:52.0 2002-06-10 13:14:24.0
G020105 10 2000-06-20 14:08:23.0 2000-08-08 10:37:10.0
G020301 5 2000-08-15 10:35:00.0 2000-08-21 11:29:07.0
G020302 2 2000-08-17 09:31:00.0 2000-09-08 10:45:39.0
…………………………………………………
可以发现,出现了4个mappers来处理数据文件。
我们查看页面查看4个task的Counters发现,4个task读取的文件字节数为:
67,109,107
201,327,048
11,316,894
244
加起来为268MB左右,但是每个task处理的数据不均匀,其中有一个task处理了约200MB的数据,一个task处理了244字节的数据,便会出现木桶效应。
案例优化:
1、 环境的参数优化:
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
sethive.merge.size.per.task=64000000;
set mapred.min.split.size=64000000;
set mapred.max.split.size=64000000;
setmapred.min.split.size.per.node=64000000;
set mapred.min.split.size.per.rack=64000000;
sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
因为合并小文件默认为true,而dfs.block.size与hive.merge.size.per.task的搭配使得合并后的绝大部分文件都在64MB左右。
我们不用执行上面的查询语句,就大概可以分析如下:
对于上面表对应的5个数据文件:4个为64MB,1个为11MB,那么上面的查询,会有5个mapper,其中4个mapper分布处理64MB的数据,其他1个mapper分布处理11MB的数据。
2、 运行上面的HQL,出现的结果与我们上面分析的一致。
总结:这里我只是提供了一种优化的思路,其实这里还不是最优,我们可以实际的Hadoop环境,hdfs的block大小以及表对应的数据文件,来调整上面的参数(比如将64MB全部修改为128MB,或许更快一点),最终每个task处理的数据大致相同,均衡IO负载,以达到资源最佳使用。
1、 什么是数据倾斜?
Hadoop框架的特性决定最怕数据倾斜。
JobTracker和TaskTracker关系相当于老师和学生的关系,JobTracker分发任务给TaskTracker去处理各自的工作,但是不是平均分配的,还得根据TaskTracker本地的数据量多少去做判断。如果每个节点数据分配不均匀,势必造成有的TaskTracker处理的数据量大,有的处理的数据量小。
由于数据分布不均匀,可能会造成数据大量集中到一个节点或极少数个节点,造成数据热点。
2、 数据倾斜造成的症状:
map阶段快,reduce阶段非常慢,
某些mapper很快,某些mapper很慢,
某些reducer很快,某些reducer奇慢。
3、 数据倾斜可能在如下场景中出现:
A、 数据在节点上分布不均匀(无法避免)
B、 join时on关键词中个别值量很大(如null值)
C、 count(distinct)在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by字段分组,按distinct字段排序(有时无法避免)。
其中A无法避免,B见后边介绍的Join优化部分,C语法上有时无法避免。
关键词 |
情形 |
后果 |
Join |
其中一个表较小, 但是key集中 |
分发到某一个或几个Reducer上的数据远高于平均值 |
大表与大表,但是分桶的判断字段0值或空值过多 |
这些空值都由一个reducer处理,非常慢 |
|
group by |
group by 维度过小, 某值的数量过多 |
处理某值的reducer灰常耗时 |
Count Distinct |
某特殊值过多 |
处理此特殊值的reducer耗时 |
1、 减少Job数
当源表相同时,如下可以合并Job,从而减少job数:
l Join时,On 字段相同
多表join on条件相同时,合并为一个Map-Reduce。
select pt.page_id,count(t.url) pv
from rpt_page_type pt
join
(select refer_page_id,url_page_id,url from trackinfo where ds = ‘2013-10-11’)t
on pt.page_id = t.url_page_id
join
(select page_id from rpt_page_kpi_new where ds = ‘2013-10-11’)r
on t.url_page_id= r.page_id
group by pt.page_id;
利用这个特性,可以把相同join on条件的放在一个job处理。
l union all
对同一个表的union all只查询一次源表,Hive本身对这种union all做过优化。
selecturl,session_id from
(selecturl,session_id
from trackinfowhere ds=’2013-11-01’
union all
selecturl,session_id
from trackinfowhere ds=’2013-11-02’
)t;
l Multi-insert(Multi-group by一定会和Multi-insert一起使用,同一源表,可按照不同where、不同group by进行计算)
条件:源表相同,上方的SQL等同于:
from trackinfo
insert overwrite table tmp_testpartition(step=1)
select url,session_id where ds=’2013-11-01’
insert overwrite table tmp_test partition(step=2)
selecturl,session_id where ds=’2013-11-02’;
l 并行化执行
每个查询被hive转化成多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间。
sethive.exec.parallel=true;
sethive.exec.parallel.thread.number=15;
实例:
select num
from
(selectcount(city) as num
from city
union all
selectcount(province) as num
fromprovince)tmp;
union all两侧的查询语句会同时执行。
l 本地化执行(感觉生产环境用处不大)
sethive.exec.mode.local.auto=true;
当一个job满足如下条件才能真正使用本地模式:
a、 job的输入数据大小必须小于参数:
hive.exec.mode.local.auto.inputbytes.max(默认128MB)
b、 job的map数必须小于参数:
hive.exec.mode.local.auto.tasks.max(默认为4)
c、 job的reducer数必须为0或者1
如果你的环境不满足上述条件时,执行过程会提示原因,如Input size大于hive.exec.mode.local.auto.inputbytes.max值,或input files个数大于hive.exec.mode.local.auto.tasks.max值,同时会取消本地化执行,改为其他方式执行。
l JVM重利用
set mapred.job.reuse.jvm.num.tasks=15;
JVM重利用可以是Job长时间保留slot,直到作业结束,这对于有较多任务和较多小文件的任务是非常有意义的,因为减少了JVM的启动和初始化时间,从而减少执行时间。当然这个值不能设置过大,因为有些作业会有reduce任务,如果reduce任务没有完成,则map任务占用的slot不能释放,其他作业可能就需要等待。
l Hive压缩数据
中间压缩就是处理hive查询的多个job之间的数据,对于中间压缩,最好选择一个节省CPU耗时的压缩方式(即压缩率比较适中)。
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=
org.apache.hadoop.io.compress.SnappyCodec;
sethive.intermediate.compression.type=BLOCK;
Hive查询最终的输出结果文件采用压缩(落地文件的压缩率可以选择较高的压缩率)
set hive.exec.compress.output=true;
set mapred.output.compression.codec=
org.apache.hadoop.io.compress.GzipCodec;
setmapred.output.compression.type=BLOCK;
l Join
l Mapjoin
l Group by
l 笛卡尔积
l Hive表的优化
l 数据按照join的key进行分发,而在join左边的表的数据会首先部分或全部读入内存,如果左边表的key相对分散(单个key值数据量小,或者说相同key的数据量小),读入内存的数据会比较小,join任务执行会比较快,而如果左边的表key比较集中,而这张表的数据量又很大,那么数据倾斜就会比较严重。
Map阶段同一key数据会分发给同一个reducer计算。
l join原则:
1) 小表join大表
在join操作的Reduce阶段(不是map阶段),位于join左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存进出的几率。
解决办法:
多个表关联时,最好分拆成几个小段,避免大sql(无法控制中间job)。
2) 大表join大表
大表关联中,如果join的key中含有大量null,在使用key进行hash分发时,会将数据文件中key为空的数据都分到一个节点,造成了数据倾斜。
解决办法:
把空值的key变成一个字符串加上随机数,把倾斜的数据分发到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
l Join中对join key存在大量空值的优化演示:
end_user_id中存在大量null值。
原始HQL:
select u.id, t.url, t.track_time
from end_user u
join
(select end_user_id,url,track_time from trackinfo where ds= ‘2013-12-01‘)t
on u.id = t.end_user_id limit 2;
优化后HQL为:
select u.id, t.url, t.track_time
from end_user u
join
(select case when end_user_id = ‘null‘ or end_user_id is null
then cast(concat(‘00000000’,floor(rand()*1000000))as bigint)
else end_user_id endend_user_id,
url,track_time
from trackinfo where ds= ‘2013-12-01‘) t
on u.id = t.end_user_id limit 2;
l Join对数据倾斜的参数优化:
set hive.optimize.skewjoin=true;
如果在join过程中出现倾斜,参数值应该设置为true。
set hive.skewjoin.key=1000000;
这个是join的键(key)对应的记录条数超过这个值则会进行join自动优化。
上面两个参数设置后的优化原理是:
没优化之前,join会启动一个job,但是设置优化参数后,会启动两个job。
第一个job会将键(key)超过hive.skewjoin.key记录的键加上一些随机数等,将这些相同的key打乱,然后跑到不同的节点上面进行计算(reduce阶段)。然后再启动一个job,在第一个job处理的基础上(即第一个job的reduce输出结果)再进行计算,将相同的key分发到相同的节点上处理。
l Join时的关联键key的数据类型一定要相同,否则会产生数据倾斜问题
由于test_a表中的id为字符串型,所以我们将test_b表数字类型转换成字符串类型
select a.* fromtest_a a
left outer jointest_b b
On a.id =cast(b.id as string);
l Mapjoin(map端执行join操作):
mapjoin的计算原理:
mapjoin会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配。
l Join 操作在Map阶段完成,如果需要的数据在Map的过程中可以处理掉,则不再需要Reduce阶段,加快了执行效率。
小表关联一个超大表时,容易发生数据倾斜,可以用Mapjoin把小表全部加载到内存,并在map端进行join操作,避免reducer处理。
如:
insert overwrite table page_pv
select /*+ MapJoin(pt)*/
pt.page_id,count(t.url) pv
from rpt_page_type pt
join
(select url_page_id, url from trackinfo where ds = ‘2013-10-11‘) t
on pt.page_id = t.url_page_id;
l mapjoin的使用场景:
1) 关联操作中有一张表非常小
2) 不等值的连接操作
l Mapjoin两种使用方式:
1) 通过参数设置,Hive自动选择执行Mapjoin操作
hive.auto.convert.join=true;
hive.mapjoin.smalltable.filesize=25000000;-------默认为25MB
原理:将小于hive.mapjoin.smalltable.filesize数值的表加载到分布式缓存中,这样整个集群节点上map端任务都可以访问缓存中的数据。
2) 另外一种方式,可以不设置参数,通过hint方式指定:
select /*+ mapjoin(test_b) */a.key,a.value fromtest_a a join test_b b on a.key = b.key;
l Mapjoin其他参数设置
set hive.mapjoin.cache.numrows=25000;
说明:mapjoin存在内存里的数据量。
set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.55;
说明:map join做group by操作时,可以使用多大的内存来存储数据,如果数据太大,则不会保存在内存里。
set hive.mapjoin.localtask.max.memory.usage=0.90;
说明:本地任务可以使用内存的百分比
l 使用bucket join需要满足下面两个条件
(1) 两个表以相同方式(key)划分桶
(2) 两个表的桶个数是倍数关系
create table order(cidint,price decimal(18,2)) clustered by (cid) into 32 buckets;
create tablecustomer(id int,first string,last string) clustered by (id) into 32(or 64……)buckets;
select pricefrom order o join customer c on o.cid = c.id;
说明:
查询语法与普通表一样,但是底层执行却不一样。根据key只会查找对应的桶即可,比如:如果cid=1,那么只会从customer中查找id=1的数据,这些数据都位于一个桶中,所以只需访问一个桶即可。
l Map端部分聚合:
并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。
l Map端部分聚合参数:
#是否在map端进行聚合
hive.map.aggr=true;
l 有数据倾斜的时候进行负载均衡
#如果group by过程中出现倾斜,应该设置为true
hive.groupby.skewindata=true;
#在map端进行聚合操作的条目数目
#这个是group by的键对应的记录条数超过这个值则会进行优化
hive.groupby.mapaggr.checkinterval=100000;
和mapjoin类似,group by优化后也会启动两个Job。
当选项设为true时,生成的查询计划会有两个MR job,第一个MR job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group by key有可能被分发到不同的Reduce中,从而达到负载均衡的目的。
第二个MR job再根据预处理的数据结果按照group bykey分布到Reduce中(这个过程可以保证相同的group by 被分发到同一个Reduce中),最后完成最终的聚合操作。
l 当该字段存在大量值为null或空的记录时容易造成倾斜。
解决思路:
1) count(distinct)时,将值为空的数据在where里过滤掉,在最后结果中加1。
2) 如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
3) 如果group by维度过小,则可以 采用count和group by的方式来替换count(distinct)完成计算
l 特殊情况特殊处理:
在业务逻辑优化效果不大情况下,有些时候是可以将倾斜的数据单独拿出来处理,最后union回去。
l countdistinct优化:
实例1:
优化前:
selectcount(distinct id) from student;
只有一个job任务,而且只有一个reduce,处理的工作量比较大。
优化后:
selectcount(1) from (select distinct id from student) tmp;
或
selectcount(1) from (select id from student group by id) tmp;
可以通过设置set mapred.reduce.tasks的值,加快(select distinct id from student) tmp部分的处理。
实例2:
优化前:
selecta,sum(b),count(distinct c),count(distinct d)
from test
group by a;
优化后:
select a,sum(b)as b,count(c) as c,count(d) as d
from (
select a,0 as b,c,null as d from test group by a,c
union all
select a,0 as b,null as c,d from test group by a,d
union all
select a,b,null as c,null as d from test
)tmp group bya;
尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。
之前我有遇到过一种情况,不得不使用笛卡尔积,表关联的条件为不等式,还好两张表不大。如果你不得不使用笛卡尔积,那么一定要看一下其中的表是否符合Mapjoin的要求,如果符合,那么一定要使用Mapjoin。
join优化前:
select o.cid,c.id
from order o
join customer c
on o.cid = c.id
where o.dt = ‘2015-07-26‘;
join优化后:
select o.cid,c.id
from
(select cid
from order
where dt = ‘2015-07-26‘
)o
join customer c
on o.cid = c.id;
对一些过滤条件,能尽早过滤的就尽早过滤,减少IO资源浪费。
这个需要个人工作中注意就好了。
l 分区:
1) 静态分区
2) 动态分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
l 分桶
sethive.enforce.bucketing=true;
sethive.enforce.sorting=true;
l 数据
相同数据尽量聚集在一起,和分桶原理类似,尽量减少网络数据传输
原文:http://blog.csdn.net/jiangshouzhuang/article/details/51488099