/**
* 主要是将任务提交到集群中去并等待完成
* boolean verbose:是否将进度打印给用户看
* return 任务成功返回true
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
//判断job状态是否为define,避免二次提交,JobState为枚举:DEFINE,RUNNING
if (state == JobState.DEFINE) {
submit();//将任务提交到集群 --> 1.1 submit()
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
// 该方法做了系统兼容,避免出现框架更新,老版本无法使用
setUseNewAPI();
// 这里面创建了一个很重要的对象,用于建立连接本地还是集群连接 --> 1.1.1 connect()
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
/* 真正开始提交job流程,准备工作终于做完了 --> JobStatus submitJobInternal(Job job, Cluster cluster)
*/
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
//第一次连接的话一定为null
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
//创建提交Job的代理 --> cluster(Configuration conf)
return new Cluster(getConfiguration());
}
});
}
}
/**
* public Cluster(Configuration conf)
* 主要作用是获取你在Driver中Configuration配置的文件信息,没有配置使用默认
*/
public Cluster(Configuration conf) throws IOException {
this(null, conf);
}
/**
* 调用initialize(jobTrackAddr, conf)返回的值
* jobTrackAddr:local还是yarn
* conf:配置的信息
*/
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf); // --> initialize()
}
/**
* ...表示该方法中的校验代码
* jobTrackAddr:状态
* conf:配置信息
*/
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)throws IOException {
...
/*
判断Driver里配置状态,如果是yarn则返回YarnRunner对象,如果没有配置则返回LocalJobRunner对象
*/
if (jobTrackAddr == null) {
/*
第一次判断--> public ClientProtocol create(Configuration conf)
第二次判断--> public ClientProtocol create(Configuration conf)
*/
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
...
}
/**
* yarn是否和Driver中配置的conf.set("mapreduce.framework.name","yarn");
* 相同,相同的话就返回一个YARNRunner对象,没有配置的话就返回一个null,而后进行
* 第二次判断
*/
public ClientProtocol create(Configuration conf) throws IOException {
return "yarn".equals(conf.get("mapreduce.framework.name")) ?
new YARNRunner(conf) : null;
}
/**
* 之前返回为null的话就进行第二次判断
*/
public ClientProtocol create(Configuration conf) throws IOException {
/*
MRConfig.FRAMEWORK_NAME:"mapreduce.framework.name"
LOCAL_FRAMEWORK_NAME:"local"
没有这个值就获得一个LOCAL_FRAMEWORK_NAME
*/
String framework =conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
//判断两方是否都为local,相同的话就创建LocalJobRunner()对象
if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
return null;
}
conf.setInt(JobContext.NUM_MAPS, 1);
return new LocalJobRunner(conf);
} // --> submint() --> return submitter.submitJobInternal(Job.this, cluster);
总结:connect方法最终要的地方就是,为我们创建了一个关键的对象LocalJobRunner对象,这个对象为我们之后提交作业所用,很重要。
JobStatus submitJobInternal(Job job, Cluster cluster)throws ClassNotFoundException, InterruptedException, IOException {
/*
校验文件输出路径是否在Driver中配置,如果没有配置抛出InvalidJobConfException,如果文件路径存在抛出 FileAlreadyException
*/
checkSpecs(job);
// 获取conf配置
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
// 创建路径,往路劲里生成信息,提供给APPMaster使用 在集群中也就是tem路劲
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 配置校验信息我就用...代替了,太多容易把眼睛看花,感兴趣的朋友可以用DeBug边跳边看里面的具体信息。
...
// 创建jobId 也就是8088端口中 任务的id
JobID jobId = submitClient.getNewJobID();
// 获得jobId设置到job里
job.setJobID(jobId);
// 将jobStagingArea和jobId拼在一起,拼成一个提交信息(配置信息,切片信息,jar包)的路径
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
...
// 拷贝jar包到集群(本地模式看不到,在向集群提交的时候才能看到jar包)
copyAndConfigureFiles(job, submitJobDir);
// 会在submitJobDir目录下创建一个job.xml文件
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
/* 切片,具体怎么切,再看切片源码的时候会提到,在执行完该方法后submitJobDir路径中会多出split和crc 文件
*/
int maps = writeSplits(job, submitJobDir);
原文:https://www.cnblogs.com/tangtangtangtang/p/14342913.html