
Spark Kernel Source Code Analysis 9: TaskScheduler Principles and Source Code Analysis


1. DAGScheduler

  submitMissingTasks

    taskScheduler.submitTasks

      TaskSchedulerImpl.submitTasks

        createTaskSetManager

        backend.reviveOffers()

          DriverActor ReviveOffers

            makeOffers

 /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    stage.pendingTasks.clear()

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = {
      if (stage.isShuffleMap) {
        (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
      } else {
        val job = stage.resultOfJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
      }
    }

    val properties = if (jobIdToActiveJob.contains(jobId)) {
      jobIdToActiveJob(stage.jobId).properties
    } else {
      // this stage will be assigned to "default" pool
      null
    }

    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
    outputCommitCoordinator.stageStart(stage.id)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] =
        if (stage.isShuffleMap) {
          closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
        } else {
          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
        }
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        runningStages -= stage
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
        runningStages -= stage
        return
    }
    // Create the required number of tasks for this stage
    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
      // Create one task per partition to compute, and calculate the preferred locations for each task
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }
    } else {
      // If this is not a shuffle map stage it is the final stage, so ResultTasks are created instead
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      // taskScheduler here is implemented by TaskSchedulerImpl
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should post
      // SparkListenerStageCompleted here in case there are no tasks to run.
      outputCommitCoordinator.stageEnd(stage.id)
      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
      runningStages -= stage
    }
  }
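
To make the partitionsToCompute filter above concrete, here is a minimal standalone sketch (not Spark source; the outputLocs array is toy data standing in for stage.outputLocs). A shuffle map stage only creates tasks for the partitions whose map output has not yet been registered.

// Minimal sketch, assuming a toy outputLocs array in place of stage.outputLocs:
// a partition with an empty location list has no map output yet and still needs a task.
object MissingPartitionsSketch extends App {
  val numPartitions = 4
  // partitions 0 and 2 already have registered map output; 1 and 3 do not
  val outputLocs: Array[List[String]] = Array(List("host-a"), Nil, List("host-b"), Nil)

  val partitionsToCompute: Seq[Int] =
    (0 until numPartitions).filter(id => outputLocs(id) == Nil)

  println(partitionsToCompute)  // Vector(1, 3): only these partitions get ShuffleMapTasks
}
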
 override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // Create a TaskSetManager for each TaskSet. Within TaskSchedulerImpl, the
      // TaskSetManager schedules the tasks of a single TaskSet: it tracks each task,
      // retries failed tasks until the maximum number of failures is exceeded, and
      // handles locality-aware scheduling for this TaskSet via delay scheduling.
      // Its main entry point is resourceOffer, through which it is asked to run a
      // task on a given node, and it receives status-update messages to learn when
      // the state of its tasks changes.
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      // Register the manager under the TaskSet id and add it to the scheduling pool
      // built by schedulableBuilder (FIFO or FAIR)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    // As covered in the SparkContext analysis, creating the TaskScheduler also creates a
    // SparkDeploySchedulerBackend. The backend here is that SparkDeploySchedulerBackend,
    // which creates the AppClient and registers the application with the Master.
    backend.reviveOffers()
  }
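
The schedulableBuilder itself is not shown here. As a rough illustration of what the default FIFO pool achieves once the manager is added, here is a toy sketch (an assumed simplification, not the actual Pool/Schedulable classes): TaskSets are ordered by job id and then by stage id, which is the order resourceOffers walks them in later.

// Toy sketch of FIFO ordering; ToyTaskSet stands in for a registered TaskSetManager.
object FifoPoolSketch extends App {
  case class ToyTaskSet(jobId: Int, stageId: Int, name: String)

  val pool = Seq(
    ToyTaskSet(jobId = 1, stageId = 3, name = "TaskSet_3.0"),
    ToyTaskSet(jobId = 0, stageId = 1, name = "TaskSet_1.0"),
    ToyTaskSet(jobId = 0, stageId = 0, name = "TaskSet_0.0"))

  // FIFO comparator: earlier jobs first, then earlier stages within the same job
  val sorted = pool.sortBy(ts => (ts.jobId, ts.stageId))
  sorted.foreach(ts => println(ts.name))  // TaskSet_0.0, TaskSet_1.0, TaskSet_3.0
}
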
 // In the DriverActor of CoarseGrainedSchedulerBackend, a ReviveOffers message
 // triggers a new round of resource offers
 case ReviveOffers =>
   makeOffers()
def makeOffers() {
      // Step 1: call TaskScheduler's resourceOffers() method, which runs the task
      // allocation algorithm and assigns the tasks to executors.
      // Step 2: once tasks are assigned, call launchTasks(), which sends a LaunchTask
      // message for each assigned task to the corresponding executor, where the task
      // is started and executed.
      // resourceOffers() is passed all of this application's available executors,
      // each wrapped as a WorkerOffer describing the number of free CPU cores on
      // that executor.
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }
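
launchTasks itself is not reproduced in this article. As a self-contained illustration of the step the comments describe, the following is a simplified sketch (ToyTaskDescription, ToyExecutor and the LaunchTask case class are illustrative stand-ins, not Spark classes): each TaskDescription returned by resourceOffers is serialized and delivered to the executor it was assigned to.

// Simplified sketch of the "send LaunchTask to each executor" step; all types are toy stand-ins.
object LaunchTasksSketch extends App {
  case class ToyTaskDescription(taskId: Long, executorId: String, name: String)
  case class LaunchTask(serializedTask: Array[Byte])

  class ToyExecutor(val id: String) {
    def send(msg: LaunchTask): Unit =
      println(s"executor $id received LaunchTask of ${msg.serializedTask.length} bytes")
  }

  val executors = Map("exec-1" -> new ToyExecutor("exec-1"),
                      "exec-2" -> new ToyExecutor("exec-2"))

  // stands in for the Seq[Seq[TaskDescription]] returned by resourceOffers
  val tasks: Seq[Seq[ToyTaskDescription]] = Seq(
    Seq(ToyTaskDescription(0, "exec-1", "ShuffleMapTask_0")),
    Seq(ToyTaskDescription(1, "exec-2", "ShuffleMapTask_1")))

  for (task <- tasks.flatten) {
    // stand-in for closureSerializer.serialize(task)
    val serialized = task.toString.getBytes("UTF-8")
    executors(task.executorId).send(LaunchTask(serialized))
  }
}
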
  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      activeExecutorIds += o.executorId
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    // Shuffling the available executors spreads tasks around and helps balance the load.

    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    // For each WorkerOffer, build the bookkeeping structures. tasks is effectively a
    // two-dimensional array: one ArrayBuffer per executor, sized by the number of
    // CPU cores available on that executor.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // Take the sorted TaskSets out of the rootPool. As covered in the TaskScheduler
    // initialization analysis, after TaskSchedulerImpl and SparkDeploySchedulerBackend
    // are created, initialize() is called, which builds the scheduling pool; the task
    // allocation algorithm then pulls the queued TaskSets out of that pool in order.
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    var launchedTask = false
    // Double for loop: iterate over every TaskSet and, for each one, over its locality levels:
    // PROCESS_LOCAL: process-local, the RDD partition and the task are in the same executor (JVM); fastest
    // NODE_LOCAL: the RDD partition and the task are not in the same executor, but on the same worker node
    // NO_PREF: no locality preference
    // RACK_LOCAL: rack-local, the RDD partition and the task are at least on the same rack
    // ANY: any locality level
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        // Keep offering resources to this TaskSet at the current locality level for as
        // long as it launches tasks; once nothing more launches, move on to the next
        // (less local) locality level.
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
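
resourceOfferSingleTaskSet is not shown above. The following toy model (an assumed simplification, not the Spark implementation) illustrates the round-robin allocation it performs for a single TaskSet, together with the surrounding do/while loop: walk the offers, launch one task on every executor that still has at least CPUS_PER_TASK free cores, and repeat until no more tasks can be launched.

// Toy model of round-robin task allocation over the shuffled offers; Offer and the
// pendingTasks list are illustrative stand-ins for WorkerOffer and the TaskSet's tasks.
object ResourceOfferSketch extends App {
  val CPUS_PER_TASK = 1
  case class Offer(executorId: String, var freeCores: Int)

  val offers = Seq(Offer("exec-1", 2), Offer("exec-2", 1))
  var pendingTasks = (0 until 5).map(i => s"task-$i").toList
  val assignments = scala.collection.mutable.ArrayBuffer.empty[(String, String)]

  var launchedTask = true
  while (launchedTask) {              // mirrors the do/while loop above
    launchedTask = false
    for (offer <- offers if pendingTasks.nonEmpty && offer.freeCores >= CPUS_PER_TASK) {
      val task = pendingTasks.head    // in Spark this is taskSet.resourceOffer(...)
      pendingTasks = pendingTasks.tail
      offer.freeCores -= CPUS_PER_TASK
      assignments += ((offer.executorId, task))
      launchedTask = true
    }
  }
  assignments.foreach { case (e, t) => println(s"$t -> $e") }
  // task-0 -> exec-1, task-1 -> exec-2, task-2 -> exec-1, then the cores run out
}
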

 

  

Original source: https://www.cnblogs.com/xiaofeiyang/p/12877682.html
