首页 > 其他 > 详细

SparkCore

时间:2019-04-05 14:39:41      阅读:162      评论:0      收藏:0      [点我收藏+]
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable;

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

RDD的属性

* Internally, each RDD is characterized by five main properties:
* - A list of partitions;  一组分区(Partition),即数据集的基本组成单位;
*  - A function for computing each split;    一个计算每个分区的函数;
*  - A list of dependencies on other RDDs;  RDD之间的依赖关系;
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned);一个Partitioner,即RDD的分片函数;
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file); 一个列表,存储存取每个Partition的优先位置(preferred location)。

RDD特点

RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。

弹性

存储的弹性:内存与磁盘的自动切换;

容错的弹性:数据丢失可以自动恢复;

计算的弹性:计算出错重试机制;

分片的弹性:可根据需要重新分片。

 分区

RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。

只读

如下图所示,RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。

由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了。

RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。

 依赖

RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。 

 缓存

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。如下图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。

 CheckPoint

虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。

RDD编程

 编程模型

在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。

    要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。

 RDD的创建

在Spark中创建RDD的创建方式可以分为三种:从集合中(内存)创建RDD;从外部存储(HDFS、本地磁盘、mysql等)创建RDD;从其他RDD创建。

从内存中创建:
/** Distribute a local Scala collection to form an RDD.
* This method is identical to `parallelize`.
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {  可指定位置,发送到哪个分区的task,这种方法一般不用;
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
scala> val x = sc.makeRDD(List(1,2,3,4))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> x.collect
res0: Array[Int] = Array(1, 2, 3, 4)

scala> val y = sc.parallelize(1 to 5).collect
y: Array[Int] = Array(1, 2, 3, 4, 5)

scala> z.collect
res1: Array[String] = Array(Hello World, Hello java, Hello spark, "")

查看分区数

scala> x.getNumPartitions
res2: Int = 8

scala> x.partitions.size
res4: Int = 8

默认分区规则:

从集合中创建默认分区规则:
在SparkContext中查找makeRDD

技术分享图片
numSlices: Int = defaultParallelism): RDD[T] = withScope 
taskScheduler.defaultParallelism
def defaultParallelism(): Int  ctrl+h 特质--看它的实现类
 override def defaultParallelism(): Int = backend.defaultParallelism()
def defaultParallelism(): Int 特质
 override def defaultParallelism(): Int =
    scheduler.conf.getInt("spark.default.parallelism", totalCores) alt+<-返回;总核数totalCores 8个

  def makeRDD[T: ClassTag](
      seq: Seq[T], 
      numSlices: Int = defaultParallelism): RDD[T] = withScope { ##defaultParallelism 8个
    parallelize(seq, numSlices)  #8个,如果从集合中创建RDD,Local模式的默认分区数是总核数
  }    
    
CoarseGrainedSchedulerBackend  yarn或standalone模式
  override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) #总核数与2取最大值
  }    
View Code

从文件系统中读默认分区规则:

scala> val z = sc.textFile("./wc.txt")
z: org.apache.spark.rdd.RDD[String] = ./wc.txt MapPartitionsRDD[6] at textFile at <console>:24

scala> z.getNumPartitions
res6: Int = 2
技术分享图片
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

    def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
  def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }    
  def defaultParallelism(): Int -->查看它的特质实现类   override def defaultParallelism(): Int = backend.defaultParallelism()
  def defaultParallelism(): Int -->查看它的特质实现类 
    override def defaultParallelism(): Int =
    scheduler.conf.getInt("spark.default.parallelism", totalCores)  ##总核数 8
返回:def defaultMinPartitions: Int = math.min(defaultParallelism, 2) ##defaultParallelism为8
    textFile中: minPartitions: Int = defaultMinPartitions): RDD[String] = withScope 这个值为2
hadoopFile中用到这个方法
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)    
 def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()
    
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)      
  
override def getPartitions: Array[Partition] = {
    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)}  ##
    
    getSplits-->InputFormat-->FileInputFormat找getSplits方法
    18  long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); totalSize总大小wc.txt总共36字节,numSplits要传的参数2
    1  long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); --> private long minSplitSize = 1L;
      long blockSize = file.getBlockSize(); #块大小,HDFS上128M,windows是32M
      long splitSize = this.computeSplitSize(goalSize, minSize, blockSize); (18, 1, 128) -->18
        return Math.max(minSize, Math.min(goalSize, blockSize)); 18
        文件切片机制按1.1倍判断,
        (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize
        36/22 > 1.1  -->36-22=14/22 <1.1不切, 最终得到2片切片
View Code

 

 

 

 



 

SparkCore

原文:https://www.cnblogs.com/shengyang17/p/10658549.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!