首页 > 其他 > 详细

spark系列(一)----RDD

时间:2020-09-13 15:37:38      阅读:91      评论:0      收藏:0      [点我收藏+]

一.RDD是什么

  RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。

  在spark的源码里面我们可以看到,rdd是被abstract所修饰的,他是一个抽象类,它代表一个不可变,可分区,里面的元素可并行计算的集合。

  而在spark的工作流程中,RDD的主要作用是对数据进行结构的转换,在对RDD的方法源码中可以看到,方法传参中需要将当前RDD传进去,在最后又会新建一个RDD作为输出。这种数据转换的设计,体现出了装饰者设计模式。

 

二.RDD的特点

  抽象、分布式、不可变、可分区并行计算。

  1.RDD是分布式的,RDD本身不存储数据,它是拥有相同特性,或者说拥有相同数据结构的一类数据的逻辑划分,而这些数据分布在集群的各个节点上。

  2.RDD是可分区的,数据分区后,可以发送给不同的executor,RDD的分区主要是用来实现并行计算的。

  3.RDD不可变,在RDD的方法中,最终最是生成一个新的RDD做出输出,而不会直接修改原本的RDD。

  4.RDD里面封装的其实是逻辑,它的责任是告诉程序在运行时,要以什么样的逻辑去处理这一类数据。

  5.RDD中有一个叫做preferred location的列表,里面存储着分区的优先位置,而优先位置的概念是指,在spark分配任务给executor的时候,会优先分配给存有这个任务的数据的那个节点上的executor,这样executor在执行任务的时候,就不用从别的节点上拿取数据了。

 

三.RDD的宽依赖和窄依赖

  RDD之间是存在依赖关系的,RDD中将依赖分成了两种类型,宽依赖和窄依赖,窄依赖是指父RDD的每个分区都只能被子RDD一个分区使用,相应的,宽依赖就是指RDD的分区被多个子RDD的分区所依赖(如reduceByKey)。

 

四.RDD的缓存

  假如在应用程序中,某个RDD被多次重用,就可以把该RDD缓存起来,那样这个RDD里面划分的数据,只会在第一次计算的时候,从上游RDD中计算得到,而其余计算中,会直接使用缓存里面的数据进行计算。

 

五.RDD的创建

  rdd可以通过三种方式创建,分区通过集合(从内存中创建),通过外部数据,通过别的RDD。

    

六.RDD的分区

  RDD的分区代表着RDD中的数据继续逻辑化成成多少块,每个分区的数据可以交由一个executor去执行,以实现数据的并行计算,RDD的分区是可以由用户自己指定的,但是如果用户没有指定的话,在不同情况下,它有着不同的默认值。

  下面我们以makeRDD和textfile为例,看看spark的源码。

  makeRDD:

    这是以集合为基础生成的RDD,我们来看看它的具体代码

def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
}

    可以看到,这个方法除了要传入一个seq之外,还需要传入一个叫numSlices的参数,就是这个参数决定着并行度,而这个numSlices时从defaultparallelism这个方法那里获取到的。

def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
}

    这里又看到,该值是从taskScheduler.defaultParallelism处获取的,但是继续看下去,会发现这个方法时一个抽象方法。

技术分享图片

     因此,我们可以crtl+h看看这个方法具体在哪里实现了

技术分享图片

    搜索结果告诉我们,在TaskSchedulerlmpl里面有这个方法的具体实现

override def defaultParallelism(): Int = backend.defaultParallelism()

  // Check for speculatable tasks in all our active jobs.
  def checkSpeculatableTasks() {
    var shouldRevive = false
    synchronized {
      shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
    }
    if (shouldRevive) {
      backend.reviveOffers()
    }
}

    这里可以看到,这个值又是从backend.defaultParallelism中传过来的,按照这种方式继续查下去,会一直查到一个叫coarseGrainedSchedulerBackend的文件中

override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

    这里就是最终决定这个参数的值是多少的地方了,首先会读取spark.default.parallelism这个配置的值,假如没有配置,则会拿当前计算机最大内核数与2做对比,取较大值。

 

  textfile:

    textfile是以外部文件为基础生成的RDD,下面是他的代码 

def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
}

    可以看到,这里的并行度是defaultMinPartitions这个方法决定的,这个方法没有传参

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

    它是拿刚刚defaultParallelism的值跟2做对比,取较小值,所以,defaultParallelism这个方法的值是怎么来的,还是得弄清楚。

 

spark系列(一)----RDD

原文:https://www.cnblogs.com/QicongLiang/p/13657893.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!