首页 > 其他 > 详细

Spark 数据倾斜与解决方案

时间:2021-05-31 21:51:32      阅读:20      评论:0      收藏:0      [点我收藏+]

Spark 数据倾斜

数据倾斜的表现:

  • 大部分的task都迅速完成,只有少数几个task运行非常慢,或者在运行过程中报OOM,反复执行几次都是OOM。

数据过量:

  • 由于数据量较大而导致的整体任务运行缓慢,这种情况只需要增加资源即可。

造成数据倾斜的原因:

  • 使用了具有shuffle的算子、数据本身就存在倾斜问题。

解决方案:

1.聚合原数据:

  • 避免使用shuffle的过程,从根本上消除数据倾斜的可能,使用Hive ETL预处理数据,按照Key进行聚合或者预先和其他表进行join,之后Spark作业中针对的元数据就不是原来的Hive表了,而是预处理过后的表,此时由于数据已经预聚合过,之后就不会在Spark中使用shuffle类算子操作。针对于必须做shuffle的操作可以增大key的粒度,减小数据倾斜的可能,增大每个task的数量。

    说明:

    这种方式虽然实现简单便捷,规避掉了Spark端数据倾斜的问题,大幅度的提高了Spark作业的性能,但是属于指标不治本行为,由于数据倾斜是数据本身分布不均产生的,所以在Hive ETL过程中进行groupBy或Join等shuffle操作时,还是会数据倾斜,我们只是将Spark中的数据倾斜提前到Hive ETL中而已。

  • 实践经验:
    我们公司的交互式用户行为分析系统中使用了这种方案,该系统主要是运行用户通过Java Web系统提交数据分析统计任务,后台通过Java提交Spark作业进行数据分析。要求Spak作业相应速度必须要快,否则会严重影响用户体验,所以我们将Spark中使用到Shuffle操作的数据提前到Hive ETL中,从而让Spark直接使用预处理的Hive表,尽可能减少Spark中的Shuffle操作,大幅度提升了性能。

2过滤导致数据倾斜的Key:

  • 这种情况只能针对单个Key或者少量的key导致的数据倾斜,此时可以将导致数据倾斜的key过滤出来单独处理,通过SparkSQL中的where子句或者在SparkCore中对RDD执行Filter算子过滤出这些Key。如果需要动态判定那些Key数据量最多然后再过滤,可以使用Sample算子对RDD进行采用,然后计算出每个Key的数量。

    说明:这种实现方式简单,可以完全规避掉数据倾斜,但是大多数场景下导致数据倾斜的Key还是很多的,并不是只有少数几个。

  • 实践经验:
    某次发现某天的Spark作业运行的时候突然OOM了,追查后发下是Hive表中某个Key当天数据异常导致数据量暴增导致数据倾斜。然后我们每次执行任务前线采样,计算出样本中数据量最大的几个Key之后,如果这些key对计算结果不重要,直接过滤掉,如果该Key需要保留,我们将导致数据倾斜的Key过滤出来单独处理。

3.提高shuffle操作中的reduce并行度:

  • 多个key数据量比较大的key由于分区设置不合理进入到了同一个分区导致的数据倾斜,增加reduce的并行度即可,极端情况下,由于某个Key的数据量比较大,增大并行度并没有作用,需要二次聚合。

    说明:判断是单个Key导致的数据倾斜还是多个Key导致的数据倾斜,可以通过采样的方式实现。

4.使用随机key实现双重聚合(二次聚合):

  • 就是数据本身不均匀,某个key的数据量很大,如果数据都在同一个mapper,可以使用combiner进行预聚合,如果数据分布在不同的mapper,此时可以使用二次聚合,或者自定义分区器将key打散。

  • 二次聚合原理:
    将原本相同的key通过附加随机前缀的方式,变成不同的key,这会让原本由一个Task处理的数据分散到多个task上去做局部聚合,进而解决单个task数据过多的问题,接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。

    说明:二次聚合只能解决掉聚合类Shuffle操作导致的数据倾斜,如果是Join类的shuffle操作还得用其他方式解决。

5.大表Join小表时使用MapJoin。

  • 正常情况下Join操作都会执行shuffle过程,并且执行的是reducejoin,先将所有相同的key和对应的value汇聚到一个reducetask中,然后再进行join,Join就会走shuffle,效率很低,可以用Broadcast变量与map类算子通过广播小RDD的全量数据实现同样的效果,且不会走Shuffle。

  • 实现原理:
    将较小的RDD中的数据直接通过collect算子拉取到Driver端的内存中,然后对其创建一个Broadcast变量广播给其他Executor,接着对另外一个RDD执行map类算子,再算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每条数据按照key进行对比,如果连接key相同,就将两个RDD的数据按照所需方式连接起来。

  • MapJoin原理:
    在Map阶段将小表数据从 HDFS上读取到内存中的哈希表中,读完后将内存中的哈希表序列化为哈希表文件,在下一阶段当 MapReduce任务启动时,会将这个哈希表文件上传到Hadoop分布式缓存中,该缓存会将这些文件发送到每个Mapper的本地磁盘上。因此所有Mapper都可以将此持久化的哈希表文件加载回内存,并像之前一样进行 Join,顺序扫描大表完成Join,减少shuffle操作及reduce操作。

  • MapJoin两个阶段:

    1. 通过MapReduce Local Task将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里对HashTableFiles进行压缩。
    2. MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
  • 大表Join大表:

    采用Sort Merge Bucket Join (SMB Join),将每张大表排序按照分区分桶,然后将对应的桶进行Join即可。

6.Sample采样对倾斜key单独进行join:

  • 针对于两个两个数据量都是很大的RDD或者Hive表进行Join时,可以通过采用的方式看一下数据的分布情况,如果其中一个RDD或Hive表中的少数几个Keyd的数据量比较大,而另外一个RDD或Hive表中的所有Key都发呢不均匀,那么可以采用这种方式。

  • 实现原理:
    先将包含少数几个数据量较大的Key的RDD通过Sample算子采用一份样本出来,然后统计一下每个key的数量,并计算出来数据量最大的key。然后将这几个key对应的数据从RDD或hive表中拆分出来,形成单独的RDD,并给每个key都追加n以内的随机前缀,接着将不会导致数据倾斜的那个RDD也过滤出倾斜key对应的数据形成出一个单独RDD,将每条数据都膨胀成n跳数据,这n条数据都按顺序附加一个 0~N级前缀,再剑门关附加了岁间前缀的RDD与另一个膨胀n倍的RDD进行join,此时就可以将原来相同的key打散成n分分散到多个task中去join,而另外两个普通的RDD进行join即可,最后将两次Join的结果进行Union即可。

    说明:这种方式知识和少数几个key导致的倾斜,只会对少数导致倾斜的key进行扩容n倍,不需要对全量数据进行扩容,避免占用过多内存,但是如果导致倾斜的key特别多,不适用。

7.使用随机前缀和扩容RDD进行Join:

  • 如果join时,RDD中有大量的key导致数据倾斜,只能采用这种方式来解决数据倾斜问题。

  • 实现原理:
    先查看RDD或Hive表中数据的分布情况,找到造成数据倾斜的RDD或Hive表,然后将该RDD的每条数据都附加n以内的前缀,同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都附加0~N级前缀,最后将两个RDD进行Join即可。

    说明:这种方式对Join类的数据倾斜基本都可以解决,但是该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜,而且由于不能对部分key拆分出来单独处理,需要对整个RDD进行扩容,对内存资源要求很高。

Spark 数据倾斜与解决方案

原文:https://www.cnblogs.com/yuexiuping/p/14832294.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!