避免使用shuffle的过程,从根本上消除数据倾斜的可能,使用Hive ETL预处理数据,按照Key进行聚合或者预先和其他表进行join,之后Spark作业中针对的元数据就不是原来的Hive表了,而是预处理过后的表,此时由于数据已经预聚合过,之后就不会在Spark中使用shuffle类算子操作。针对于必须做shuffle的操作可以增大key的粒度,减小数据倾斜的可能,增大每个task的数量。
说明:
这种方式虽然实现简单便捷,规避掉了Spark端数据倾斜的问题,大幅度的提高了Spark作业的性能,但是属于指标不治本行为,由于数据倾斜是数据本身分布不均产生的,所以在Hive ETL过程中进行groupBy或Join等shuffle操作时,还是会数据倾斜,我们只是将Spark中的数据倾斜提前到Hive ETL中而已。
实践经验:
我们公司的交互式用户行为分析系统中使用了这种方案,该系统主要是运行用户通过Java Web系统提交数据分析统计任务,后台通过Java提交Spark作业进行数据分析。要求Spak作业相应速度必须要快,否则会严重影响用户体验,所以我们将Spark中使用到Shuffle操作的数据提前到Hive ETL中,从而让Spark直接使用预处理的Hive表,尽可能减少Spark中的Shuffle操作,大幅度提升了性能。
这种情况只能针对单个Key或者少量的key导致的数据倾斜,此时可以将导致数据倾斜的key过滤出来单独处理,通过SparkSQL中的where子句或者在SparkCore中对RDD执行Filter算子过滤出这些Key。如果需要动态判定那些Key数据量最多然后再过滤,可以使用Sample算子对RDD进行采用,然后计算出每个Key的数量。
说明:这种实现方式简单,可以完全规避掉数据倾斜,但是大多数场景下导致数据倾斜的Key还是很多的,并不是只有少数几个。
实践经验:
某次发现某天的Spark作业运行的时候突然OOM了,追查后发下是Hive表中某个Key当天数据异常导致数据量暴增导致数据倾斜。然后我们每次执行任务前线采样,计算出样本中数据量最大的几个Key之后,如果这些key对计算结果不重要,直接过滤掉,如果该Key需要保留,我们将导致数据倾斜的Key过滤出来单独处理。
多个key数据量比较大的key由于分区设置不合理进入到了同一个分区导致的数据倾斜,增加reduce的并行度即可,极端情况下,由于某个Key的数据量比较大,增大并行度并没有作用,需要二次聚合。
说明:判断是单个Key导致的数据倾斜还是多个Key导致的数据倾斜,可以通过采样的方式实现。
就是数据本身不均匀,某个key的数据量很大,如果数据都在同一个mapper,可以使用combiner进行预聚合,如果数据分布在不同的mapper,此时可以使用二次聚合,或者自定义分区器将key打散。
二次聚合原理:
将原本相同的key通过附加随机前缀的方式,变成不同的key,这会让原本由一个Task处理的数据分散到多个task上去做局部聚合,进而解决单个task数据过多的问题,接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。
说明:二次聚合只能解决掉聚合类Shuffle操作导致的数据倾斜,如果是Join类的shuffle操作还得用其他方式解决。
正常情况下Join操作都会执行shuffle过程,并且执行的是reducejoin,先将所有相同的key和对应的value汇聚到一个reducetask中,然后再进行join,Join就会走shuffle,效率很低,可以用Broadcast变量与map类算子通过广播小RDD的全量数据实现同样的效果,且不会走Shuffle。
实现原理:
将较小的RDD中的数据直接通过collect算子拉取到Driver端的内存中,然后对其创建一个Broadcast变量广播给其他Executor,接着对另外一个RDD执行map类算子,再算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每条数据按照key进行对比,如果连接key相同,就将两个RDD的数据按照所需方式连接起来。
MapJoin原理:
在Map阶段将小表数据从 HDFS上读取到内存中的哈希表中,读完后将内存中的哈希表序列化为哈希表文件,在下一阶段当 MapReduce任务启动时,会将这个哈希表文件上传到Hadoop分布式缓存中,该缓存会将这些文件发送到每个Mapper的本地磁盘上。因此所有Mapper都可以将此持久化的哈希表文件加载回内存,并像之前一样进行 Join,顺序扫描大表完成Join,减少shuffle操作及reduce操作。
MapJoin两个阶段:
大表Join大表:
采用Sort Merge Bucket Join (SMB Join),将每张大表排序按照分区分桶,然后将对应的桶进行Join即可。
针对于两个两个数据量都是很大的RDD或者Hive表进行Join时,可以通过采用的方式看一下数据的分布情况,如果其中一个RDD或Hive表中的少数几个Keyd的数据量比较大,而另外一个RDD或Hive表中的所有Key都发呢不均匀,那么可以采用这种方式。
实现原理:
先将包含少数几个数据量较大的Key的RDD通过Sample算子采用一份样本出来,然后统计一下每个key的数量,并计算出来数据量最大的key。然后将这几个key对应的数据从RDD或hive表中拆分出来,形成单独的RDD,并给每个key都追加n以内的随机前缀,接着将不会导致数据倾斜的那个RDD也过滤出倾斜key对应的数据形成出一个单独RDD,将每条数据都膨胀成n跳数据,这n条数据都按顺序附加一个 0~N级前缀,再剑门关附加了岁间前缀的RDD与另一个膨胀n倍的RDD进行join,此时就可以将原来相同的key打散成n分分散到多个task中去join,而另外两个普通的RDD进行join即可,最后将两次Join的结果进行Union即可。
说明:这种方式知识和少数几个key导致的倾斜,只会对少数导致倾斜的key进行扩容n倍,不需要对全量数据进行扩容,避免占用过多内存,但是如果导致倾斜的key特别多,不适用。
如果join时,RDD中有大量的key导致数据倾斜,只能采用这种方式来解决数据倾斜问题。
实现原理:
先查看RDD或Hive表中数据的分布情况,找到造成数据倾斜的RDD或Hive表,然后将该RDD的每条数据都附加n以内的前缀,同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都附加0~N级前缀,最后将两个RDD进行Join即可。
说明:这种方式对Join类的数据倾斜基本都可以解决,但是该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜,而且由于不能对部分key拆分出来单独处理,需要对整个RDD进行扩容,对内存资源要求很高。
原文:https://www.cnblogs.com/yuexiuping/p/14832294.html