* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable;
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD的属性
* Internally, each RDD is characterized by five main properties:
* - A list of partitions; 一组分区(Partition),即数据集的基本组成单位;
* - A function for computing each split; 一个计算每个分区的函数;
* - A list of dependencies on other RDDs; RDD之间的依赖关系;
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned);一个Partitioner,即RDD的分片函数;
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file); 一个列表,存储存取每个Partition的优先位置(preferred location)。
RDD特点
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
弹性
存储的弹性:内存与磁盘的自动切换;
容错的弹性:数据丢失可以自动恢复;
计算的弹性:计算出错重试机制;
分片的弹性:可根据需要重新分片。
分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
只读
如下图所示,RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了。
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。
依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。
CheckPoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。
在Spark中创建RDD的创建方式可以分为三种:从集合中(内存)创建RDD;从外部存储(HDFS、本地磁盘、mysql等)创建RDD;从其他RDD创建。
从内存中创建:
/** Distribute a local Scala collection to form an RDD.
* This method is identical to `parallelize`.
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { 可指定位置,发送到哪个分区的task,这种方法一般不用;
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
scala> val x = sc.makeRDD(List(1,2,3,4)) x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24 scala> x.collect res0: Array[Int] = Array(1, 2, 3, 4) scala> val y = sc.parallelize(1 to 5).collect y: Array[Int] = Array(1, 2, 3, 4, 5) scala> z.collect res1: Array[String] = Array(Hello World, Hello java, Hello spark, "")
查看分区数
scala> x.getNumPartitions res2: Int = 8 scala> x.partitions.size res4: Int = 8
默认分区规则:
从集合中创建默认分区规则:
在SparkContext中查找makeRDD
numSlices: Int = defaultParallelism): RDD[T] = withScope taskScheduler.defaultParallelism def defaultParallelism(): Int ctrl+h 特质--看它的实现类 override def defaultParallelism(): Int = backend.defaultParallelism() def defaultParallelism(): Int 特质 override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) alt+<-返回;总核数totalCores 8个 def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { ##defaultParallelism 8个 parallelize(seq, numSlices) #8个,如果从集合中创建RDD,Local模式的默认分区数是总核数 } CoarseGrainedSchedulerBackend yarn或standalone模式 override def defaultParallelism(): Int = { conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) #总核数与2取最大值 }
从文件系统中读默认分区规则:
scala> val z = sc.textFile("./wc.txt") z: org.apache.spark.rdd.RDD[String] = ./wc.txt MapPartitionsRDD[6] at textFile at <console>:24 scala> z.getNumPartitions res6: Int = 2
def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) } def defaultMinPartitions: Int = math.min(defaultParallelism, 2) def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism } def defaultParallelism(): Int -->查看它的特质实现类 override def defaultParallelism(): Int = backend.defaultParallelism() def defaultParallelism(): Int -->查看它的特质实现类 override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) ##总核数 8 返回:def defaultMinPartitions: Int = math.min(defaultParallelism, 2) ##defaultParallelism为8 textFile中: minPartitions: Int = defaultMinPartitions): RDD[String] = withScope 这个值为2 hadoopFile中用到这个方法 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableConfiguration], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int) override def getPartitions: Array[Partition] = { val inputSplits = inputFormat.getSplits(jobConf, minPartitions)} ## getSplits-->InputFormat-->FileInputFormat找getSplits方法 18 long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); totalSize总大小wc.txt总共36字节,numSplits要传的参数2 1 long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); --> private long minSplitSize = 1L; long blockSize = file.getBlockSize(); #块大小,HDFS上128M,windows是32M long splitSize = this.computeSplitSize(goalSize, minSize, blockSize); (18, 1, 128) -->18 return Math.max(minSize, Math.min(goalSize, blockSize)); 18 文件切片机制按1.1倍判断, (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize 36/22 > 1.1 -->36-22=14/22 <1.1不切, 最终得到2片切片
原文:https://www.cnblogs.com/shengyang17/p/10658549.html