
Deep Dive into Spark 2.1 Core (6): Implementation and Source Code Analysis of Resource Scheduling and Task Execution

Published: 2017-01-06 11:17:26

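In the previous post, "Deep Dive into Spark 2.1 Core (5): Implementation and Source Code Analysis of Standalone Mode", we covered how the Master and Worker are started and how resources are reclaimed. We did not, however, explain how the AppClient and the Executors are started, and their startup is closely tied to how resources are scheduled. In this post we look at Spark's resource scheduling and at how an Executor, once started, executes Tasks.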

Starting the AppClient

The call stack is as follows:

  • StandaloneSchedulerBackend.start
    • StandaloneAppClient.start
      • StandaloneAppClient.ClientEndpoint.onStart
        • StandaloneAppClient.registerWithMaster
          • StandaloneAppClient.tryRegisterAllMasters
  • Master.receive
    • Master.createApplication
  • StandaloneAppClient.ClientEndpoint.receive

StandaloneSchedulerBackend.start

In Standalone mode, the backend held by SparkContext is a StandaloneSchedulerBackend. StandaloneSchedulerBackend.start contains the following:

    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)

    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    // Create the AppClient
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    // Start the AppClient
    client.start()
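The snippet above derives two fields of ApplicationDescription from the configuration. The sketch below models only that derivation. It reads the configuration from a plain `Map[String, String]` and assumes that dynamic allocation is enabled exactly when `spark.dynamicAllocation.enabled` is `"true"`. The real `Utils.isDynamicAllocationEnabled` may apply additional checks. `ExecutorSettings` and `ConfSketch` are hypothetical names.

```scala
// Hypothetical, simplified model of how StandaloneSchedulerBackend.start
// derives coresPerExecutor and initialExecutorLimit from the configuration.
// Assumption: dynamic allocation is on iff "spark.dynamicAllocation.enabled"
// is "true"; Spark's real helper may perform extra checks.
final case class ExecutorSettings(
    coresPerExecutor: Option[Int],
    initialExecutorLimit: Option[Int])

object ConfSketch {
  def fromConf(conf: Map[String, String]): ExecutorSettings = {
    // spark.executor.cores is optional; unset means no fixed per-executor core count
    val coresPerExecutor = conf.get("spark.executor.cores").map(_.toInt)
    val dynamicAllocation =
      conf.get("spark.dynamicAllocation.enabled").exists(_.trim.equalsIgnoreCase("true"))
    // With dynamic allocation the app starts with an executor limit of 0
    val initialExecutorLimit = if (dynamicAllocation) Some(0) else None
    ExecutorSettings(coresPerExecutor, initialExecutorLimit)
  }
}
```

The `Option` types match the snippet: `None` means "not specified" and leaves the decision to the Master, rather than encoding it as a magic number.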

StandaloneAppClient.start

  def start() {
    // Register a new ClientEndpoint with the RpcEnv, which then invokes its onStart
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }
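`start` never calls `onStart` directly. Registering the endpoint is what triggers it. The toy model below shows that lifecycle. `ToyRpcEnv`, `ToyEndpoint` and `ToyClientEndpoint` are hypothetical illustrations, not Spark classes. Spark's real RpcEnv may deliver `onStart` asynchronously as the endpoint's first message. This sketch calls it synchronously for simplicity.

```scala
// Hypothetical toy model: registering an endpoint with the "RPC env"
// is what triggers the endpoint's onStart callback.
trait ToyEndpoint {
  def onStart(): Unit = {}
}

final class ToyRpcEnv {
  private val endpoints = scala.collection.mutable.Map.empty[String, ToyEndpoint]

  def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpoint = {
    require(!endpoints.contains(name), s"endpoint $name already registered")
    endpoints(name) = endpoint
    endpoint.onStart() // the env drives the lifecycle, not the caller
    endpoint
  }
}

final class ToyClientEndpoint extends ToyEndpoint {
  @volatile var started = false
  override def onStart(): Unit = { started = true }
}
```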

StandaloneAppClient.ClientEndpoint.onStart

onStart calls registerWithMaster:

    override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }
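The important point in `onStart` is that a failed first registration does not propagate. The client is marked disconnected and then stopped. The sketch below isolates that pattern. `ToyAppClient` and its `register` parameter are hypothetical stand-ins for `StandaloneAppClient` and `registerWithMaster(1)`.

```scala
// Hypothetical sketch of the onStart pattern: try to register and, if that
// throws, mark the client disconnected and stop it instead of rethrowing.
final class ToyAppClient(register: () => Unit) {
  @volatile var disconnected = false
  @volatile var stopped = false

  def onStart(): Unit = {
    try {
      register()
    } catch {
      case e: Exception =>
        Console.err.println(s"Failed to connect to master: ${e.getMessage}")
        markDisconnected()
        stop()
    }
  }

  private def markDisconnected(): Unit = { disconnected = true }
  def stop(): Unit = { stopped = true }
}
```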

StandaloneAppClient.registerWithMaster

    private def registerWithMaster(nthRetry: Int) {
      // Register this app with all Masters asynchronously.
      // Once a connection to one Master succeeds, the remaining attempts are cancelled.
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            // The maximum number of attempts (REGISTRATION_RETRIES, 3 by default)
            // has been reached: mark the app as dead
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }
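Each time the retry timer fires, it takes one of three branches. The sketch below pulls that decision into a pure function so the retry chain can be traced without threads or timeouts. `RetrySketch`, `onTimerFired` and `simulate` are hypothetical names. `maxRetries` plays the role of `REGISTRATION_RETRIES`. The real code waits `REGISTRATION_TIMEOUT_SECONDS` between attempts, and this sketch collapses that wait.

```scala
// Hypothetical pure model of the branch taken when the registration timer fires.
object RetrySketch {
  sealed trait RetryAction
  case object CancelPending extends RetryAction // registered: cancel the other attempts
  case object GiveUp extends RetryAction        // markDead("All masters are unresponsive! ...")
  case object RetryAgain extends RetryAction    // cancel futures, registerWithMaster(n + 1)

  def onTimerFired(registered: Boolean, nthRetry: Int, maxRetries: Int = 3): RetryAction =
    if (registered) CancelPending
    else if (nthRetry >= maxRetries) GiveUp
    else RetryAgain

  // Replays the retry chain; registeredAfter is the attempt number by which
  // some Master has replied (None means no Master ever replies).
  def simulate(registeredAfter: Option[Int], maxRetries: Int = 3): (RetryAction, Int) = {
    var n = 1
    var action = onTimerFired(registeredAfter.exists(_ <= n), n, maxRetries)
    while (action == RetryAgain) {
      n += 1
      action = onTimerFired(registeredAfter.exists(_ <= n), n, maxRetries)
    }
    (action, n)
  }
}
```

With the default of 3, an unresponsive cluster gets three registration rounds before the app is marked dead. A Master that answers during the second round stops the chain at round two.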

StandaloneAppClient.tryRegisterAllMasters

Send a RegisterApplication message to every Master:

    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
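`tryRegisterAllMasters` fans out one task per Master address on a thread pool. Each task returns early if a registration has already succeeded. The sketch below reproduces that fan-out with plain `java.util.concurrent` classes. `FanOutSketch` is a hypothetical name. The `send` function stands in for `masterRef.send(RegisterApplication(...))`. The master addresses used in the test are made up. Daemon threads are used so that the pool never keeps the JVM alive.

```scala
import java.util.concurrent.{Executors, Future => JFuture, ThreadFactory}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical sketch of the fan-out in tryRegisterAllMasters.
final class FanOutSketch(masters: Seq[String], send: String => Unit) {
  val registered = new AtomicBoolean(false)

  private val pool = Executors.newFixedThreadPool(math.max(1, masters.size), new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "register-master")
      t.setDaemon(true)
      t
    }
  })

  // One future per master, so a caller can cancel the outstanding attempts.
  def tryRegisterAll(): Seq[JFuture[_]] =
    masters.map { address =>
      pool.submit(new Runnable {
        override def run(): Unit =
          if (!registered.get) { // skip if another attempt already succeeded
            try send(address)
            catch { case NonFatal(e) => Console.err.println(s"Failed to connect to $address: $e") }
          }
      }): JFuture[_]
    }

  def shutdown(): Unit = { pool.shutdownNow(); () }
}
```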

Master.receive

Master.receive receives and handles the RegisterApplication message:

    case RegisterApplication(description, driver) =>
      // A Master in STANDBY state does not accept registrations
      if (state == RecoveryState.STANDBY) {
        // Ignore the request; send no response
      } else {
        logInfo("Registering app " + description.name)
        // Create the app
        val app = createApplication(description, driver)
        // Register the app
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // Persist the app
        persistenceEngine.addApplication(app)
        // Reply with a RegisteredApplication message
        driver.send(RegisteredApplication(app.id, self))
        // Schedule resources
        schedule()
      }
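The handler above either ignores the request, if this Master is a standby, or runs a fixed sequence: create, register, persist, reply, schedule. The toy Master below models only that branch structure and ordering, and records each step as a string event. `RecoveryStateSketch` and `ToyMaster` are hypothetical names. The simplified `app-NNNN` ID is an illustration only.

```scala
// Hypothetical toy model of the RegisterApplication handler's control flow.
object RecoveryStateSketch extends Enumeration {
  val STANDBY, ALIVE = Value
}

final class ToyMaster(var state: RecoveryStateSketch.Value) {
  val events = scala.collection.mutable.ArrayBuffer.empty[String]
  private var nextAppNumber = 0

  def onRegisterApplication(appName: String, reply: String => Unit): Unit = {
    if (state == RecoveryStateSketch.STANDBY) {
      // A standby Master ignores the request and sends no reply
    } else {
      val appId = f"app-$nextAppNumber%04d"     // createApplication (simplified ID)
      nextAppNumber += 1
      events += s"register:$appName as $appId"  // registerApplication
      events += s"persist:$appId"               // persistenceEngine.addApplication
      reply(appId)                              // driver.send(RegisteredApplication(...))
      events += "schedule"                      // schedule()
    }
  }
}
```

The reply is sent before `schedule()`, so the driver learns its app ID even if no Worker currently has free resources.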

Let's look more closely at how the Master registers the app.

Master.createApplication

First, it creates the app:

  private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
      ApplicationInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
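    // Generate an appId from the submission date
    val appId = newApplicationId(date)
    // Build the ApplicationInfo from the start time, appId, description,
    // submission date, driver reference and default core count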
    new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
  }
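The source of `newApplicationId` is not shown here. The sketch below is a hypothetical generator that assumes the `app-<yyyyMMddHHmmss>-<4-digit counter>` shape of application IDs commonly shown by the standalone Master. It combines the submission date with a per-Master counter. `AppIdGenerator` is a hypothetical name. The time zone is a constructor parameter only so the output can be made deterministic.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

// Hypothetical ID generator in the spirit of newApplicationId (format assumed).
final class AppIdGenerator(zone: TimeZone) {
  private var nextAppNumber = 0

  // SimpleDateFormat is not thread-safe, so build a fresh one per call
  private def dateFormat: SimpleDateFormat = {
    val f = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    f.setTimeZone(zone)
    f
  }

  def newApplicationId(submitDate: Date): String = synchronized {
    val id = "app-%s-%04d".format(dateFormat.format(submitDate), nextAppNumber)
    nextAppNumber += 1
    id
  }
}
```

Because of the counter, two apps submitted within the same second still get distinct IDs.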

Master.registerApplication

Then it registers the app:

  private def registerApplication(app: ApplicationInfo): Unit = {
    // If an app is already registered at this driver address, return
    val appAddress = app.driver.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }

    // Register the app's appSource with applicationMetricsSystem
    applicationMetricsSystem.registerSource(app.appSource)
    // Add the app to the set of apps
    // (HashSet[ApplicationInfo])
    apps += app
    // Update the id -> app map
    // (HashMap[String, ApplicationInfo])
    idToApp(app.id) = app
    // Update the endpoint -> app map
    // (HashMap[RpcEndpointRef, ApplicationInfo])
    endpointToApp(app.driver) = app
    // Update the address -> app map
    // (HashMap[RpcAddress, ApplicationInfo])
    addressToApp(appAddress) = app
    // Append the app to the queue of apps waiting to be scheduled
    // (ArrayBuffer[ApplicationInfo])
    waitingApps += app
    if (reverseProxy) {
      webUi.addProxyTargets(app.id, app.desc.appUiUrl)
    }
  }
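`registerApplication` keeps the same app in several indexes and treats the driver address as the deduplication key. The cut-down registry below mirrors that bookkeeping. It omits the metrics source, the web UI proxy and the `endpointToApp` map. `ToyApp` and `ToyAppRegistry` are hypothetical names. `driverAddress` is a plain string standing in for Spark's `RpcAddress`.

```scala
import scala.collection.mutable

// Hypothetical, cut-down model of the Master's app bookkeeping.
final case class ToyApp(id: String, driverAddress: String)

final class ToyAppRegistry {
  val apps = mutable.HashSet.empty[ToyApp]
  val idToApp = mutable.HashMap.empty[String, ToyApp]
  val addressToApp = mutable.HashMap.empty[String, ToyApp]
  val waitingApps = mutable.ArrayBuffer.empty[ToyApp]

  /** Returns false, changing nothing, if the driver address is already known. */
  def register(app: ToyApp): Boolean =
    if (addressToApp.contains(app.driverAddress)) {
      false // the same driver re-registering: ignore it
    } else {
      apps += app
      idToApp(app.id) = app
      addressToApp(app.driverAddress) = app
      waitingApps += app // FIFO queue consumed by the scheduling step
      true
    }
}
```

Checking the address before touching any index keeps the maps consistent. A duplicate request either updates every structure or none of them.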

StandaloneAppClient.ClientEndpoint.receive

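      case RegisteredApplication(appId_, masterRef) =>
        // This code has two known problems:
        // 1. Because of an unstable network, a Master may receive several
        //    registration requests and reply with several RegisteredApplication messages.
        // 2. If the active Master is changing, RegisteredApplication messages
        //    may arrive from more than one Master.
        // Set the appId
        appId.set(appId_)
        // Mark the app as registered
        registered.set(true)
        // Record the Master reference
        master = Some(masterRef)
        // Notify the listener that the client is connected
        listener.connected(appId.get)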
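The comments above note that duplicate RegisteredApplication messages can arrive. One possible mitigation for the first case is to let only the first message take effect by using `compareAndSet`. This is a hypothetical illustration, not what StandaloneAppClient does. `IdempotentRegistration` is a hypothetical name. Master references are modelled as strings.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical mitigation sketch (not Spark's code): only the first
// RegisteredApplication updates state and notifies the listener.
final class IdempotentRegistration(onConnected: String => Unit) {
  private val registered = new AtomicBoolean(false)
  val appId = new AtomicReference[String](null)
  @volatile var master: Option[String] = None

  /** Returns true only for the message that actually completed registration. */
  def onRegisteredApplication(newAppId: String, masterRef: String): Boolean =
    if (registered.compareAndSet(false, true)) {
      appId.set(newAppId)
      master = Some(masterRef)
      onConnected(newAppId)
      true
    } else {
      false // duplicate reply: drop it
    }
}
```

This only covers duplicates from the same registration. For the second case, a Master failover, silently dropping a reply from the new Master could be wrong, so that case would need different handling.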


Original post: http://blog.csdn.net/u011239443/article/details/54098376
