首页 > 其他 > 详细

spark-submit提交应用程序的内部流程

时间:2016-02-27 19:17:17      阅读:814      评论:0      收藏:0      [点我收藏+]

我们经常通过spark-submit来提交spark应用程序,那么让我们一起看一下这里面到底发生了什么吧。

技术分享

知识点:

1.CLI命令行界面启动Spark应用程序

Unix有两种方式:1spark-submit 2spark-class。前者是我们常见的方式,后者是spark集群内部使用的方式。spark-submit实际上是调用spark-class来提交应用程序的,所以本质上是一种方式。

Win中有两种方式:1spark-submit.cmd 2spark-class.cmdspark-submit.cmd调用spark-class.cmd,spark-class.cmd调用spark-class2.cmd来完成的。

2.SparkSubmit.scala详解

SparkSubmit.scala包含3Object和一个class,分别是SparkSubmitSparkSubmitActionSparkSubmitUtilsOptionAssigner

(1)SparkSubmitAction是一个只允许在deploy包中访问的枚举子类,用来判断sparksubmit命令的请求类型。代码如下:

private[deploy] object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS = Value
}

(2)SparkSubmitUtils也是一个Object,由名字就可知它是一个sparksubmit的辅助类,主要用于一些参数的处理及maven相关依赖的处理

(3)SparkSubmit是一个非常重要的Object。

主要的几个字段如下所示:

// 集群管理

private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL

// 部署模式
private val CLIENT = 1
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

我们可以看出,平时我们熟悉的这些字符串可能就只是一个整数而已

主要的几个方法如下所示:

 技术分享

main方法如下所示:

def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
      //通过spark-submit提交应用程序
    case SparkSubmitAction.SUBMIT => submit(appArgs)
      //通过spark-submit取消应用程序,目前只支持standalone cluster模式
    case SparkSubmitAction.KILL => kill(appArgs)
      //通过spark-submit请求得到应用程序,目前只支持standalone cluster模式
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

submit方法中首先通过CLI传递过来的参数,设置不同模式下的合适的类路径、系统属性及应用参数,然后创建环境运行应用程序的Main方法,submit方法如下所示:

private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
            } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }
  if (args.isStandaloneCluster && args.useRest) {
    try {
           printStream.println("Running Spark using the REST application submission protocol.")
           doRunMain()
    } catch {
        }
   } else {
    doRunMain()
  }
}

由代码可知submit调用doRunMain方法,然后doRunMain方法调用runMain方法触发应用程序的main方法。

kill方法如下所示:利用CLI传递过来的子任务IDmaster通过Post方式取消任务

private def kill(args: SparkSubmitArguments): Unit = {
  new RestSubmissionClient(args.master)
    .killSubmission(args.submissionToKill)
}

requestStatus方法如下所示:利用CLI传递过来的子任务ID和master通过Get方式得到任务的具体信息

private def requestStatus(args: SparkSubmitArguments): Unit = {
  new RestSubmissionClient(args.master)
    .requestSubmissionStatus(args.submissionToRequestStatusFor)
}

spark-submit提交应用程序的内部流程

原文:http://www.cnblogs.com/yourarebest/p/5223338.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!