In the Hive source code you frequently run into the Context class and the DriverContext class. At first glance they look like they mean much the same thing, but their roles are actually quite different:
org.apache.hadoop.hive.ql.Context
Stores a job's context information. One Context object is created per job, and when the job finishes its clear method is called to clean up. A minimal lifecycle sketch is below.
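A hedged lifecycle sketch, assuming the Configuration-based constructor (both the constructor and clear can throw IOException, which is glossed over here):

// Minimal lifecycle sketch: one Context per job, cleared when the job ends.
// Assumes a Configuration-based constructor; IOException handling omitted.
Context ctx = new Context(conf);  // conf: the job's HiveConf/Configuration
try {
  Path tmp = ctx.getMRTmpPath();  // hand out intermediate paths to the job's tasks
  // ... compile and run the job ...
} finally {
  ctx.clear();                    // removes the scratch directories created above
}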
1) Initializing/creating/deleting intermediate directories
The intermediate directories cover both local jobs and non-local jobs:
protected int pathid = 10000;
private static final String MR_PREFIX = "-mr-";       // used by getMRTmpPath
private static final String EXT_PREFIX = "-ext-";     // used by getExternalTmpPath
private static final String LOCAL_PREFIX = "-local-"; // used by getLocalTmpPath
private String nextPathId() {
  return Integer.toString(pathid++); // pathid starts at 10000, so the smallest generated suffix is 10000
}
For example, getMRTmpPath:
public Path getMRTmpPath() {
  return new Path(getMRScratchDir(), MR_PREFIX + nextPathId());
}
It produces temporary paths such as:
hdfs://xxx:9000/tmp/hive-ericni/hive_2014-12-18_14-37-00_106_3507079460876567552-1/_tmp.-mr-10003
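To see how those suffixes are numbered, here is a standalone toy (not Hive code) that mimics nextPathId:

// Toy reproduction of the pathid counter: suffixes count up from 10000.
public class PathIdDemo {
  private static int pathid = 10000;
  private static String next(String prefix) {
    return prefix + Integer.toString(pathid++);
  }
  public static void main(String[] args) {
    System.out.println(next("-mr-"));  // -mr-10000
    System.out.println(next("-mr-"));  // -mr-10001
    System.out.println(next("-ext-")); // -ext-10002 (the counter is shared across prefixes)
  }
}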
The call chain that creates the temporary directories is:
getMRScratchDir ---> getScratchDir (getLocalScratchDir ultimately also calls getScratchDir, just passing localScratchDir as the argument); a paraphrased sketch of getMRScratchDir follows.
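A paraphrased sketch of that chain, not verbatim Hive source (the field name nonLocalScratchPath is an assumption):

// Paraphrased sketch of the call chain; not verbatim Hive source.
public Path getMRScratchDir() {
  try {
    FileSystem fs = nonLocalScratchPath.getFileSystem(conf); // nonLocalScratchPath: assumed field name
    URI uri = fs.makeQualified(nonLocalScratchPath).toUri();
    // delegate to getScratchDir with the resolved scheme/authority;
    // getLocalScratchDir passes localScratchDir here instead
    return getScratchDir(uri.getScheme(), uri.getAuthority(), true, uri.getPath());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}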
private final Map<String, Path> fsScratchDirs = new HashMap<String, Path>(); // caches the fileSystem-to-scratch-dir mapping
.....
private Path getScratchDir(String scheme, String authority,
    boolean mkdir, String scratchDir) {
  String fileSystem = scheme + ":" + authority; // e.g. hdfs:10.100.90.204:9000
  Path dir = fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskRunnerID());
  // on the first call, fsScratchDirs.get(...) returns null
  if (dir == null) { // first call: create the directory via Utilities.createDirsWithPermission, mode 777 when hive.scratch.dir.permission is set to 777
Path dirPath = new Path(scheme, authority,
scratchDir + "-" + TaskRunner.getTaskRunnerID());
if (mkdir) {
try {
FileSystem fs = dirPath.getFileSystem(conf);
dirPath = new Path(fs.makeQualified(dirPath).toString());
FsPermission fsPermission = new FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
if (!Utilities.createDirsWithPermission(conf, dirPath, fsPermission)) {
throw new RuntimeException("Cannot make directory: "
+ dirPath.toString());
}
if (isHDFSCleanup) {
fs.deleteOnExit(dirPath);
}
} catch (IOException e) {
throw new RuntimeException (e);
}
}
dir = dirPath;
fsScratchDirs.put(fileSystem + "-" + TaskRunner.getTaskRunnerID(), dir);
}
return dir;
}
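As an aside, the permission string above is parsed as octal; a minimal standalone snippet (not Hive code) illustrating that parse:

// Standalone illustration of the octal parse used in getScratchDir above.
public class PermDemo {
  public static void main(String[] args) {
    short mode = Short.parseShort("777".trim(), 8); // radix 8: "777" -> 511 decimal
    System.out.println(mode);                       // prints 511, i.e. rwxrwxrwx
  }
}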
2) Wrapper methods over other pieces of state
For example: isLocalOnlyExecutionMode (whether the job runs in local mode), setHiveLocks (record lock information), setHiveTxnManager (set the lock/transaction manager), getHiveTxnManager (get the lock/transaction manager), setNeedLockMgr (set whether a lock manager is needed), isNeedLockMgr (query whether a lock manager is needed), and so on. A caller-side sketch is below.
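A hypothetical caller-side sketch; only the Context accessor names come from the list above, the surrounding logic and the acquireLocks helper are made up for illustration:

// Hypothetical caller-side sketch; only the Context accessors are from the list above.
if (ctx.isNeedLockMgr()) {
  HiveTxnManager txnMgr = ctx.getHiveTxnManager(); // the lock/txn manager stored on the context
  List<HiveLock> locks = acquireLocks(txnMgr);     // acquireLocks: made-up helper for the sketch
  ctx.setHiveLocks(locks);                         // record the acquired locks on the context
}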
org.apache.hadoop.hive.ql.DriverContext
A class tied to launching jobs. It mainly initializes two queues: one holds tasks that are ready to be launched (runnable), the other holds tasks that have already been launched (running):
private Queue<Task<? extends Serializable>> runnable;
private Queue<TaskRunner> running;
public DriverContext(Context ctx) {
this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
this.running = new LinkedBlockingQueue<TaskRunner>();
this.ctx = ctx;
}
Commonly used methods:
addToRunnable, which adds a Task to the runnable queue:
public synchronized boolean addToRunnable(Task<? extends Serializable> tsk) throws HiveException {
if (runnable.contains(tsk)) {
return false;
}
checkShutdown();
runnable.add(tsk);
tsk.setQueued();
return true;
}
launching, which adds a TaskRunner to the running queue; TaskRunner is a thread class used to start a Task:
public synchronized void launching(TaskRunner runner) throws HiveException {
checkShutdown();
running.add(runner);
}
getRunnable relates to launching jobs in parallel (hive.exec.parallel defaults to false). With parallel launch enabled, if runnable still has elements and the running queue is smaller than the configured thread count (hive.exec.parallel.thread.number defaults to 8), the head element of runnable is removed and eventually added to running:
public synchronized Task<? extends Serializable> getRunnable(int maxthreads) throws HiveException {
  checkShutdown();
  if (runnable.peek() != null && running.size() < maxthreads) {
    return runnable.remove();
  }
  return null;
}
pollFinished, which fetches finished TaskRunners from the running queue, one per call, until the queue is empty; in other words, it is how the caller waits for all jobs to complete:
public synchronized TaskRunner pollFinished() throws InterruptedException {
  while (!shutdown) {
    Iterator<TaskRunner> it = running.iterator();
    while (it.hasNext()) {
      TaskRunner runner = it.next();
      if (runner != null && !runner.isRunning()) {
        it.remove();
        return runner;
      }
    }
    wait(SLEEP_TIME);
  }
  return null;
}
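Putting it together, a hedged sketch of a driver-style launch loop; only addToRunnable, getRunnable, launching, and pollFinished come from the class above, while rootTasks, maxthreads, and the TaskRunner constructor shown are assumptions:

// Illustrative launch loop; only the DriverContext methods are real, the rest is assumed.
DriverContext driverCxt = new DriverContext(ctx);
for (Task<? extends Serializable> rootTask : rootTasks) { // rootTasks: the plan's root tasks (assumed)
  driverCxt.addToRunnable(rootTask);                      // queue everything that can start now
}
// maxthreads would come from hive.exec.parallel.thread.number (default 8)
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
  TaskRunner runner = new TaskRunner(task);               // hypothetical constructor for the sketch
  driverCxt.launching(runner);                            // record it in the running queue
  runner.start();                                         // TaskRunner is a Thread; this launches the Task
}
TaskRunner finished;
while ((finished = driverCxt.pollFinished()) != null) {   // returns each runner as it finishes
  // check the finished task's result, then addToRunnable() its now-runnable child tasks
}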
Original post: 菜光光的博客, http://caiguangguang.blog.51cto.com/1652935/1591569