
JobProgressListener in Detail


I. Introduction

This class collects task-level information so it can be displayed in the UI. All access to the data in this class must be synchronized, because the UI thread and the EventBus loop may otherwise read and update the internal state concurrently. The class extends the abstract class SparkListener, which in turn extends the trait SparkListenerInterface.
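To make the locking requirement concrete, here is a minimal, self-contained sketch (the class name SyncedJobListener and its field are made up for illustration, not part of Spark): every read and write of the shared map goes through `synchronized`, mirroring the discipline JobProgressListener follows so that the listener-bus thread and a UI reader thread never observe half-updated state.

import scala.collection.mutable.HashMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerJobEnd}

// Hypothetical listener following the same locking discipline as JobProgressListener:
// the listener-bus thread updates the map while another thread (e.g. a UI page) reads it,
// so every access happens inside `synchronized`.
class SyncedJobListener extends SparkListener {
  private val runningJobs = new HashMap[Int, Long] // jobId -> submission time (ms)

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    runningJobs(jobStart.jobId) = jobStart.time
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
    runningJobs.remove(jobEnd.jobId).foreach { start =>
      println(s"Job ${jobEnd.jobId} ran for ${jobEnd.time - start} ms")
    }
  }

  // Called from another thread (e.g. while rendering a page), hence also synchronized.
  def activeJobIds: Seq[Int] = synchronized { runningJobs.keys.toSeq }
}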

II. SparkListenerInterface

private[spark] trait SparkListenerInterface {

  /**
   * Called when a stage completes, successfully or with a failure.
   */
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit

  /**
   * Called when a stage is submitted.
   */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit

  /**
   * Called when a task starts.
   */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit

  /**
   * Called when a task begins remotely fetching its result (not called for tasks that
   * do not need to fetch their result remotely).
   */
  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit

  /**
   * Called when a task ends.
   */
  def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit

  /**
   * Called when a job starts.
   */
  def onJobStart(jobStart: SparkListenerJobStart): Unit

  /**
   * Called when a job ends.
   */
  def onJobEnd(jobEnd: SparkListenerJobEnd): Unit

  /**
   * Called when environment properties have been updated.
   */
  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit

  /**
   * Called when a new block manager joins.
   */
  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit

  /**
   * Called when a block manager is removed.
   */
  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit

  /**
   * Called when an RDD is manually unpersisted.
   */
  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit

  /**
   * Called when the application starts.
   */
  def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit

  /**
   * Called when the application ends.
   */
  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit

  /**
   * Called when the driver receives task metrics from an executor via a heartbeat.
   */
  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit

  /**
   * Called when an executor registers with the driver.
   */
  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit

  /**
   * Called when an executor is removed from the driver.
   */
  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit

  /**
   * Called when the driver receives a block update.
   */
  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit

  /**
   * Called when other special events are posted.
   */
  def onOtherEvent(event: SparkListenerEvent): Unit
}
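Because the abstract class SparkListener provides empty default implementations for every method of this trait, a custom listener only needs to override the callbacks it cares about. The sketch below (class name, demo job and object are invented for illustration; exact field names such as attemptId may vary slightly between Spark versions) overrides a single callback and registers the listener on the same listener bus that drives JobProgressListener:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Only onStageCompleted is overridden; all other callbacks keep their no-op defaults.
class StageTimingListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    for (start <- info.submissionTime; end <- info.completionTime) {
      println(s"Stage ${info.stageId} (attempt ${info.attemptId}) took ${end - start} ms")
    }
  }
}

object StageTimingDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-timing").setMaster("local[2]"))
    sc.addSparkListener(new StageTimingListener) // same bus that feeds the UI listeners
    sc.parallelize(1 to 1000).map(_ * 2).count()
    sc.stop()
  }
}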

III. JobProgressListener Source Code Walkthrough

  // Type aliases used throughout the listener; they are intentionally public because
  // the UI code uses them as well.
  type JobId = Int
  type JobGroupId = String
  type StageId = Int
  type StageAttemptId = Int
  type PoolName = String
  type ExecutorId = String

  // Application:
  @volatile var startTime = -1L     // application start time
  @volatile var endTime = -1L       // application end time

  // Jobs:
  val activeJobs = new HashMap[JobId, JobUIData]
  val completedJobs = ListBuffer[JobUIData]()
  val failedJobs = ListBuffer[JobUIData]()
  val jobIdToData = new HashMap[JobId, JobUIData]
  val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]

  // Stages:
  val pendingStages = new HashMap[StageId, StageInfo]
  val activeStages = new HashMap[StageId, StageInfo]
  val completedStages = ListBuffer[StageInfo]()
  val skippedStages = ListBuffer[StageInfo]()
  val failedStages = ListBuffer[StageInfo]()
  val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
  val stageIdToInfo = new HashMap[StageId, StageInfo]
  val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
  // Total of completed and failed stages that have ever been run.  These may be greater than
  // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
  // JobProgressListener's retention limits.
  var numCompletedStages = 0
  var numFailedStages = 0
  var numCompletedJobs = 0
  var numFailedJobs = 0

  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()

  def blockManagerIds: Seq[BlockManagerId] = executorIdToBlockManagerId.values.toSeq

  var schedulingMode: Option[SchedulingMode] = None

  // To limit the total memory used by JobProgressListener, only a fixed number of
  // completed/failed jobs and stages is retained (1000 each by default).
  val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
  val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
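The retention limits come from the configuration keys `spark.ui.retainedJobs` and `spark.ui.retainedStages`, so they can be changed when building the SparkConf. A small illustrative snippet (the values chosen here are arbitrary):

import org.apache.spark.SparkConf

// Raise the UI retention limits so more job/stage history is kept in memory
// by JobProgressListener, at the cost of more driver memory.
val conf = new SparkConf()
  .setAppName("ui-retention-demo")
  .set("spark.ui.retainedJobs", "2000")   // default: 1000
  .set("spark.ui.retainedStages", "2000") // default: 1000

Raising these values keeps more history visible on the Jobs/Stages pages but increases driver memory consumption.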

1. onJobStart

override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    // Scala for-comprehension over Options: yields the job group id only if the
    // Properties object exists and contains SPARK_JOB_GROUP_ID
    val jobGroup = for (
      props <- Option(jobStart.properties);
      group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
    ) yield group

    // Create the JobUIData for this job
    val jobData: JobUIData =
      new JobUIData(
        jobId = jobStart.jobId,
        submissionTime = Option(jobStart.time).filter(_ >= 0),
        stageIds = jobStart.stageIds,
        jobGroup = jobGroup,
        status = JobExecutionStatus.RUNNING)

    // Record the job under its job group
    jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
    // Register the job's StageInfos as pending stages
    jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
    // Estimate how many tasks this job will run. This may be an underestimate: the job start
    // event references all of the job's dependent stages, but stages whose output already
    // exists from an earlier run can be skipped and will not run again.
    jobData.numTasks = {
      val allStages = jobStart.stageInfos                           // all stages of this job
      val missingStages = allStages.filter(_.completionTime.isEmpty) // stages that still have to run
      missingStages.map(_.numTasks).sum                             // sum their task counts
    }
    // Register the JobUIData in jobIdToData and activeJobs
    jobIdToData(jobStart.jobId) = jobData
    activeJobs(jobStart.jobId) = jobData
    // Record, for each stage, which active jobs reference it
    for (stageId <- jobStart.stageIds) {
      stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId)
    }
    // If a stage has no information yet, register the StageInfo received from the scheduler
    // so the UI has something to show, and create an empty StageUIData for each stage attempt
    for (stageInfo <- jobStart.stageInfos) {
      stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
      stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
    }
  }
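The for-comprehension at the top is a common Scala pattern for chaining Options: the result is Some(group) only if both the Properties object is non-null and the property is present. A standalone sketch of the same pattern (the helper name jobGroupOf is made up; "spark.jobGroup.id" is the key that SparkContext.SPARK_JOB_GROUP_ID refers to):

import java.util.Properties

// Self-contained equivalent of the jobGroup extraction in onJobStart.
def jobGroupOf(properties: Properties): Option[String] =
  for {
    props <- Option(properties)                              // None if properties is null
    group <- Option(props.getProperty("spark.jobGroup.id"))  // None if the key is absent
  } yield group

val props = new Properties()
props.setProperty("spark.jobGroup.id", "etl-2020-07")
assert(jobGroupOf(props) == Some("etl-2020-07"))
assert(jobGroupOf(null) == None)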

2. onJobEnd

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
    // Remove the finished job from activeJobs
    val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
      logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
      new JobUIData(jobId = jobEnd.jobId)
    }
    jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
    // Remove this job's stages from pendingStages
    jobData.stageIds.foreach(pendingStages.remove)
    // Update the bookkeeping according to the job's result
    jobEnd.jobResult match {
      case JobSucceeded =>
        completedJobs += jobData
        trimJobsIfNecessary(completedJobs)
        jobData.status = JobExecutionStatus.SUCCEEDED
        numCompletedJobs += 1
      case JobFailed(exception) =>
        failedJobs += jobData
        trimJobsIfNecessary(failedJobs)
        jobData.status = JobExecutionStatus.FAILED
        numFailedJobs += 1
    }
    // Remove the finished job from stageIdToActiveJobIds; drop a stage's entry entirely
    // once no active job references it any more
    for (stageId <- jobData.stageIds) {
      stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
        jobsUsingStage.remove(jobEnd.jobId)
        if (jobsUsingStage.isEmpty) {
          stageIdToActiveJobIds.remove(stageId)
        }

        // A stage that was never submitted in this job (because an earlier run already
        // produced its output) will never complete, so record it as skipped
        stageIdToInfo.get(stageId).foreach { stageInfo =>
          if (stageInfo.submissionTime.isEmpty) {
            // if this stage is pending, it won't complete, so mark it as "skipped":
            skippedStages += stageInfo
            trimStagesIfNecessary(skippedStages)
            jobData.numSkippedStages += 1
            jobData.numSkippedTasks += stageInfo.numTasks
          }
        }
      }
    }
  }
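trimJobsIfNecessary and trimStagesIfNecessary are not shown in this post; conceptually they drop the oldest entries once a buffer exceeds its retention limit. A simplified sketch of that idea (not the exact Spark implementation; the generic helper and its onDrop callback are invented here):

import scala.collection.mutable.ListBuffer

// Simplified idea behind trimJobsIfNecessary / trimStagesIfNecessary:
// once a buffer grows past its retention limit, drop a batch of the oldest
// entries (and let the caller clean up any per-id maps keyed by them).
def trimIfNecessary[T](buffer: ListBuffer[T], retained: Int)(onDrop: T => Unit): Unit = {
  if (buffer.size > retained) {
    val toRemove = math.max(retained / 10, 1) // drop in batches, not one element at a time
    buffer.take(toRemove).foreach(onDrop)     // e.g. remove the entry from jobIdToData
    buffer.remove(0, toRemove)
  }
}

Dropping in batches of roughly 10% avoids paying the trimming cost on every single event once the limit has been reached.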

3. The remaining callbacks follow much the same pattern, so only a brief overview is given here.
(1) onStageCompleted: performs cleanup when a stage finishes.
(2) onStageSubmitted: under FIFO scheduling all stages are submitted to the "default" pool; this method does the per-stage initialization when a stage is submitted.
(3) onTaskStart: does some bookkeeping when a task starts and attaches the task to its stage's data.
(4) onTaskEnd: does some bookkeeping when a task finishes (a simplified sketch of this per-stage task bookkeeping follows below).
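To make items (3) and (4) concrete, here is a simplified sketch of the kind of per-stage counting that happens when tasks start and finish. The class and its counters are invented for illustration (JobProgressListener keeps this data in StageUIData with many more fields):

import scala.collection.mutable.HashMap
import org.apache.spark.Success
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, SparkListenerTaskEnd}

// Hypothetical, simplified per-stage counters in the spirit of StageUIData.
class TaskCountingListener extends SparkListener {
  private case class StageCounters(var active: Int = 0, var succeeded: Int = 0, var failed: Int = 0)
  private val counters = new HashMap[(Int, Int), StageCounters] // (stageId, attemptId) -> counters

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    val key = (taskStart.stageId, taskStart.stageAttemptId)
    counters.getOrElseUpdate(key, StageCounters()).active += 1
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val key = (taskEnd.stageId, taskEnd.stageAttemptId)
    val c = counters.getOrElseUpdate(key, StageCounters())
    c.active -= 1
    taskEnd.reason match {
      case Success => c.succeeded += 1
      case _       => c.failed += 1 // simplification: every non-Success reason counted as failed
    }
  }
}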


Original article: https://www.cnblogs.com/liufei-yes/p/13356945.html
