前言:数据倾斜通常在shuffle时发生,因此解决数据倾斜的思路有三个:
1,把小表广播避免shuffle;
2,把大key加随机前缀打散;
3,把大key过滤出来单独处理;
spark.default.parallelism:RDD的默认并行度(分区数);如果没有使用coalesce、reparation,或者shuffle类算子没有指定分区数时,就会使用默认的并行度;并行度的设置非常重要,决定了是否能够充分利用分配的物理资源,也就是说,给了你充足的executor、core和内存,而你程序却没有用上;官网给出的并行度建议是executor*core的23倍;经过测试,每个task处理200m300m的数据量时,性能最优;sparksql读取文件时的默认并行度;
spark.sql.shuffle.partitions:DataFrame的shuffle默认并行度,与上面的RDD并行度类似;千万要注意,设置了RDD并行度只对RDD有效,要设置DataFrame并行度才对Dataframe有效,所以最好这两个参数都设置;
默认分区数:均为yarn模式下;a.union(b)的最终分区是a+b,a与b笛卡尔积是a*b;
// 该参数可以控制rdd一个切片的大小,如果想增大一个切片的大小,可以调大该参数;
spark.hadoop.mapreduce.input.fileinputformat.split.minsize
// 控制sparksql一个切片的大小
spark.sql.files.maxPartitionBytes
// 生成spark.context时,会生成下面两个参数,由下面两个参数可以推算出rdd的分区数;
sc.defaultParallelism = spark.default.parallelism(总核数)
sc.defaultMinPartitions = min(spark.default.parallelism,2)
RDD:
scala集合生成(parallelize):如果没有指定分区数,则默认是defaultParallelism;
读文件时(textFile):max(文件的分片数,defaultMinPartitions),hdfs是128M一片,本地文件是32M;
如果textFile设置了分区参数m,则最后的分区数为max(文件的分片数, m);
shuffle时:等于defaultParallelism,没设置就选两个rdd分区最大值;
SparkSQL:
coalesce(10):不触发shuffle的重分区,以分区为最小单位进行重分区,一般在filter之后最好加上coalesce;
repartition(10):触发shuffle的coalesce重分区,以数据为最小单位进行重分区,在分组和union之后最好调用;
partitionBy(new HashPartitioner(10)):按照自己的分区规则分区,这里是按照Hash值分区;
背景:有一个全量商品表(小表,约20万),和一个用户日志表(大表,约20亿),现在要 join 这两个表;而大表中因为一些热销品,导致热销商品的日志记录非常多,大部分商品的记录在4万 ~ 6万,而热销商品的记录将近100万,所以就导致了数据倾斜;如果小表非常小(几百兆到1g)则可以将小表广播,然后在map端join;大于1g则可以使用下面的方法;
解决思路:首先将大表倾斜的key过滤出来,加上[0, 20)的随机前缀;然后把小表中相应的key膨胀20倍,对每一个key加上[0,20)的前缀,然后 join;对于非倾斜的key不做加工,然后 union 倾斜key的结果和非倾斜key的结果;
实现步骤:
过滤倾斜key:对大表中的key使用sample(false, 0.1)做10%的不放回抽样,然后统计每个key出现的次数,取top20,或者次数超过一定阈值的key作为倾斜key集合slant,然后将slant广播;
把大表中的倾斜key加[0, n)的随机前缀:首先使用filter(x=>slant.contains(x))过滤出倾斜的key,然后使用map对大表倾斜的key加上随机前缀,例如:map(x=>(Random.nextInt(20)+"_"+x._1, x._2)),这里是加上[0, 20)的随机前缀,也就是将每一个大key拆分为20个新的key;
把小表中的倾斜key膨胀n倍:同样,使用filter过滤出倾斜key,然后使用 flatMap(x=>
val arr = new mutable.ArrayBuffer[(key, value)]
for(i <- 0 until 20){arr.append(i+"_"+x._1, x._2) }
arr) 将小表中的key膨胀20倍;
分别 join 再 union:把大表和小表中的倾斜key join,非倾斜key与非倾斜key join,然后再 union 两者的结果,就是最终的结果了;
原文:https://www.cnblogs.com/shendeng23/p/15240771.html