首页 > 其他 > 详细

SortShuffleWriter的实现原理

时间:2020-12-27 02:13:25      阅读:39      评论:0      收藏:0      [点我收藏+]
这种ShuffleWriter会先把数据写入到一个buffer中,当buffer的内存到达一个上限值时,就会把该内存中的数据写入到磁盘的临时文件中,写入时会按分区Id进行排序,若可能还会根据key进行排序。最后把这些临时文件合并成一个文件,并为该文件创建一个索引文件。

实现步骤

SortShuffleWriter的实现过程如下图所示:

技术分享图片
对该过程的说明如下:

1)不断地把每个分区的数据填充到内存buffer中,有两种buffer:若需要通过key来进行聚合,则会使用PartitionedAppendOnlyMap结构;若不需要进行聚合操作,则使用PartitionedPairBuffer。在buffer中,会先按分区Id对记录进行排序,然后可能还会根据key进行排序。为了避免每个key多次使用分区器(partitioner),会把分区ID与每条记录一起存储,形如:((partitionId, K), V)。

2)当buffer达到内存上限时,就会把buffer的内容写入磁盘的一个临时文件中,然后释放内存资源。若我们需要进行聚合操作时,会先对记录按分区Id进行排序,然后还可能根据key或key的hashcode进行排序。对于每个文件,会跟踪记录每个分区有多少条数据,所以,我们不需要为每条数据写出分区Id。

3)然后,合并这些临时文件,包括目前在内存中的数据。若没有排序和聚合操作会按第2)步中的记录顺序进行合并;若需要按key进行聚合,则需要进行总体排序,或读取具有相同hashcode的可以,然后对其值进行排序。

外部排序器:ExternalSorter

该排序器对数据进行排序,并合并多个类型为(K,V)的键-值对来生成类型为(K,C)的键-组合器对。它使用分区器首先将键值分组到分区中,然后使用自定义比较器有选择地对每个分区中的键值进行排序。对于Shuffle操作获取数据时,可能会输出一个具有不同大小的单个分区文件。

实现说明

  • 外部排序器会反复填充数据的缓冲区,若想通过key来进行聚合操作,则使用:PartitionedAppendOnlyMap来缓存数据。若不对数据进行聚合,则使用:PartitionedPairBuffer来缓存数据。在缓存区中,ExternalSorter会按分区ID对元素进行排序,然后也可以按key进行排序。为避免每个key多次调用分区程序,将分区ID存储在每条记录处。

  • 当每个缓冲区达到ExternalSorter规定的内存限制时,会将其写出到文件中。如果想要进行聚合,则该文件首先按分区ID排序,并且可能按key或key的哈希值进行排序。对于每个文件,ExternalSorter会跟踪内存中每个分区中有多少对象,因此不必把每个元素写出分区ID。

  • 当用户请求迭代器或文件输出时,使用上面定义的相同排序顺序合并写出的文件以及任何剩余的内存数据(除非禁用排序和聚合)。如果我们需要按key聚合,要么使用排序参数的总排序,要么读取具有相同哈希值的键key,并将它们相互比较以获得合并值的一致性。

  • 用户期望在调用stop()函数时,删除所有中间的临时文件。

ExternalSorter初始化

在ExternalSorter对象进行初始化时,会获取一些环境配置,和辅助的类对象。具体的初始化过程如下:

  • 获取SparkEnv的配置数据,直接通过conf = SparkEnv.get.conf进行获取。

  • 获取分区数:numPartitions = partitioner.map(_.numPartitions).getOrElse(1)

  • 获取数据块管理器:blockManager = SparkEnv.get.blockManager

  • 获取磁盘数据块管理器:diskBlockManager = blockManager.diskBlockManager

  • 获取序列化管理器:serializerManager = SparkEnv.get.serializerManager

  • 实例化序列化管理器:serInstance = serializer.newInstance()

  • 获取配置参数的值:spark.shuffle.file.buffer

  • 说明,一般情况下,该参数表示:每个shuffle文件输出流的内存缓冲区大小(K)。默认值是:32K

  • 作用:通过这些缓冲区,减少了在创建中间shuffle文件时进行的磁盘I/O和系统调用的次数。

  • 获取配置参数:spark.shuffle.spill.batchSize的值:

  • 说明,该参数表示:每次从序列化器批量读/写对象的个数。该参数的默认值是:10000

  • 对象是批量写入的,每个批处理使用自己的序列化流。这样减少了反序列化流时构造的消耗。

  • 注意:在实际使用时,该值不能设置的太低。若将此设置得太低会导致序列化时过度复制。

  • 创建两个内存缓存这两个缓存区分别用于有聚合和无聚合的情况。
 // 有聚合操作时使用
 @volatile private var map = new PartitionedAppendOnlyMap[K, C]
 // 无聚合操作时使用
 @volatile private var buffer = new PartitionedPairBuffer[K, C]

