往YARN提交Samza job要使用run-job.sh这个脚本。
samza-example/target/bin/run-job.sh --config-factory=samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/config/hello-world.properties
这脚本的内容是什么呢?
exec $(dirname $0)/run-class.sh org.apache.samza.job.JobRunner $@
它是调用run-class这个脚本。
run-class.sh会根据环境变量HADOOP_CONF_DIR和HADOOP_YARN_HOME获取YARN的配置文件位置,把它加入CLASSPATH。同时会把samza job根目录下的lib文件夹里的jar或war都加进CLASSPATH。只所以需要yarn的配置文件,目的是得到Resource Manager的地址。而lib目录下的包,是运行job必须的。
run-class.sh也会通过环境变量$SAMZA_LOG_DIR获知samza的log应存放的位置,通过$SAMZA_CONTAINER_NAME决定container的名字,然后把它们用-D设在JAVA_OPTS中。然后在lib目录下查找log4j.xml文件,存在的话,就把-Dlog4j.configuration设为log4j.xml的路径。
通过以上动作,构造好了调用java所需的classpath和任务运行时需要的一些配置项,然后调用
exec $JAVA $JAVA_OPTS -cp $CLASSPATH $@
启动虚拟机。
可以,在调用run-job.sh时,会运行org.apache.samza.job.JobRunner这个类。在samza的官方指南中简要介绍了一下这个类的作用。Samza自带了两种StreamJobFactory :LocalJobFactory 和 YarnJobFactory 。 StreamJobFactory的作用就是把给JobRunner提供一个可以执行的job
1
2
3 |
public interface StreamJobFactory { StreamJob getJob(Config config); } |
而StreamJob就是一个可以执行的job, JobRunner会调用它的submit方法
1
2
3
4
5
6
7
8
9
10
11 |
public interface StreamJob { StreamJob submit(); StreamJob kill(); ApplicationStatus waitForFinish( long
timeoutMs); ApplicationStatus waitForStatus(ApplicationStatus status, long
timeoutMs); ApplicationStatus getStatus(); } |
下边来看JobRunner这个类。程序入口在JobRunner这个类的伴生对象里
1
2
3
4
5
6
7
8 |
object JobRunner extends
Logging { def main(args: Array[String]) { val cmdline = new
CommandLine val options = cmdline.parser.parse(args: _*) val config = cmdline.loadConfig(options) new
JobRunner(config).run } } |
在配置参数以后,会走到JobRunner的run方法, 下边是它的主要逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 |
val jobFactoryClass = conf.getStreamJobFactoryClass match { case
Some(factoryClass) => factoryClass case
_ => throw
new SamzaException( "no job factory class defined" ) } val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory] // Create the actual job, and submit it. val job = jobFactory.getJob(conf).submit //提交job info( "waiting for job to start" ) // Wait until the job has started, then exit. Option(job.waitForStatus(Running, 500 )) match { case
Some(appStatus) => { if
(Running.equals(appStatus)) { info( "job started successfully" ) } else
{ warn( "unable to start job successfully. job has status %s"
format (appStatus)) } } case
_ => warn( "unable to start job successfully." ) } |
首先,它会去conf里找到是否设置了job.factory,即有没有指定StreamJobFactory的实现,没有就抛出异常退出。否则就通过这个StreamJobFactory提交job。在提交后,等待500毫秒,如果任务的状态不是Running就退出。这里的Running并不代表任务已经在跑了,比如在使用YARN时,只要成功提交给Resource Manager,就算是running了,所以这里的running是“任务提交成功“的意思。
当提交给YARN时,我们使用YarnJobFactory这个StreamJobFactory的实现。
1
2
3
4
5
6
7
8
9
10 |
class YarnJobFactory extends
StreamJobFactory { def getJob(config: Config) = { // TODO fix this. needed to support http package locations. //这里会读yarn-site.xml。前提是yarn-site.xml必须在classpath里 在run-class.sh里,HADOOP_CONF_DIR路径被写进了classpath里 val hConfig = new
YarnConfiguration hConfig.set( "fs.http.impl" , classOf[HttpFileSystem].getName) new
YarnJob(config, hConfig) } } |
” hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)“ 这一句是使得可以在job里调用http文件系统,如把文件路径写成"http://xxx.xx.xx.xx:8080/xx/xx"这种。Samza自带了一个HTTP filsystem的实现。或许是LinkedIn的人需要这么用?
YarnJobFactory,主要就是构造了一个YarnConfiguration,和以前的commandLine参数一起作为config来构造一个YarnJob. YarnConfiguration是YARN自己的类,它会从classpath里读yarn-site.xml这个配置文件。
YarnJob这个才是要被提交给Yarn的任务,它实现了StreamJob这个接口。这里主要关心它的submit方法。
val client = new ClientHelper(hadoopConfig)
var appId:
Option[ApplicationId] = None
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 |
//提交job,注意会有AppMaster所需的内存和cpu数目。但不包括container数目 def submit: YarnJob = { appId = client.submitApplication( //注意submitApplication的返回值是appId new
Path(config.getPackagePath.getOrElse( throw
new SamzaException( "No YARN package path defined in config." ))), config.getAMContainerMaxMemoryMb.getOrElse(DEFAULT_AM_CONTAINER_MEM), 1 , List( "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)), Some(Map( ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)), ShellCommandConfig.ENV_CONTAINER_NAME -> Util.envVarEscape( "application-master" ), ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse( "" )))), Some( "%s_%s"
format (config.getName.get, config.getJobId.getOrElse( 1 )))) this } |
submit方法来提交任务的过程交给了ClientHelper的submitAppliation来实现。
这个方法才是提交YARN任务的关键。
首先,我们知道要提交任务给YARN一定实现YARN指定的接口。那么来揣摩一下YARN需要我们提供给它什么东西,它才能调度一个job的执行。
来猜猜看(实际上我已经知道了一些,看能不能想得更全一点,重要是理清思路)
先看它的签名
def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId]
下边介绍一下各个参数的含义,有助于我们了解这个方法都干了啥事。
返回值是ApplicationId。这是一个Option,所以提交失败时,返回值就是None。
再写下去submitApplication这个方法的实现,就有些太长,换下一篇
Samza在YARN上的启动过程 =》 之一,布布扣,bubuko.com
原文:http://www.cnblogs.com/devos/p/3720002.html