I: Why do we need Sort-Based Shuffle?
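1. Shuffle generally consists of two phases of tasks: the phase that produces Shuffle data (the Mapper side) and the phase that consumes it (the Reducer side).
2. A Spark Job is divided into many Stages at Shuffle boundaries. A Job with a single Stage has only a Mapper phase and produces no Shuffle. Otherwise, every Stage except the last acts as the Mapper for its child Stage, and every Stage except the first acts as the Reducer for its parent Stage.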
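1. In the beginning, Spark Shuffle supported only Hash-based Shuffle. By default, the Mapper phase creates a separate file for each Task of the Reducer phase to hold the data that Task will use.
2. Problems caused by Hash-based Shuffle: with M ShuffleMapTasks and R Reducer Tasks, it creates M*R files. At scale this means a huge number of small files and heavy random disk I/O. Each Mapper also keeps one open write buffer per file, which consumes a lot of memory and puts heavy pressure on GC.
Before Sort-Based Shuffle was introduced, Spark was therefore best suited to small and medium-scale data processing.
1. Sort-based Shuffle was introduced so that Spark could process larger datasets with higher performance on larger clusters. It first appeared in Spark 1.1. From then on, Spark could handle big data at much larger scale, including the PB level and beyond. The introduction and optimization of Project Tungsten in particular pushed Spark's ability to process massive data quickly on large clusters to a new level.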
The ShuffleManager is selected in SparkEnv (Spark 1.6 source):
  // Let the user specify short names for shuffle managers
  val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
3. Sort-based Shuffle has been the default since Spark 1.2, and Spark 1.6 uses it by default:
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    spark.shuffle.manager    sort    (sets the Shuffle mode to sort; the value is lower-cased, so SORT also works)
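The lookup in the SparkEnv excerpt can be reproduced in isolation to show how the configured value is resolved. This is a minimal sketch that models SparkConf as a plain Map[String, String], which is an assumption. The object ShuffleManagerLookup and its resolve helper are hypothetical, and the code only computes the class name Spark would instantiate.

```scala
object ShuffleManagerLookup {
  // Same short-name table as the SparkEnv excerpt (Spark 1.6).
  val shortShuffleMgrNames: Map[String, String] = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // Hypothetical helper: SparkConf is modelled as a Map for this sketch.
  def resolve(conf: Map[String, String]): String = {
    // "sort" is used when spark.shuffle.manager is not set.
    val shuffleMgrName = conf.getOrElse("spark.shuffle.manager", "sort")
    // Short names match case-insensitively; any other value is treated
    // as a fully qualified class name and returned unchanged.
    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
  }
}
```

Under these assumptions, an empty config, "SORT" and "tungsten-sort" all resolve to SortShuffleManager in Spark 1.6.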
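4. How Sort-based Shuffle works (the purpose of Shuffle is to classify the data and then gather it):
1) A ShuffleMapTask does not generate a separate file for each Reducer. Instead, Sort-based Shuffle writes all output data of each ShuffleMapTask on the Mapper side into a single Data file. Because that data is classified by partition, Sort-based Shuffle uses an Index file to record how the ShuffleMapTask's output is laid out, partition by partition, within the single Data file.
2) Sort-based Shuffle therefore produces two files for each ShuffleMapTask on the Mapper side: a Data file and an Index file. The Data file stores the current Task's Shuffle output. The Index file stores how the data in the Data file was classified by the Partitioner. Tasks in the next Stage use this Index file to fetch the data they need from the ShuffleMapTasks of the previous Stage. In other words, each Reducer locates its own data through the Index file.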
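The Data + Index layout can be modelled in memory to make the idea concrete. In this minimal sketch, a byte array stands in for the Data file and an array of offsets stands in for the Index file. Records arrive already tagged with a partition ID, which is an assumption made for brevity. The object DataAndIndexModel and its methods are hypothetical, and the sketch does not reproduce Spark's on-disk format or serialization.

```scala
object DataAndIndexModel {
  // Hypothetical in-memory stand-in for one ShuffleMapTask's output:
  // `data` plays the role of the single Data file, `offsets` the Index file.
  final case class MapOutput(data: Array[Byte], offsets: Array[Long])

  // Write every partition's bytes back to back in partition-ID order and
  // record where each partition starts, plus a final end offset.
  def write(numPartitions: Int, records: Seq[(Int, String)]): MapOutput = {
    val byPartition = records.groupBy(_._1) // keeps input order inside each group
    val out = new java.io.ByteArrayOutputStream()
    val offsets = new Array[Long](numPartitions + 1)
    for (p <- 0 until numPartitions) {
      offsets(p) = out.size().toLong
      for ((_, value) <- byPartition.getOrElse(p, Seq.empty)) {
        out.write(value.getBytes("UTF-8"))
      }
    }
    offsets(numPartitions) = out.size().toLong
    MapOutput(out.toByteArray, offsets)
  }

  // The Reducer with id `reduceId` reads only its own segment [start, end).
  def read(output: MapOutput, reduceId: Int): String = {
    val start = output.offsets(reduceId).toInt
    val end = output.offsets(reduceId + 1).toInt
    new String(output.data, start, end - start, "UTF-8")
  }
}
```

An empty partition simply gets two equal offsets, so the Index always has one entry per partition plus one, regardless of how the data is distributed.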
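Number of files: Sort-based Shuffle produces 2*M temporary Shuffle files. Here M is the total number of parallel Partitions in the Mapper phase, which is the total number of ShuffleMapTasks. For comparison:
    Basic Hash Shuffle: M*R
    Consolidated Hash Shuffle: C*R
    Sort-based Shuffle: 2*M
where R is the number of Reducer Tasks and C is the number of Cores (concurrently running Tasks) on the Mapper side.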
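The three formulas can be compared with a small worked example. The sizes used below (M = 1000, R = 1000, C = 100) are hypothetical and chosen only for illustration.

```scala
object ShuffleFileCounts {
  // m = number of ShuffleMapTasks, r = number of Reducer Tasks,
  // c = number of Cores on the Mapper side (consolidated variant).
  def basicHash(m: Long, r: Long): Long = m * r
  def consolidatedHash(c: Long, r: Long): Long = c * r
  // One Data file plus one Index file per ShuffleMapTask.
  def sortBased(m: Long): Long = 2 * m
}
```

With those sizes, Basic Hash Shuffle creates 1,000,000 files, the consolidated variant creates 100,000, and Sort-based Shuffle creates 2,000. The Sort-based count no longer depends on R at all.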
IV: Hands-on practice with Sort-based Shuffle on a cluster
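How does a Reducer obtain the data it needs in Sort-based Shuffle? The Reducer first asks the Driver (the MapOutputTracker) for the locations of the outputs of the parent Stage's ShuffleMapTasks. It then requests its blocks from those locations. On that side, the index file is read to find the start and end offsets of the Reducer's partition, and only that segment of the Data file is returned.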
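The index lookup can be sketched with plain Java streams. The index is a sequence of Longs: a leading 0 followed by cumulative offsets, which matches the loop in writeIndexFileAndCommit. To find partition reduceId, the lookup skips reduceId * 8 bytes and reads two Longs. This is roughly what IndexShuffleBlockResolver.getBlockData does in Spark 1.6, but here it works on an in-memory byte array rather than a file. The object IndexLookup and its methods are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object IndexLookup {
  // Encode partition lengths as the index stores them:
  // a leading 0 followed by the running (cumulative) offsets, one Long each.
  def encodeIndex(lengths: Array[Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    var offset = 0L
    out.writeLong(offset)
    for (length <- lengths) {
      offset += length
      out.writeLong(offset)
    }
    out.close()
    bytes.toByteArray
  }

  // Locate reduceId's segment: skip reduceId * 8 bytes (one Long per entry),
  // then read this partition's start offset and the next one.
  def segmentFor(index: Array[Byte], reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new ByteArrayInputStream(index))
    try {
      in.skipBytes(reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      (offset, nextOffset - offset) // (start position in the Data file, length)
    } finally {
      in.close()
    }
  }
}
```

Each lookup needs only two fixed-size reads, no matter how many partitions the map task produced.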
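The "sort" in Sort-based Shuffle is not about sorting our data. By default, Sort-based Shuffle does not sort the records' contents. It sorts the Shuffle output by partition, and keys are ordered only if a key ordering is supplied.
Fetching starts only after all ShuffleMapTasks of the parent Stage have finished. From then on, the Reducer computes while it fetches.
The hands-on run confirmed that Sort-based Shuffle produced 2M files, where M is the number of parallel ShuffleMapTasks.
1. The runTask method of ShuffleMapTask: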
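The sketch below shows what "sorting the Shuffle" means in the default case, where no aggregator and no key ordering are passed to the sorter. Records are ordered by partition ID only. The hash-modulo partitioner and the object name PartitionOnlyOrder are hypothetical stand-ins. Spark's ExternalSorter also spills to disk and can sort by key when an ordering is supplied; this sketch models neither.

```scala
object PartitionOnlyOrder {
  // Hypothetical partitioner: non-negative key.hashCode modulo numPartitions.
  def partitionOf(key: String, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Order records only by partition ID. sortBy is stable, so records keep
  // their original order inside a partition: the keys themselves are NOT sorted.
  def orderForShuffle(records: Seq[(String, Int)], numPartitions: Int): Seq[(Int, (String, Int))] =
    records.map(r => (partitionOf(r._1, numPartitions), r)).sortBy(_._1)
}
```

For example, with 2 partitions, "c" and "a" both land in partition 1 and stay in input order (c before a) after the sort.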
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    // ... (exception handling omitted in this excerpt)
  }
}
2. SortShuffleManager overrides the getWriter method of ShuffleManager. In the general case it returns a SortShuffleWriter:
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,  // i.e., the partitionId
    context: TaskContext): ShuffleWriter[K, V] = {
  // ... (lines omitted in this excerpt)
  handle match {
    // ... (cases for SerializedShuffleHandle and BypassMergeSortShuffleHandle omitted)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
3. The write method of SortShuffleWriter:
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
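When dep.mapSideCombine is true, the ExternalSorter above receives the dependency's Aggregator and merges values per key before anything is written. The sketch below shows that idea for a sum aggregation, as in reduceByKey(_ + _), using a plain HashMap. The object name is hypothetical, and the sketch ignores partitioning, spilling, and the mergeCombiners step that runs on the reduce side.

```scala
import scala.collection.mutable

object MapSideCombine {
  // Sketch of a map-side combine for a sum: records with the same key are
  // merged before being written, so at most one record per key leaves the task.
  def combine(records: Iterator[(String, Int)]): Map[String, Int] = {
    val combined = mutable.HashMap.empty[String, Int]
    for ((k, v) <- records) {
      combined.get(k) match {
        case Some(c) => combined(k) = c + v // analogous to Aggregator.mergeValue
        case None    => combined(k) = v     // analogous to Aggregator.createCombiner
      }
    }
    combined.toMap
  }
}
```

Combining on the map side shrinks the Data file and the amount of data each Reducer must fetch. This is why the branch above requires an Aggregator.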
The name of a shuffle block encodes shuffleId, mapId and reduceId (BlockId.scala):
// Format of the shuffle block ids (including data and index) should be kept in sync with
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}
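Combining this naming scheme with NOOP_REDUCE_ID in the write path shows where the 2*M files come from. The sketch assumes the Spark 1.6 convention: data and index files use reduceId 0 and carry ".data" and ".index" suffixes. The object ShuffleFileNames is hypothetical. On a real executor these files are spread across hashed subdirectories of the block manager's local directories.

```scala
object ShuffleFileNames {
  // Data and index files use reduceId = 0 (NOOP_REDUCE_ID in the write path).
  val NoopReduceId = 0

  // Same format as ShuffleBlockId.name.
  def blockName(shuffleId: Int, mapId: Int, reduceId: Int): String =
    "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId

  // The two files each ShuffleMapTask leaves behind: 2 * numMaps in total.
  def filesFor(shuffleId: Int, numMaps: Int): Seq[String] =
    (0 until numMaps).flatMap { mapId =>
      val base = blockName(shuffleId, mapId, NoopReduceId)
      Seq(base + ".data", base + ".index")
    }
}
```

For 3 map tasks of shuffle 0, this yields shuffle_0_0_0.data, shuffle_0_0_0.index, and so on up to shuffle_0_2_0.index: 6 files in total.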
4. The writeIndexFileAndCommit method:
/**
 * Write an index file with the offsets of each block, plus a final offset at the end for the
 * end of the output file. This will be used by getBlockData to figure out where each block
 * begins and ends.
 *
 * It will commit the data and index file as an atomic operation, use the existing ones, or
 * replace them with new ones.
 *
 * Note: the `lengths` will be updated to match the existing index file if use the existing ones.
 */
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
  Utils.tryWithSafeFinally {
    // We take in lengths of each block, need to convert it to offsets.
    var offset = 0L
    out.writeLong(offset)
    for (length <- lengths) {
      offset += length
      out.writeLong(offset)
    }
  } {
    out.close()
  }

  val dataFile = getDataFile(shuffleId, mapId)
  // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
  // the following check and rename are atomic.
  synchronized {
    val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
    if (existingLengths != null) {
      // Another attempt for the same task has already written our map outputs successfully,
      // so just use the existing partition lengths and delete our temporary map outputs.
      System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
      if (dataTmp != null && dataTmp.exists()) {
        dataTmp.delete()
      }
      indexTmp.delete()
    } else {
      // This is the first successful attempt in writing the map outputs for this task,
      // so override any existing index and data files with the ones we wrote.
      if (indexFile.exists()) {
        indexFile.delete()
      }
      if (dataFile.exists()) {
        dataFile.delete()
      }
      if (!indexTmp.renameTo(indexFile)) {
        throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
      }
      if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
        throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
      }
    }
  }
}
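Drawbacks of Sort-based Shuffle:
1. If the number of Tasks on the Mapper side is very large, many small files are still produced. When the Shuffle data reaches the Reducer side, the Reducer has to open a large number of records at once for deserialization. This consumes a lot of memory and puts a heavy burden on GC, which can slow the system down or even crash it.
    The Mapper and Reducer sides can also be reworked (i.e., the framework changed) so that sorting happens only once.
    The solution for frequent GC is Project Tungsten.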
25. Spark Sort-Based Shuffle Internals Fully Explained