ExternalSorter的insertAll()的实现

insertAll函数主要负责把数据放到shuffle writer的写入缓冲区中。上面已经分析过了,在shuffle writer中,buffer分为两种:

  • map端无聚合操作,使用:PartitionedPairBuffer[K,C]

  • map端有聚合操作,使用:PartitionedAppendOnlyMap[K,C]

并且,在使用这两种buffer时,会按分区ID作为key值。

该函数的详细实现步骤如下:

  • 在aggregator对象中获取设置初始值和更新值的回调函数

  • 遍历记录,若使用了聚合操作,则把记录数据添加到:PartitionedAppendOnlyMap中。若没有聚合操作,则把记录添加到:PartitionedPairBuffer缓冲区中。在写入缓冲区时,都以分区ID作为key。

  • 调用maybeSpillCollection()函数。该函数会调用maybeSpill函数,来判断缓冲区目前的使用情况,并根据使用情况把数据刷到磁盘,每次写入磁盘时都会生成一个临时文件,这样可能会形成多个临时文件,这些临时文件会被记录在定义的变量spillfile中,这些文件会在最后进行合并。

  • maybeSpill()函数的操作如下:

  • 判断读取的元素个数是否是32的倍数(内存对齐) 且 目前的内存使用量是否超过阈值,即参数:spark.shuffle.spill.initialMemoryThreshold"的值。默认是5M。

  • 若以上条件满足,则会申请:目前使用内存的2倍内存-阈值 这么大的内存。若申请失败,则直接把数据写入到磁盘,并释放内存空间。

  • 若目前的内存占有量大于可用内存的阈值,就应该把数据写入到磁盘上,此时会shouldSpill变量设置为true。

  • 接下来会调用spill()函数把数据写入到磁盘。

  • 最后会调用releaseMemory()来释放内存。

实际使用

 scala> val rdd = sc.parallelize(0 to 8).map(s => (s, 1)).reduceByKey((a,b)=>a+b)
 rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at reduceByKey at <console>:25

 scala> rdd.dependencies
 res6: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@733f5bf4)

 scala> import org.apache.spark.ShuffleDependency
 import org.apache.spark.ShuffleDependency

 scala> val shuffleDep = rdd.dependencies(0).asInstanceOf[ShuffleDependency[Int, Int, Int]]
 shuffleDep: org.apache.spark.ShuffleDependency[Int,Int,Int] = org.apache.spark.ShuffleDependency@733f5bf4

 scala> shuffleDep.mapSideCombine
 res7: Boolean = true

 scala> shuffleDep.aggregator     // 查看是否有聚合器
 res8: Option[org.apache.spark.Aggregator[Int,Int,Int]] = Some(Aggregator(<function1>,<function2>,<function2>))

 scala> shuffleDep.partitioner.numPartitions  // 查看分区数
 res9: Int = 1

 scala> shuffleDep.shuffleHandle            // 查看ShuffleHandle的值
 res10: org.apache.spark.shuffle.ShuffleHandle = org.apache.spark.shuffle.BaseShuffleHandle@4fc12922

从以上代码可以看出,当使用reduceByKey时会产生shuffle操作,从以上操作结果可以看出,ShuffleHandle是BaseShuffleHandle,对应的shufflewriter是SortShuffleWriter。

小结

  • SortShuffleWriter会先把数据先写入到内存中,并会尝试扩展内存大小,若内存不足,则把数据持久化到磁盘上。

  • SortShuffleWriter在把数据写入磁盘时,会按分区ID进行合并,并对key进行排序,然后写入到该分区的临时文件中。

  • SortShuffleWriter最后会把前面写的分区临时文件进行合并,合并成一个文件,也就是说,会在map操作结束时把各个分区文件合并成一个文件。这样做可以有效的减少文件个数,和为了维护这些文件而产生的资源消耗。

SortShuffleWriter的实现原理

原文:https://blog.51cto.com/15067227/2573447

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!