上次分析完JobTracker通过TaskScheduler如何把作业分配给TaskTracker,这次把目光 移动到TaskTracker上面。TaskTracker在这里其实是一个slave的从属关系。我在后面的分析会通过TaskTracker的执行流程,主要讲他的2个过程的分析1.作业启动执行2.与JobTracker的heatbeat的过程。2个过程都是非常的典型。
/** * Start the TaskTracker, point toward the indicated JobTracker * taskTracker同样也是一个服务程序,main函数开始执行 */ public static void main(String argv[]) throws Exception { StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG); if (argv.length != 0) { System.out.println("usage: TaskTracker"); System.exit(-1); } try { //初始化作业配置 JobConf conf=new JobConf(); // enable the server to track time spent waiting on locks ReflectionUtils.setContentionTracing (conf.getBoolean("tasktracker.contention.tracking", false)); //初始化度量统计系统 DefaultMetricsSystem.initialize("TaskTracker"); //根据作业配置初始化TaskTracker TaskTracker tt = new TaskTracker(conf); //注册MBean,方便外界工具检测TaskTracker的状态 MBeans.register("TaskTracker", "TaskTrackerInfo", tt); //执行TaskTracker服务主程序 tt.run(); } catch (Throwable e) { LOG.error("Can not start task tracker because "+ StringUtils.stringifyException(e)); System.exit(-1); } }让后我们进入其中的执行主程序tt.run():
/** * The server retry loop. * This while-loop attempts to connect to the JobTracker. It only * loops when the old TaskTracker has gone bad (its state is * stale somehow) and we need to reinitialize everything. */ public void run() { try { getUserLogManager().start(); //开启CleanUp清理线程 startCleanupThreads(); boolean denied = false; while (running && !shuttingDown && !denied) { boolean staleState = false; try { // This while-loop attempts reconnects if we get network errors while (running && !staleState && !shuttingDown && !denied) { try { //offerService()执行了核心的启动操作 State osState = offerService(); if (osState == State.STALE) { staleState = true; } else if (osState == State.DENIED) { denied = true; } ......我们可以看到,这里通过while操作,循环进行服务操作,如果拒绝服务,则会shutdown中断服务,服务的主要操作又在offerService方法中:
/** * Main service loop. Will stay in this loop forever. * 主要的循环服务操作 */ State offerService() throws Exception { ..... // Send the heartbeat and process the jobtracker's directives //发送给JobTracker心跳包 HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); // Note the time when the heartbeat returned, use this to decide when to send the // next heartbeat lastHeartbeat = System.currentTimeMillis(); .... //在这里获取了心跳回应中的action命令 TaskTrackerAction[] actions = heartbeatResponse.getActions(); if(LOG.isDebugEnabled()) { LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + heartbeatResponse.getResponseId() + " and " + ((actions != null) ? actions.length : 0) + " actions"); } if (reinitTaskTracker(actions)) { return State.STALE; } // resetting heartbeat interval from the response. heartbeatInterval = heartbeatResponse.getHeartbeatInterval(); justStarted = false; justInited = false; if (actions != null){ for(TaskTrackerAction action: actions) { if (action instanceof LaunchTaskAction) { //如果是执行Task任务指令,执行添加任务到任务队列中 addToTaskQueue((LaunchTaskAction)action); } else if (action instanceof CommitTaskAction) { //如果是提交任务的指令,则执行后面的操作 CommitTaskAction commitAction = (CommitTaskAction)action; if (!commitResponses.contains(commitAction.getTaskID())) { LOG.info("Received commit task action for " + commitAction.getTaskID()); commitResponses.add(commitAction.getTaskID()); } } else { //其他的指令一律添加到tasksToCleanup队列中等待被处理 tasksToCleanup.put(action); } } } .....在这里我省略了比较多的代码,把执行任务相关的核心操作保留了,在这里就开始执行了后面的和Task相关的很多操作了,当然这些任务都是通过收到JobTracker的心跳包Response来获得的,在通过获取里面的TaskTrackerAction命令来判断执行的。TaskTrackerAction里面包含了1枚举类,包括了以下的相关指令:
private void addToTaskQueue(LaunchTaskAction action) { //任务类型加入到任务待执行的容器中 if (action.getTask().isMapTask()) { mapLauncher.addToTaskQueue(action); } else { reduceLauncher.addToTaskQueue(action); } }这里的mapLauncher,reduceLauncher的类型是TaskLauncher,他是一个线程类:
class TaskLauncher extends Thread { private IntWritable numFreeSlots; private final int maxSlots; private List<TaskInProgress> tasksToLaunch; ....也就是说,待执行的map,Reduce任务都是添加到taskToLauch中的,
public void addToTaskQueue(LaunchTaskAction action) { //新建1个TIP,并加入tasksToLaunch列表 synchronized (tasksToLaunch) { TaskInProgress tip = registerTask(action, this); tasksToLaunch.add(tip); //唤醒所有被tasksToLaunch wait的操作,说明此时有新的任务了 tasksToLaunch.notifyAll(); } }加入之后唤醒相应的操作,这个就很好理解了,一定是在empty的时候被阻塞住了,
public void run() { while (!Thread.interrupted()) { try { TaskInProgress tip; Task task; synchronized (tasksToLaunch) { while (tasksToLaunch.isEmpty()) { tasksToLaunch.wait(); } //get the TIP tip = tasksToLaunch.remove(0); task = tip.getTask(); LOG.info("Trying to launch : " + tip.getTask().getTaskID() + " which needs " + task.getNumSlotsRequired() + " slots"); } //wait for free slots to run ..... //got a free slot. launch the task startNewTask(tip);到了startNewTask就是开始所谓的任务了。到此为止,TaskTracker的任务执行这条路,我们算彻底打通了,相关时序图如下:
/** * Main service loop. Will stay in this loop forever. * 主要的循环服务操作 */ State offerService() throws Exception { long lastHeartbeat = System.currentTimeMillis(); while (running && !shuttingDown) { try { long now = System.currentTimeMillis(); // accelerate to account for multiple finished tasks up-front //判断上次心跳的时间+心跳等待时间是否已经到了当前时间,如果到了可以发送新的心跳包 long remaining = (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now; //如果还没到,时间有剩余,则要强行等待剩余的时间 while (remaining > 0) { // sleeps for the wait time or // until there are *enough* empty slots to schedule tasks synchronized (finishedCount) { finishedCount.wait(remaining); // Recompute now = System.currentTimeMillis(); remaining = (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now; if (remaining <= 0) { // Reset count finishedCount.set(0); break; } } } .....假设已经到达了发送时间了,会执行后面的操作,检测版本号,TaskTracker和JobTracker的版本号必须一致:
..... // If the TaskTracker is just starting up: // 1. Verify the buildVersion // 2. Get the system directory & filesystem if(justInited) { //验证版本号,如果版本号不对,则返回拒绝状态 String jobTrackerBV = jobClient.getBuildVersion(); if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) { String msg = "Shutting down. Incompatible buildVersion." + "\nJobTracker's: " + jobTrackerBV + "\nTaskTracker's: "+ VersionInfo.getBuildVersion(); LOG.error(msg); try { jobClient.reportTaskTrackerError(taskTrackerName, null, msg); } catch(Exception e ) { LOG.info("Problem reporting to jobtracker: " + e); } return State.DENIED; }如果通过上述2个验证,基本上就达到了发送的条件了,下面就准备发送操作了:
// Send the heartbeat and process the jobtracker's directives //发送给JobTracker心跳包 HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);就是在上面这个方法中实现了发送的操作,此方法的返回值是JobTracker的心跳回复包,里面就包含着刚刚的TaskTrackerAction命令信息。我们进入transmitHeartBeat。之前分析过,心跳机制的有1个主要的作用就是汇报TaskTracker的资源使用情况和作业执行情况给JobTracker节点。以此可以让主节点可以进行资源调配。所以在上面的这个方法必不可少的操作是构建TaskTracker的Status状态信息。这个类包含的信息还比较多。下面是主要的此类的关系结构:
/** * Build and transmit the heart beat to the JobTracker * 将TaskTracker自身的状态信息发送给JobTracker,并获得一个心跳包的回应 * @param now current time * @return false if the tracker was unknown * @throws IOException */ HeartbeatResponse transmitHeartBeat(long now) throws IOException { .... // // Check if the last heartbeat got through... // if so then build the heartbeat information for the JobTracker; // else resend the previous status information. // if (status == null) { synchronized (this) { status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses( sendCounters), failures, maxMapSlots, maxReduceSlots); }后面就是各种获取节点CPU,内存等基本信息,这里就不列举了,不过这里提一点,对于TaskTracker是否还能运行任务,在这里是通过TaskTracker是否达到了它的maxSlot上限作为1个标准。一般1个Reduce Task占据1个slot单元,1个Map Task同样占据1个Slot单元,如果1个TaskTracker结点拥有好多slot单元,那么他就可以运行很多Task。
// // Check if we should ask for a new Task // 检测TaskTracker是否需要一个新 Task任务 // boolean askForNewTask; long localMinSpaceStart; synchronized (this) { //通过判断当前所占据的slots数量是否已经达到最大slot的数量作为标准 askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks); localMinSpaceStart = minSpaceStart; }askForNewTask布尔类型就代表TaskTracker是否还能运行新的任务,封装好了这些Status信息之后,就要执行关键的发送步骤了:
// // Xmit the heartbeat // 通过JobClient发送给JobTracker,并获得1个回复 // HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId);是通过JobClient的方法发送的。得到的heartbeatResponse返回结果就是JobTracker结果了。至于里面JobClient具体怎么发送就不是本次分析的重点了,HeartBeat也分析完毕。同样看一下流程图: