这个类用来收集任务级别的信息并展示在 UI 上。对这个类中数据的所有访问都是同步的,因为 UI 线程和 EventBus 的事件循环可能并发地读取和更新其中的内容。这个类继承了 SparkListener 这个抽象类,SparkListener 实现了 SparkListenerInterface 这个 trait。
private[spark] trait SparkListenerInterface {
/**
 * Called when a stage completes, whether it succeeded or failed.
 */
def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
/**
 * Called when a stage is submitted.
 */
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit
/**
 * Called when a task starts running.
 */
def onTaskStart(taskStart: SparkListenerTaskStart): Unit
/**
 * Called when a task begins remotely fetching its result (not called when the
 * result does not need to be fetched remotely).
 */
def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit
/**
 * Called when a task finishes running.
 */
def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit
/**
 * Called when a job starts running.
 */
def onJobStart(jobStart: SparkListenerJobStart): Unit
/**
 * Called when a job finishes running.
 */
def onJobEnd(jobEnd: SparkListenerJobEnd): Unit
/**
 * Called when environment properties have been updated.
 */
def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit
/**
 * Called when a new block manager joins.
 */
def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit
/**
 * Called when a block manager is removed.
 */
def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit
/**
 * Called when an RDD is manually unpersisted (removed from cache).
 */
def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit
/**
 * Called when the application starts.
 */
def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit
/**
 * Called when the application ends.
 */
def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit
/**
 * Called when the driver receives task metrics from an executor via a heartbeat.
 */
def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
/**
 * Called when an executor registers with the driver.
 */
def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
/**
 * Called when an executor is removed from the driver.
 */
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
/**
 * Called when the driver receives a block update notification.
 */
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
/**
 * Called on other, less common events.
 */
def onOtherEvent(event: SparkListenerEvent): Unit
}
// Public type aliases and bookkeeping state used by the UI for jobs/stages.
type JobId = Int // job id
type JobGroupId = String // job group id
type StageId = Int // stage id
type StageAttemptId = Int
type PoolName = String
type ExecutorId = String // executor id
// Application:
@volatile var startTime = -1L // application start time
@volatile var endTime = -1L // application end time
// Jobs:
val activeJobs = new HashMap[JobId, JobUIData]
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]
val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
// Stages:
val pendingStages = new HashMap[StageId, StageInfo]
val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val skippedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
val stageIdToInfo = new HashMap[StageId, StageInfo]
val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
// Total of completed and failed stages that have ever been run. These may be greater than
// `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
// JobProgressListener's retention limits.
var numCompletedStages = 0
var numFailedStages = 0
var numCompletedJobs = 0
var numFailedJobs = 0
val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
def blockManagerIds: Seq[BlockManagerId] = executorIdToBlockManagerId.values.toSeq
var schedulingMode: Option[SchedulingMode] = None
// To bound the memory used by this listener, only a fixed number of finished jobs/stages
// is retained (default 1000 each; configurable via spark.ui.retained{Stages,Jobs}).
val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
1、onJobStart
/**
 * Called when a job is submitted. Registers the job and all of its stages in the
 * bookkeeping maps so the UI can render its progress.
 */
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
// Extract the job group id, if one was set, from the submission properties.
val jobGroup = for (
props <- Option(jobStart.properties);
group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
) yield group
// Fresh UI-facing record for this job.
val jobData: JobUIData =
new JobUIData(
jobId = jobStart.jobId,
submissionTime = Option(jobStart.time).filter(_ >= 0),
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
// Index the job under its group (orNull: ungrouped jobs share the null key).
jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
// All stages of a freshly started job begin as pending.
jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
// Estimate (possibly an underestimate) of how many tasks this job will run: stages
// that already completed in an earlier job will be skipped, so only count stages
// whose completionTime is still unset.
jobData.numTasks = {
val allStages = jobStart.stageInfos // all stages referenced by the job
val missingStages = allStages.filter(_.completionTime.isEmpty)
missingStages.map(_.numTasks).sum // sum of tasks across stages that must run
}
// Publish the record in both the id index and the active set.
jobIdToData(jobStart.jobId) = jobData
activeJobs(jobStart.jobId) = jobData
// Record that each stage belongs to this (active) job.
// FIX: the set holds job ids, so construct it as HashSet[JobId] to match the
// declared value type of stageIdToActiveJobIds. The original used
// HashSet[StageId]; both alias Int so behavior is unchanged, but the element
// type stated the wrong intent.
for (stageId <- jobStart.stageIds) {
stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[JobId]).add(jobStart.jobId)
}
// Pre-register stage info/data so the UI can show stages even before the
// scheduler has reported anything about them.
for (stageInfo <- jobStart.stageInfos) {
stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
}
}
2、onJobEnd
/**
 * Called when a job finishes. Moves the job out of the active set, files it under
 * the completed or failed list, and cleans up per-stage bookkeeping, marking
 * stages that never ran as "skipped".
 */
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
// We never saw this job start; synthesize a record so the end event is not lost.
logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
new JobUIData(jobId = jobEnd.jobId)
}
jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
// None of this job's stages can remain pending once the job has ended.
jobData.stageIds.foreach(pendingStages.remove)
// File the job under the list matching its outcome and bump the lifetime counter.
jobEnd.jobResult match {
case JobSucceeded =>
jobData.status = JobExecutionStatus.SUCCEEDED
completedJobs += jobData
trimJobsIfNecessary(completedJobs)
numCompletedJobs += 1
case JobFailed(exception) =>
jobData.status = JobExecutionStatus.FAILED
failedJobs += jobData
trimJobsIfNecessary(failedJobs)
numFailedJobs += 1
}
// Detach this job from every stage it referenced.
jobData.stageIds.foreach { stageId =>
stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
jobsUsingStage.remove(jobEnd.jobId)
if (jobsUsingStage.isEmpty) {
// No active job uses this stage any more; drop the mapping.
stageIdToActiveJobIds.remove(stageId)
}
// A stage that was never submitted will never complete, so record it as skipped.
stageIdToInfo.get(stageId).foreach { stageInfo =>
if (stageInfo.submissionTime.isEmpty) {
skippedStages += stageInfo
trimStagesIfNecessary(skippedStages)
jobData.numSkippedStages += 1
jobData.numSkippedTasks += stageInfo.numTasks
}
}
}
}
}
3、剩下的代码基本与上面差不多,大致介绍下。
(1)onStageCompleted:当一个stage结束的时候进行一些清理操作
(2)onStageSubmitted:对于FIFO这种调度方式,所有的stage都会提交到default的资源池中。这个方法是在提交stage的时候进行一些初始化操作。
(3)onTaskStart:在task启动进行一些处理操作,将这个task加入到stage中
(4)onTaskEnd:当task结束时进行一些处理操作。
原文:https://www.cnblogs.com/liufei-yes/p/13356945.html