首页 > 其他 > 详细

【Spark】SparkContext源码解读

时间:2015-07-14 20:27:34      阅读:277      评论:0      收藏:0      [点我收藏+]

SparkContext的初始化

SparkContext是应用启动时创建的Spark上下文对象,是进行Spark应用开发的主要接口,是Spark上层应用与底层实现的中转站(SparkContext负责给executors发送task)。
SparkContext在初始化过程中,主要涉及一下内容:

  • SparkEnv
  • DAGScheduler
  • TaskScheduler
  • SchedulerBackend
  • SparkUI

生成SparkConf

SparkContext的构造函数中最重要的入参是SparkConf。SparkContext进行初始化的时候,首先要根据初始化入参来构建SparkConf对象,进而再去创建SparkEnv。
技术分享
创建SparkConf对象来管理spark应用的属性设置。SparkConf类比较简单,是通过一个HashMap容器来管理key、value类型的属性。
下图为SparkConf类声明,其中setting变量为HashMap容器:
技术分享
下面是SparkContext类中,关于SparkConf对象的拷贝过程:
技术分享

创建LiveListenerBus监听器

这是典型的观察者模式,向LiveListenerBus类注册不同类型的SparkListenerEvent事件,SparkListenerBus会遍历它的所有监听者SparkListener,然后找出事件对应的接口进行响应。
技术分享

下面是SparkContext创建LiveListenerBus对象:

  // An asynchronous listener bus for Spark events
  private[spark] val listenerBus = new LiveListenerBus

创建SparkEnv运行环境

在SparkEnv中创建了MapOutputTracker、MasterActor、BlockManager、CacheManager、HttpFileServer一系列对象。
下图为生成SparkEnv的代码:
技术分享

SparkEnv的构造函数入参列表为:

class SparkEnv (
    val executorId: String,
    val actorSystem: ActorSystem,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val cacheManager: CacheManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockTransferService: BlockTransferService,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val httpFileServer: HttpFileServer,
    val sparkFilesDir: String,
    val metricsSystem: MetricsSystem,
    val shuffleMemoryManager: ShuffleMemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging

这里说明几个入参的作用:

  • cacheManager: 用于存储中间计算结果
  • mapOutputTracker: 用来缓存MapStatus信息,并提供从MapOutputMaster获取信息的功能
  • shuffleManager: 路由维护表
  • broadcastManager: 广播
  • blockManager: 块管理
  • securityManager: 安全管理
  • httpFileServer: 文件存储服务器
    *l sparkFilesDir: 文件存储目录
  • metricsSystem: 测量
  • conf: 配置文件

创建SparkUI

下面是SparkContext初始化SparkUI的代码:
技术分享

其中,在SparkUI对象初始化函数中,注册了StorageStatusListener监听器,负责监听Storage的变化及时的展示到Spark web页面上。attachTab方法中添加对象正是我们在Spark Web页面中看到的那个标签。

  /** Initialize all components of the server. */
  def initialize() {
    attachTab(new JobsTab(this))
    val stagesTab = new StagesTab(this)
    attachTab(stagesTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
  }

创建TaskScheduler和DAGScheduler并启动运行

在SparkContext中, 最主要的初始化工作就是创建TaskScheduler和DAGScheduler, 这两个就是Spark的核心所在。

Spark的设计非常的干净, 把整个DAG抽象层从实际的task执行中剥离了出来DAGScheduler, 负责解析spark命令,生成stage, 形成DAG, 最终划分成tasks, 提交给TaskScheduler, 他只完成静态分析TaskScheduler,专门负责task执行, 他只负责资源管理, task分配, 执行情况的报告。
这样设计的好处, 就是Spark可以通过提供不同的TaskScheduler简单的支持各种资源调度和执行平台

下面代码是根据Spark的运行模式来选择相应的SchedulerBackend,同时启动TaskScheduler:
技术分享
其中,createTaskScheduler最为关键的一点就是根据master变量来判断Spark当前的部署方式,进而生成相应的SchedulerBackend的不同子类。创建的SchedulerBackend放置在TaskScheduler中,在后续的Task分发过程中扮演着重要角色。

TaskScheduler.start的目的是启动相应的SchedulerBackend,并启动定时器进行检测,下面是该函数源码(定义在TaskSchedulerImpl.scala文件中):

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }

添加EventLoggingListener监听器

这个默认是关闭的,可以通过spark.eventLog.enabled配置开启。它主要功能是以json格式记录发生的事件:

  // Optionally log Spark events
  private[spark] val eventLogger: Option[EventLoggingListener] = {
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
      logger.start()
      listenerBus.addListener(logger)
      Some(logger)
    } else None
  }

加入SparkListenerEvent事件

往LiveListenerBus中加入了SparkListenerEnvironmentUpdate、SparkListenerApplicationStart两类事件,对这两种事件监听的监听器就会调用onEnvironmentUpdate、onApplicationStart方法进行处理。

  setupAndStartListenerBus()
  postEnvironmentUpdate()
  postApplicationStart()

SparkContext类中的关键函数

textFile

要载入被处理的数据, 最常用的textFile, 其实就是生成HadoopRDD, 作为起始的RDD

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }


    /** Get an RDD for a Hadoop file with an arbitrary InputFormat
   *
   * ‘‘‘Note:‘‘‘ Because Hadoop‘s RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
   * operation will create many references to the same object.
   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
   * copy them using a `map` function.
   */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions
      ): RDD[(K, V)] = {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

runJob

关键在于调用了dagScheduler.runJob

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD‘s recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

说明

以上的源码解读基于spark-1.3.1源代码工程文件

转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页

版权声明:本文为博主原创文章,未经博主允许不得转载。

【Spark】SparkContext源码解读

原文:http://blog.csdn.net/jasonding1354/article/details/46882611

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!