1. textFile的minparitition,只是设置最小的partition数目,下界(比如3),当数据量大的时候,改参数不起作用。可以尝试设置成1000
2. reduceByKey的并行度,也就是reduce的数目。
2.1 shuffle的时候,生成的中间文件和heap大小跟reduce数目成正比。reduce数目太大,每个reduce 任务要和driver通信,容易导致driver OOM,和shuffle中间文件过多。reduce数目太小,每个executor(执行reduce task)需要装载大量数据,导致executor OOM。
3. 优化union。多个RDDunion时,如果每个RDD 的partition数目一致,union操作将被优化掉了。
4. 设置executor-memory和executor-cores,num-executors控制并发度,对运行速度影响很大
4.1 每个叶子节点(物理机)上可以执行多个yarn executor,每个executor可以设置executor-cores个线程,共享executor-memory内存。
4.2 线程数目(executor-cores)不宜超过3,否则容易导致executor OOM
4.3 executor-memory 越小,并发度越高,执行越快,但每个线程的memory越小,容易导致executor OOM
5. shuffle是重要的一部分,容易产生中间文件过多和OOM。参考文献http://www.cs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
5.1 设置snnapy压缩取代lzf,减少map阶段内存使用量(400k/file -> 32k/file)。 spark.io.compression.codec org.apache.spark.io.SnnapyCompressionCodec
5.2 调低cache。conf spark.storage.memoryFraction=0.3,腾出更多的内存。
5.3 减少中间文件数量:spark.shuffle.consolidateFiles true,每个executor生成的文件数目是:C*R,而不是M*R。
5.4 减少map阶段生成中间文件消耗的buf。spark.shuffle.file.buffer.kb设成32 (kb)
5.5 减少Hash-based shuffle manager占用的内存,设置sort spark.shuffle.manager SORT
6. 找出性能瓶颈,内存,磁盘,cpu,和网络
6.1 内存瓶颈看 已使用的memory,free memory,executor memory,GC,full GC
6.2 磁盘瓶颈看 磁盘读写,磁盘等待
6.3 CPU瓶颈看 CPU曲线,cpu是否等待IO
6.4 网络瓶颈看 进出包量曲线
7. 避免partition数目过大导致reduce数目过大,缩减partition数目:coalesce(2500, false)
8. 分段调优