这个参数设置了Spark运行时需要多少个Executor进程来执行,当程序放在Yarn上运行时,会启动相应数量的Executor,这个参数如果不设置,默认的情况下Executor的数量比较少,可以适当的进行调高,但不要过高。平常接触的业务中最大设置过80,我那时候已经觉得自己很不要脸了~
该参数设置了每个Executor的内存,这个参数的设置会直接影响到许多程序的运行,常见的OOM异常会跟该参数有直接的关系,这个的设置就要取决于自己的内存的大小,量力而行,可以用上面的参数*该参数,这个乘积是不能超过最大内存的,个人建议设置成最大内存的一半左右,如果还有其他人与你一起共用该内存,设置过大会影响他人的性能。
这个参数表示dirver的内存大小,默认是1,可以根据情况进行适当的调高,个人建议设置到10-20左右,避免发生OOM。
这个是Spark中核心配置之一,设置每个Executor的核心CPU的数量,一个Executor的CPU同时只能运行一个任务,这个平常设置都是1-4个,最大的时候是4。
这个是driver程序的CPU内核数量,默认为1,本人一直都在使用默认,没有调过,大家可以尝试下适当调高,之前看别人的文章可以调到8-16。
这个参数用来设置每个stage的task数量,在官方中给出的建议是设置在num-executor和executor-cores的乘积的2-3倍左右,我一般设置的数量大概是200-500之间,可以根据num-executor和executor-cores进行适当的调整。
Tips:上面说了一些常用的参数,如果作出适当的调整后还没有满足需要,就应该考虑修改自己的业务逻辑了!!!
? 以上是我在平常使用到的一些参数,其实还有很多,本人也在不断的学习!!
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
SparkConf conf = new SparkConf().setAppName("test")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{ConnectionDataVO.class});
Tips:在官网解释说:Java的序列化更加灵活,但是执行速度缓慢,Kryo的速度更快,但是支持的类型较少。
Tips:以上是通过修改一些程序中的设置,以及对数据源进行优化
在程序中添加可能会出现原因的log信息并进行打印。
SparkUI界面上可以查看DAG的运行逻辑和具体某个算子的运行过程。
分析原有的Key的分布情况,判断Key的粒度是否过大,或者过小。比如,按天统计的可以扩大成按周统计的,按月统计的可以细分到按周统计的,这种方式大多针对含有时间区分的Key中,调整后的Key的数量和所对应的数据量肯定不相同,避免了数据倾斜。
在业务中可能经过一次groupByKey或者reduceByKey等聚合操作时产生的Key的数量过多,可以多次进行聚合,比如,在第一次聚合时,先在Key的前面加上HashCode或者随机数,使其本来聚合成一个Key的,在聚合前先小范围的聚合一次,之后再聚合时,将前面的随机数等去掉,再聚合。
可以将该Key单独提出来生成一个RDD,在Saprk中存在一种机制,当某个RDD中只存在一个Key时,在进行Shuffle操作时,会将该Key的所有value进行打散,分配到不同的task中进行处理,处理后,再将数据进行join,这样可以解决数据倾斜的问题。
如果程序中存在join算子,找出相对来说数据量较小的那个RDD,通过Spark中的广播功能,调用broadcast的方法传递这个RDD得到一个Broadcast类型的变量,调用.value,将这个小份数据广播到每个Executor中,之后不再调用join操作,使用Map,在改算子中根据某些相关联的条件将两个数据进行连接,这个方式对于join操作产生的数据倾斜,效果比较好,因为不会产生Shuffle操作。
当程序遇到许多个Key都发生数据倾斜时,可以对剩下正常的Key进行适当的扩容,以分担数据倾斜Key的数据,这样的操作也只是缓解当前的数据倾斜,并没有从根本上解决问题,使用该方法对内存的还会有一定的要求,如果使用不恰当可能会出现OOM。
原文:https://www.cnblogs.com/miker-lcy/p/13678273.html