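The Context class and the DriverContext class both show up frequently in the Hive source code. At first glance they seem to mean much the same thing, but their roles are quite different:
org.apache.hadoop.hive.ql.Context class
Stores the context information of a job. One Context object is created per job, and once the job has finished, its clear method is called to clean up.
1) Initializing, creating, and deleting intermediate directories
The intermediate directories cover both local jobs and non-local jobs.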
protected int pathid = 10000;

// used by getMRTmpPath
private static final String MR_PREFIX = "-mr-";
// used by getExternalTmpPath
private static final String EXT_PREFIX = "-ext-";
// used by getLocalTmpPath
private static final String LOCAL_PREFIX = "-local-";

private String nextPathId() {
  // post-increment from 10000, so the smallest path ID is 10000
  return Integer.toString(pathid++);
}
For example, getMRTmpPath:
public Path getMRTmpPath() {
  return new Path(getMRScratchDir(), MR_PREFIX + nextPathId());
}
A temporary path produced this way looks like:
hdfs://xxx:9000/tmp/hive-ericni/hive_2014-12-18_14-37-00_106_3507079460876567552-1/_tmp.-mr-10003
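To make the naming scheme concrete, here is a minimal sketch of how a single shared counter combined with the three prefixes yields distinct path names. TmpPathNamer and its method names are hypothetical and are not part of Hive; the sketch also assumes, for simplicity, that all three kinds of path live under one scratch directory string, which is not how Hive lays them out.

```java
// Illustrative sketch only: TmpPathNamer is a hypothetical class, not Hive's Context.
class TmpPathNamer {
    private static final String MR_PREFIX = "-mr-";
    private static final String EXT_PREFIX = "-ext-";
    private static final String LOCAL_PREFIX = "-local-";

    // Shared counter: every generated path, whatever its prefix, consumes one ID.
    private int pathId = 10000;
    // Assumption: one scratch directory for all three kinds of path.
    private final String scratchDir;

    TmpPathNamer(String scratchDir) {
        this.scratchDir = scratchDir;
    }

    private String nextPathId() {
        return Integer.toString(pathId++);
    }

    String mrTmpPath() {
        return scratchDir + "/" + MR_PREFIX + nextPathId();
    }

    String extTmpPath() {
        return scratchDir + "/" + EXT_PREFIX + nextPathId();
    }

    String localTmpPath() {
        return scratchDir + "/" + LOCAL_PREFIX + nextPathId();
    }

    public static void main(String[] args) {
        TmpPathNamer namer = new TmpPathNamer("/tmp/scratch");
        System.out.println(namer.mrTmpPath());    // /tmp/scratch/-mr-10000
        System.out.println(namer.extTmpPath());   // /tmp/scratch/-ext-10001
        System.out.println(namer.localTmpPath()); // /tmp/scratch/-local-10002
    }
}
```

Because the counter is shared, the numeric suffix alone already makes each path unique within one context; the prefix only records which kind of path it is.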
The temporary directory is created through the following call chain:
getMRScratchDir ---> getScratchDir or getLocalScratchDir (getLocalScratchDir ultimately calls getScratchDir as well, but passes localScratchDir as the argument)
// maps "file system + task runner ID" to its scratch directory
private final Map<String, Path> fsScratchDirs = new HashMap<String, Path>();
.....
private Path getScratchDir(String scheme, String authority, boolean mkdir, String scratchDir) {
  String fileSystem = scheme + ":" + authority; // e.g. hdfs:10.100.90.204:9000
  // null the first time getScratchDir is called for this file system and task runner
  Path dir = fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskRunnerID());
  if (dir == null) {
    // first call: create the directory with Utilities.createDirsWithPermission;
    // the permission is 777 when hive.scratch.dir.permission is set to 777
    Path dirPath = new Path(scheme, authority, scratchDir + "-" + TaskRunner.getTaskRunnerID());
    if (mkdir) {
      try {
        FileSystem fs = dirPath.getFileSystem(conf);
        dirPath = new Path(fs.makeQualified(dirPath).toString());
        // the permission string is octal, hence radix 8
        FsPermission fsPermission = new FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
        if (!Utilities.createDirsWithPermission(conf, dirPath, fsPermission)) {
          throw new RuntimeException("Cannot make directory: " + dirPath.toString());
        }
        if (isHDFSCleanup) {
          fs.deleteOnExit(dirPath);
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    dir = dirPath;
    // cache it so later calls return the same directory
    fsScratchDirs.put(fileSystem + "-" + TaskRunner.getTaskRunnerID(), dir);
  }
  return dir;
}
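The lookup-then-create pattern in getScratchDir can be sketched without any Hadoop dependency. In the sketch below, ScratchDirCache, createCount, and the runnerId parameter (standing in for TaskRunner.getTaskRunnerID()) are all assumptions made for illustration; no directory is actually created, the counter only records when a real implementation would do so.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: ScratchDirCache is a hypothetical class, not Hive code.
class ScratchDirCache {
    private final Map<String, String> dirs = new HashMap<>();
    private int createCount = 0; // how many times a directory would have been created

    // runnerId stands in for TaskRunner.getTaskRunnerID().
    String getScratchDir(String scheme, String authority, String scratchDir, long runnerId) {
        String key = scheme + ":" + authority + "-" + runnerId;
        String dir = dirs.get(key);
        if (dir == null) {
            // First request for this file system and runner: build the path and cache it.
            dir = scheme + "://" + authority + scratchDir + "-" + runnerId;
            createCount++; // a real implementation would create the directory here
            dirs.put(key, dir);
        }
        return dir;
    }

    int getCreateCount() {
        return createCount;
    }

    // Permission strings such as "777" are octal, hence radix 8.
    static short parsePermission(String octal) {
        return Short.parseShort(octal.trim(), 8);
    }

    public static void main(String[] args) {
        ScratchDirCache cache = new ScratchDirCache();
        String first = cache.getScratchDir("hdfs", "host:9000", "/tmp/hive-user/q1", 0);
        String second = cache.getScratchDir("hdfs", "host:9000", "/tmp/hive-user/q1", 0);
        System.out.println(first.equals(second));    // true
        System.out.println(cache.getCreateCount());  // 1
        System.out.println(parsePermission("777"));  // 511
    }
}
```

Keying the cache on the runner ID as well as the file system means each task runner gets its own scratch directory, while repeated calls from the same runner reuse the one already created.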
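2) Providing wrapper methods for other pieces of information
For example: isLocalOnlyExecutionMode reports whether the job runs in local mode; setHiveLocks sets the lock information; setHiveTxnManager sets the manager class for locks and getHiveTxnManager returns it; setNeedLockMgr sets whether a lock manager is needed and isNeedLockMgr returns that flag; and so on.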
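org.apache.hadoop.hive.ql.DriverContext class
This class is concerned with launching jobs. It mainly initializes two queues: one (running) holds the jobs that have already been launched, the other (runnable) holds the jobs that are ready to be launched.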
private Queue<Task<? extends Serializable>> runnable;
private Queue<TaskRunner> running;

public DriverContext(Context ctx) {
  this.runnable = new ConcurrentLinkedQueue<Task<? extends Serializable>>();
  this.running = new LinkedBlockingQueue<TaskRunner>();
  this.ctx = ctx;
}
Commonly used methods:
addToRunnable adds a Task to the runnable queue:
public synchronized boolean addToRunnable(Task<? extends Serializable> tsk) throws HiveException {
  if (runnable.contains(tsk)) {
    return false;
  }
  checkShutdown();
  runnable.add(tsk);
  tsk.setQueued();
  return true;
}
launching adds a TaskRunner object to the running queue. TaskRunner is a thread class used to start a Task:
public synchronized void launching(TaskRunner runner) throws HiveException {
  checkShutdown();
  running.add(runner);
}
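getRunnable is tied to launching jobs in parallel (hive.exec.parallel defaults to false). With parallel launch enabled, if runnable still contains elements and the running queue holds fewer entries than the configured number of threads (hive.exec.parallel.thread.number defaults to 8), the first element of runnable is taken out and eventually added to running: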
public synchronized Task<? extends Serializable> getRunnable(int maxthreads) throws HiveException {
  checkShutdown();
  if (runnable.peek() != null && running.size() < maxthreads) {
    return runnable.remove();
  }
  return null;
}
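The throttling behaviour of getRunnable is easier to see in isolation. The following is a minimal sketch, assuming plain strings in place of Task and TaskRunner objects; ThrottleSketch and runningCount are hypothetical names, and the shutdown check is left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only: strings stand in for Hive's Task and TaskRunner objects.
class ThrottleSketch {
    private final Queue<String> runnable = new ArrayDeque<>();
    private final Queue<String> running = new ArrayDeque<>();

    // Rejects a task that is already queued, mirroring the contains() check.
    synchronized boolean addToRunnable(String task) {
        if (runnable.contains(task)) {
            return false;
        }
        runnable.add(task);
        return true;
    }

    // Hands out the next task only while fewer than maxThreads are running.
    synchronized String getRunnable(int maxThreads) {
        if (runnable.peek() != null && running.size() < maxThreads) {
            return runnable.remove();
        }
        return null;
    }

    synchronized void launching(String task) {
        running.add(task);
    }

    synchronized int runningCount() {
        return running.size();
    }

    public static void main(String[] args) {
        ThrottleSketch ctx = new ThrottleSketch();
        ctx.addToRunnable("stage-1");
        ctx.addToRunnable("stage-2");
        ctx.addToRunnable("stage-3");

        String task;
        // With a limit of 2, only the first two tasks are handed out.
        while ((task = ctx.getRunnable(2)) != null) {
            ctx.launching(task);
            System.out.println("launched " + task);
        }
        System.out.println("running: " + ctx.runningCount()); // running: 2
    }
}
```

The third task stays in runnable until a slot frees up, which in this sketch would require removing an entry from running first.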
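pollFinished scans the running queue and returns the first TaskRunner that has finished, removing it from the queue. If nothing has finished yet, it waits for SLEEP_TIME and scans again, until shutdown is requested. Calling it repeatedly until the running queue is empty is how the caller waits for all jobs to complete: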
public synchronized TaskRunner pollFinished() throws InterruptedException {
  while (!shutdown) {
    Iterator<TaskRunner> it = running.iterator();
    while (it.hasNext()) {
      TaskRunner runner = it.next();
      if (runner != null && !runner.isRunning()) {
        it.remove();
        return runner;
      }
    }
    wait(SLEEP_TIME);
  }
  return null;
}
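The scan-and-wait loop can be tried out with a stand-in runner type. This is a rough sketch under stated assumptions: PollSketch, Runner, finish, requestShutdown, and the 50 ms interval are all hypothetical, and unlike the original method the sketch swallows InterruptedException (restoring the interrupt flag and returning null) to keep the example short.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only: Runner is a hypothetical stand-in for Hive's TaskRunner.
class PollSketch {
    static class Runner {
        final String name;
        private volatile boolean running = true;

        Runner(String name) {
            this.name = name;
        }

        boolean isRunning() {
            return running;
        }

        void finish() {
            running = false;
        }
    }

    private static final long SLEEP_TIME_MS = 50; // assumed polling interval
    private final Queue<Runner> running = new LinkedBlockingQueue<>();
    private boolean shutdownRequested = false;

    synchronized void launching(Runner runner) {
        running.add(runner);
    }

    synchronized void requestShutdown() {
        shutdownRequested = true;
        notifyAll(); // wake up a thread blocked in pollFinished
    }

    synchronized int runningCount() {
        return running.size();
    }

    // Returns the first finished runner, or null once shutdown is requested.
    synchronized Runner pollFinished() {
        while (!shutdownRequested) {
            Iterator<Runner> it = running.iterator();
            while (it.hasNext()) {
                Runner runner = it.next();
                if (!runner.isRunning()) {
                    it.remove();
                    return runner;
                }
            }
            try {
                wait(SLEEP_TIME_MS); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PollSketch ctx = new PollSketch();
        Runner a = new Runner("a");
        Runner b = new Runner("b");
        ctx.launching(a);
        ctx.launching(b);

        b.finish();
        System.out.println(ctx.pollFinished().name); // b
        a.finish();
        System.out.println(ctx.pollFinished().name); // a
        System.out.println(ctx.runningCount());      // 0
    }
}
```

Note that each call returns at most one finished runner, so draining the queue takes one call per launched runner.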
This article comes from the "菜光光的博客" blog. Please keep this attribution: http://caiguangguang.blog.51cto.com/1652935/1591569