对于调优, 我觉得是没有放之四海而皆准的办法.
很多时候, 调优显得没有必要, 即使不进行调优, 程序也能够顺利执行.
在没有出现问题的时候, 不进行调优, 即使是在大数据中, 这也是我常常采用的原则.
并且, 针对问题再进行调优, 往往是更为合适的.
比如, 明明资源充足, 但程序运行依然很慢, 在这种情况下, 我们非得采取 kryo序列化的方式去增加一点点运行速度. 而没有增加并行度.
又或者有些时候, 减少不必要的 shuffle操作, 是更好地方式.
没有哪种调优手段是必须的, 且有效的.
这只是在帮助我们去了解, 问题可能会出在哪些地方, 在代码无可变更的情况下, 又该如何调整, 以使程序顺利运行. 生搬硬套, 万万不可取.
官方链接: Tuning Spark
中文链接: Spark 调优
由于大多数 Spark 计算的内存性质,Spark 程序可能由集群中的任何资源: 如 CPU,网络带宽, 内存 导致瓶颈, 通常情况下,如果数据有合适的内存,瓶颈就是网络带宽,但有时您还需要进行一些调整,例如 以序列化形式存储 RDD 来减少内存的使用。
本指南将涵盖两个主要的主题:数据序列化,这对于良好的网络性能至关重要,并且还可以减少内存使用和内存优化。我们选几个较小的主题进行展开。
序列化在任何分布式应用程序的性能中起着重要的作用。很慢的将对象序列化或消费大量字节的格式将会大大减慢计算速度。通常,这可能是您优化 Spark 应用程序的第一件事。Spark 宗旨在于方便和性能之间取得一个平衡(允许您使用操作中的任何 Java 类型)。它提供了两种序列化库:
你可以切换到Kryo, 通过 sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 的方式. 通过这个序列化配置, 不仅仅能够使 Kryo序列化使用在 shuffling操作中, 也可以在 序列化 RDDs 到硬盘时使用. 之所以在 Spark中没有将 Kryo当做默认实现方式的原因是, 需要自身手动注册对应的class.
但是建议你在 任何的 网络密集型 application 中使用它. 从Spark 2.0.0以来, Spark在使用简单类型, 如 基本类型的数组, 或是String类型, 在这些数据类型做 shuffling操作时, 使用的内部方式都是 Kryo.
Spark 自动包含 Kryo 序列化器,用于 Twitter chill 中 AllScalaRegistrar 涵盖的许多常用的核心 Scala 类, 这意味着, 对于大多数常见类来说, 你并不需要通过如下方式注册类:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
如果想要知道更多的 kryo细节, 可以参考kryo的官方, 官方提供了更加先进的注册方式, 如自定义 序列化 代码.
还有一点需要注意到的地方是:
如果你的对象太大, 你需要增大 spark.kryoserializer.buffer 配置.
但是在看过配置说明之后, 发现实际上需要更改的配置是 spark.kryoserializer.buffer.max 暂时还没有经过验证.
对于 spark.kryoserializer.buffer 配置而言, 默认值是 64k, 这个是 kryo buffer 的初始化值, 对于每个worker中的每一个核都只有一个 buffer. buffer 在需要的情况下最终会增长到 spark.kryoserializer.buffer.max.
而 spark.kryoserializer.buffer.max 则是 kryo buffer 的最大值, 默认是 64m, 必须大于任何一个你需要序列化的对象 同时 要需要 2048m, 当遇到 buffer limit exceeded exception时 说明你需要增加这个值了.
最后需要注意的是, 即使你并不注册 任何一个 classes, kryo依然可以工作, 但是 此时不得不 存储 类的全名称, 这是很大的浪费.
内存调优有三个方面需要考虑:
你的对象所使用的内存总量(你可能想要整个数据集都存储在内存中.)
访问对象的成本
垃圾收集的开销
一般来说, 访问对象是很快速的, 但是很容易消耗比其字段中的 “raw” 数据多 2-5 倍的空间, 这是由于以下几个原因:
每个不同的 Java 对象都有一个 “object header”,它大约是 16 个字节,包含一个指向它的类的指针。对于一个数据很少的对象(比如说一个Int字段),这可以比数据大。
Java String 在原始字符串数据上具有大约 40 字节的开销(因为它们存储在 Char 数组中并保留额外的数据,例如长度),并且由于 UTF-16 的内部使用而将每个字符存储为 两个 字节 String 编码。因此,一个 10 个字符的字符串可以容易地消耗 60 个字节。
公共收集类,例如 HashMap 和 LinkedList,使用链接的数据结构,其中每个条目(例如: Map.Entry)存在 “包装器” 对象。该对象不仅具有 header,还包括指针(通常为 8 个字节)到列表中的下一个对象.
原始类型的集合通常将它们存储为 “盒装” 对象,例如: java.lang.Integer.
本节将从 Spark 的内存管理概述开始,然后讨论用户可以采取的具体策略,以便在 application 中更有效地使用内存。具体来说,我们将描述如何确定对象的内存使用情况,以及如何改进数据结构,或通过以序列化的格式存储数据。然后我们将介绍调整 Spark 的缓存大小和 Java 垃圾回收器。
Spark中的内存使用主要可以分为两大类, 执行 和 存储.
执行内存 主要指 用于计算中的 混洗(shuffle), 合并(join), 聚合(aggregation).
存储内存主要指 在集群中 缓存 和 传播内部数据 使用的内存.
在Spark中, 执行 和 存储 共享统一区域(M). 当没有 执行 内存在使用时, 存储即可获取所有的可用内存 反之亦然.
如果需要, 执行可以 驱逐 存储, 但这必须在 总的存储 内存的使用量 低于 阈值(R). 换句话说, R 描述了M 缓存块永远不会被驱逐 的 区域. 由于实现的复杂性, 存储永远 不会驱逐 执行.
该设计确保了几个理想的性能.
首先,不使用缓存的应用程序可以将整个空间用于执行,从而避免不必要的磁盘泄漏。
第二,使用缓存的应用程序可以保留最小的存储空间(R),其中数据块不受驱逐。
最后,这种方法为各种工作负载提供了合理的开箱即用性能,而不需要用户内部如何分配内存的专业知识。
虽然提供了两条相关配置, 但是典型 用户 并不需要调整他们, 默认值已经能够满足大多数情况下的使用要求了.
spark.memory.fraction 描述中提到的 内存 M, 是指占用的 (JVM堆内存 - 300M) 的比率, 默认是0.6. 剩余的0.4保留用于用户数据结构,Spark中的内部元数据,并且在偶尔遇到异常大的记录的情况下保护OOM错误.
spark.memory.storageFraction 描述中所提到的R, 是指占用 M 的比率, 默认是0.5. R 是 M 缓存块中的缓存被执行驱逐的存储空间.
第二个参数 也就已经描述了, 存储 和 执行 各自需要占用的比例, 也即, 对于 shuffle join agg 并不存在的 程序中, 完全可以将比例调低, 将内存供给 执行器使用. 默认是55开.
在 Jvm 的旧版本 或 长期支持的版本中, 应该指定 spark.memory.fraction 以适应 堆内存的大小.
确定数据集需要占用内存的大小的最佳方式是, 创建RDD, 放入缓存中, 然后查看 Web UI 的 Storage 页面.
为了估算特定对象的内存占用, 应该使用 SizeEstimator 的 estimate 方法, 这对于尝试使用不同的数据布局以减少内存使用量 以及 确定广播变量将在每个执行程序堆上占用的空间量很有用.
减少内存消耗的第一种方法是避免添加开销的 Java 功能,例如基于指针的数据结构和包装对象。有几种方法可以做到这一点:
将数据结构设计为偏好对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如: HashMap)。该 fastutil 库提供方便的集合类基本类型是与 Java 标准库兼容。
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>8.3.0</version>
</dependency>
fastUtil 官方链接
尽可能避免使用很多小对象和指针的嵌套结构. 比如Integer.
考虑使用数字 ID 或枚举对象而不是 字符串形式的 键.
如果您的 RAM 小于32 GB,请设置 JVM 标志 -XX:+UseCompressedOops,使指针为4个字节而不是8个字节. 你可以在 spark-env.sh中添加这个选项.
尽管已经做了些许调整, 但是的对象仍然太大而无法有效存储,减少内存使用的一个更简单的方法是以序列化形式存储它们,使用 RDD 持久性 API 中的序列化 StorageLevel,例如: MEMORY_ONLY_SER, Spark 将会将每个 RDD 分区存储为一个大字节数组。以序列化形式存储数据的唯一缺点是访问变得更慢,因为必须对每个对象进行反序列化。如果你想以序列化形式缓存数据,强烈建议使用 Kryo,因为它导致比 Java 序列化更小的尺寸(而且肯定比原 Java 对象)更小。
当您的程序存储的 RDD 有很大的”流失”时,JVM 垃圾收集可能是一个问题。(程序中通常没有问题,只读一次 RDD,然后在其上运行许多操作)。当 Java 需要驱逐旧对象为新的对象腾出空间时,需要跟踪所有 Java 对象并找到未使用的。要记住的要点是,垃圾收集的成本与 Java 对象的数量成正比,因此使用较少对象的数据结构(例如: Ints 数组,而不是 LinkedList)大大降低了此成本。一个更好的方法是如上所述以序列化形式持久化对象:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要使用序列化缓存。
由于任务的工作内存(运行任务所需的空间)和缓存在节点上的 RDD 之间的干扰,GC 也可能是一个问题。我们将讨论如何控制分配给RDD缓存的空间来减轻这一点。
GC 调整的第一步是收集关于垃圾收集发生频率和GC花费的时间的统计信息。这可以通过添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 到 Java 选项来完成.
至于如何添加到 Java选项, 可以参考: Spark集群-Standalone 模式
下次运行 Spark 作业时,每当发生垃圾回收时,都会看到在工作日志中打印的消息。请注意,这些日志将在你的集群的工作节点上(stdout 在其工作目录中的文件中),而不是你的 driver节点。
GC并非只是在 Spark项目中独有的, 而是放之Java而皆准的道理.
只要网上搜下, Java GC调优, 相信你会找到许许多多的资料, 我就不在这里详细描述了.
因为通过之前的描述, 已经懂得了如何估算使用内存, 查看特定对象的内存, 以及打印GC日志.
最后, 可以通过配置:
spark.executor.extraJavaOptions 来指定执行器的 GC 调整参数.
相信只要稍微了解过Spark调优, 就会看到这个观点.
除非您为每个操作设置足够高的并行度,否则群集将无法充分利用. Spark根据文件的大小自动设置要在每个文件上运行的 “映射” 任务的数量(尽管您可以通过可选的参数来控制它SparkContext.textFile,等等),并且对于分布式的 reduce 操作(例如groupByKey和reduceByKey),它使用最大的父RDD的分区数。您可以将并行性级别作为第二个参数传递,或将config属性设置spark.default.parallelism为更改默认值。通常,我们建议集群中每个CPU内核执行2-3个任务.
有时并非因为RDD没有足够的内存导致内存溢出, 而是因为 而是因为 某一个 任务的 工作集, 如在 groupByKey 的 reduce任务 太大了, Spark的 shuffle操作 (sortByKey, groupByKey, reduceByKey, join 等操作) 建立每个任务中的哈希表来进行分组,而这往往是很大的. 这里最简单的解决方案是 增加并行级别, 以便每个任务的输入集都更小。Spark 可以有效地支持短达 200ms 的任务,因为它可以将多个任务中的一个执行者 JVM 重用,并且任务启动成本低,因此您可以将并行级别安全地提高到比集群中的核心数量更多。
使用 可用的广播功能 SparkContext 可以大大减少每个序列化任务的大小,以及在群集上启动作业的成本。如果您的任务使用其中的驱动程序中的任何大对象(例如:静态查找表),请考虑将其变为广播变量。Spark 打印主机上每个任务的序列化大小,因此您可以查看该任务以决定您的任务是否过大; 一般任务大于 20KB 大概值得优化。
数据本地化可能会对 Spark job 的性能产生重大影响。如果数据和在其上操作的代码在一起,则计算往往是快速的。但如果代码和数据分开,则必须移动到另一个。通常,代码大小远小于数据,因此将数据代码从一个地方寄送到另一个地方比一大块数据更快。Spark 围绕数据局部性的一般原则构建其调度。
数据本地化是指数据和代码处理有多近。根据数据的当前位置有几个地方级别。从最近到最远的顺序:
PROCESS_LOCAL 数据与运行代码在同一个 JVM 中。这是可能的最好的地方
NODE_LOCAL 数据在同一个节点上。示例可能在同一节点上的 HDFS 或同一节点上的另一个执行程序中。这比 PROCESS_LOCAL 因为数据必须在进程之间移动慢一些
NO_PREF 数据从任何地方同样快速访问,并且没有本地偏好
RACK_LOCAL 数据位于同一机架上的服务器上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机发送
ANY 数据在网络上的其他地方,而不在同一个机架中
Spark 喜欢将所有 task 安排在最佳的本地级别,但并不能做到永远如愿以偿。在任何空闲 executor 中没有未处理数据的情况下,Spark 将切换到较低的本地级别。
有两个选项:
等待繁忙的CPU释放以在同一服务器上的数据上启动任务
立即将数据移动到更远的地方启动新任务
Spark通常要做的是稍等一下,以期释放繁忙的CPU。
一旦超时到期,它将开始将数据从很远的地方移到空闲的CPU中。每个级别之间的回退等待超时可以单独配置,也可以一起配置在一个参数中。
有关详细信息,请参见配置页面spark.locality上的 参数。如果您的任务很长并且位置不佳,则应该增加这些设置,但是默认设置通常效果很好。
参数有:
spark.locality.wait 默认值3秒, 级别会逐渐从 PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL 逐渐过渡. 采取的时间都是相同的.
可以分别指定三种类型对应的数据. spark.locality.wait.node, spark.locality.wait.process, spark.locality.wait.rack.
原文:https://www.cnblogs.com/zyzdisciple/p/11621864.html