使用 foreachPartition 代替 foreach,在 foreachPartition 内获取数据库的连接
1)RDD
优点: 编译时类型安全 编译时就能检查出类型错误 面向对象的编程风格 直接通过类名点的方式来操作数据
缺点: 序列化和反序列化的性能开销 无论是集群间的通信, 还是 IO 操作都需要对对象的结构和数据进行序列化和反序列化。 GC 的性能开销,频繁的创建和销毁对象, 势必会增加 GC
2)DataFrame
DataFrame 引入了 schema 和 off-heap
schema : RDD 每一行的数据, 结构都是一样的,这个结构就存储在 schema 中。 Spark 通 过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的 部分就可以省略了。
3)DataSet
DataSet 结合了 RDD 和 DataFrame 的优点,并带来的一个新的概念 Encoder。 当序列化数据时,Encoder 产生字节码与 off-heap 进行交互,能够达到按需访问数据的效 果,而不用反序列化整个对象。Spark 还没有提供自定义 Encoder 的 API,但是未来会加入。
三者之间的转换:
窗口函数就是在原来定义的 SparkStreaming 计算批次大小的基础上再次进行封装,每 次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成 之后下一次从什么地方开始计算。 图中 time1 就是 SparkStreaming 计算批次大小,虚线框以及实线大框就是窗口的大小, 必须为批次的整数倍。虚线框到大实线框的距离(相隔多少批次),就是滑动步长。
方法 1:(1)按照 key 对数据进行聚合(groupByKey) (2)将 value 转换为数组,利用 scala 的 sortBy 或者 sortWith 进行排序(mapValues) 数据量太大,会 OOM。
方法 2:(1)取出所有的 key (2)对 key 进行迭代,每次取出一个 key 利用 spark 的排序算子进行排序
方法 3:(1)自定义分区器,按照 key 进行分区,使不同的 key 进到不同的分区 (2)对每个分区运用 spark 的排序算子进行排序
有几百个文件,会有几百个 map 出现,读取之后进行 join 操 作,会非常的慢。这个时候我们可以进行 coalesce 操作,比如 240 个 map,我们合成 60 个 map,也就是窄依赖。这样再 shuffle,过程产生的文件数会大大减少。提高 join 的时间性 能
append 在原有分区上进行追加,overwrite 在原有分区上进行全量刷新
coalesce 和 repartition 都用于改变分区,coalesce 用于缩小分区且不会进行 shuffle,repartition 用于增大分区(提供并行度)会进行 shuffle,在 spark 中减少文件个数会使用 coalesce 来减少 分区来到这个目的。但是如果数据量过大,分区数过少会出现 OOM 所以 coalesce 缩小分区 个数也需合理
缓存:
(1)dataFrame.cache (2)sparkSession.catalog.cacheTable(“tableName”)
释放缓存:
(1)dataFrame.unpersist (2)sparkSession.catalog.uncacheTable(“tableName”)
参数 spark.sql.shuffle.partitions 决定 默认并行度 200
DataFrame.createTempView() 创建普通临时表
DataFrame.createGlobalTempView() DataFrame.createOrReplaceTempView() 创建全局临时 表
spark.reducer.maxSizeInFilght 此参数为 reduce task 能够拉取多少数据量的一个参数默认 48MB,当集群资源足够时,增大此参数可减少 reduce 拉取数据量的次数,从而达到优化 shuffle 的效果,一般调大为 96MB,资源够大可继续往上跳。
spark.shuffle.file.buffer 此参数为每个 shuffle 文件输出流的内存缓冲区大小,调大此参数可 以减少在创建 shuffle 文件时进行磁盘搜索和系统调用的次数,默认参数为 32k 一般调大为 64k
kafka 参数 auto.offset.reset 参数设置成 earliest 从最初始偏移量开始消费数据
通过 spark.streaming.kafka.maxRatePerPartition 参数来设置 Spark Streaming 从 kafka 分区每秒 拉取的条数
把 spark.streaming.backpressure.enabled 参数设置为 ture,开启背压机制后 Spark Streaming 会 根据延迟动态去 kafka 消费数据,上限由 spark.streaming.kafka.maxRatePerPartition 参数控制, 所以两个参数一般会一起使用
Spark Streaming stage 耗时由最慢的 task 决定,所以数据倾斜时某个 task 运行慢会导致整个 Spark Streaming 都运行非常慢。
把 spark.streaming.stopGracefullyOnShutdown 参数设置成 ture,Spark 会在 JVM 关闭时正常关 闭 StreamingContext,而不是立马关闭 Kill
命令:yarn application -kill 后面跟 applicationid
如何优雅的关闭 SparkStreaming 任务(将写好的代码打包,Spark-Submit) Kill -9 xxx ? 开启另外一个线程每 5 秒监听 HDFS 上一个文件是否存在。如果检测到存在,调用 ssc.stop()方法关闭 SparkStreaming 任务(当你要关闭任务时,可以创建你自定义监控的文件 目录)
Spark Streaming 默认分区个数与所对接的 kafka topic 分区个数一致,Spark Streaming 里一般 不会使用 repartition 算子增大分区,因为 repartition 会进行 shuffle 增加耗时
原文:https://www.cnblogs.com/eugene0/p/12846300.html