The life cycle of a job
dagScheduler.runJob //(1)
--> submitJob(eventProcessLoop.post(JobSubmitted, ***)) //(2)
--> eventProcessLoop //(3)
--> onReceive(event: DAGSchedulerEvent) //(4)
--> doOnReceive(event: DAGSchedulerEvent) //(5)
--> case JobSubmitted //(6)
--> dagScheduler.handleJobSubmitted //(7)
--> finalStage =createResultStage(finalRDD, func, partitions, jobId, callSite) //(8)
--> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(9)
--> jobIdToActiveJob(jobId) = job //(10)
--> activeJobs += job //(11)
--> finalStage.setActiveJob(job) //(12)
--> stageIds = jobIdToStageIds(jobId).toArray //(13)
--> stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) //(14)
--> listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //(15)
--> submitStage(finalStage) //(16)
--> getMissingParentStages(stage).sortBy(_.id) //(17)
--> finalStage = getOrCreateShuffleMapStage(dependency, jobId) //(18)
--> createShuffleMapStage(dep, firstJobId) //(19)
--> stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
--> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(20)
--> submitStage(finalStage) //(21) // the essence of the stage-splitting and submission algorithm
--> submitMissingTasks(stage, jobId.get) //(22)
--> submitWaitingChildStages(stage) //(23)
--> markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency)) //(24)
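The whole chain is set in motion by an action on the driver side. Below is a minimal sketch (a hypothetical demo program; the name JobLifecycleDemo is ours): calling an action such as collect() is what invokes dagScheduler.runJob in step (1), and registering a SparkListener lets us observe the SparkListenerJobStart event posted in step (15).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

object JobLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("job-lifecycle").setMaster("local[2]"))
    // Observe the event posted by listenerBus.post(SparkListenerJobStart(...)) in step (15).
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        println(s"job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")
    })
    // The action triggers dagScheduler.runJob -> submitJob -> JobSubmitted -> handleJobSubmitted.
    sc.parallelize(1 to 100, 4).map(_ * 2).collect()
    sc.stop()
  }
}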
/**
 * Returns the missing parent stages of the given stage.
 * If all dependencies of the stage's final RDD are narrow dependencies,
 * no new stage is created: the RDDs are chained into the current stage.
 * When a shuffle (wide) dependency is found, a ShuffleMapStage is created
 * from that dependency and collected as a missing parent.
 */
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        // Walk the RDD's dependencies. Shuffle operations such as reduceByKey,
        // groupByKey and countByKey are backed by several RDDs under the hood
        // (e.g. a map-side RDD, a ShuffledRDD, and a result RDD).
        for (dep <- rdd.dependencies) {
          dep match {
            // Wide (shuffle) dependency:
            case shufDep: ShuffleDependency[_, _, _] =>
              // Look up or create a ShuffleMapStage for the shuffle dependency.
              // The final stage of a job is a ResultStage, but every stage
              // upstream of it is a ShuffleMapStage.
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            // Narrow dependency:
            case narrowDep: NarrowDependency[_] =>
              // Push the parent RDD onto the stack; it stays in the current stage.
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  // Start the traversal from the stage's own RDD.
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    // Depth-first walk of the lineage graph.
    visit(waitingForVisit.pop())
  }
  missing.toList
}
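To see from the API side the rule this traversal encodes, here is a minimal sketch (assuming a local[2] run; StageBoundaryDemo is a made-up name): map keeps a OneToOneDependency and stays inside one stage, while reduceByKey introduces a ShuffleDependency and therefore a new ShuffleMapStage.

import org.apache.spark.{SparkConf, SparkContext}

object StageBoundaryDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-boundary").setMaster("local[2]"))
    val mapped  = sc.parallelize(1 to 100, 4).map(x => (x % 10, x)) // narrow: OneToOneDependency
    val reduced = mapped.reduceByKey(_ + _)                         // wide: ShuffleDependency

    println(mapped.dependencies.map(_.getClass.getSimpleName))  // e.g. List(OneToOneDependency)
    println(reduced.dependencies.map(_.getClass.getSimpleName)) // e.g. List(ShuffleDependency)

    // Running the action produces two stages: a ShuffleMapStage for the map side
    // and the final ResultStage, exactly what getMissingParentStages would find.
    reduced.collect()
    sc.stop()
  }
}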
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks // the list of tasks in this TaskSet
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Each TaskSet gets its own TaskSetManager, which tracks the set's tasks,
    // retries failed ones up to maxTaskFailures times, and handles
    // locality-aware (delay) scheduling for them.
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    // Register the manager under this stage attempt.
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Hand the manager to the chosen schedulableBuilder, which places it in the
    // scheduling pool. The builder was selected in initialize():
    /*
    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      schedulableBuilder = {
        schedulingMode match {
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
          case _ =>
            throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
              s"$schedulingMode")
        }
      }
      schedulableBuilder.buildPools()
    }*/
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  /**
   * When the TaskScheduler is created, a SchedulerBackend (SparkDeploySchedulerBackend
   * in standalone mode) is created for TaskSchedulerImpl. It creates the AppClient
   * and registers the Application with the Master.
   */
  backend.reviveOffers()
}
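The schedulingMode branch quoted above is driven by configuration. A minimal sketch (assumed settings; the pool name "production" is ours): spark.scheduler.mode selects FIFO (the default, FIFOSchedulableBuilder) or FAIR (FairSchedulableBuilder), and spark.scheduler.pool routes jobs submitted from a thread into a named pool.

import org.apache.spark.{SparkConf, SparkContext}

object SchedulerModeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fair-scheduling")
      .setMaster("local[4]")
      .set("spark.scheduler.mode", "FAIR") // FIFO is the default
    // Optionally point at an allocation file defining the pools buildPools() creates:
    // conf.set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

    val sc = new SparkContext(conf)
    // Jobs submitted from this thread go to the named pool instead of the default pool.
    sc.setLocalProperty("spark.scheduler.pool", "production")
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}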
Original source: https://www.cnblogs.com/itboys/p/9185566.html