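TaskTracker also runs as a separate JVM, and its main function is the entry point. When start-all.sh is run, the script starts TaskTracker by invoking this function over SSH.
TaskTracker is the bridge between JobTracker and the Tasks. On one side, it receives commands from JobTracker and executes them: launching tasks, committing tasks, killing tasks, and so on. On the other, it periodically reports the status of every task on the local node to JobTracker through heartbeats. TaskTracker communicates with both JobTracker and the Tasks over RPC.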
    public void run() {
      try {
        getUserLogManager().start();
        startCleanupThreads();
        boolean denied = false;
        while (running && !shuttingDown && !denied) {
          boolean staleState = false;
          try {
            // This while-loop attempts reconnects if we get network errors
            // (if the connection drops, keep retrying the connection to JobTracker)
            while (running && !staleState && !shuttingDown && !denied) {
              try {
                // Once connected to JobTracker, TaskTracker calls offerService()
                // and enters its main loop. That loop contacts JobTracker at
                // intervals, calling transmitHeartBeat() to obtain a HeartbeatResponse.
                State osState = offerService();
                if (osState == State.STALE) {
                  staleState = true;
                } else if (osState == State.DENIED) {
                  denied = true;
                }
              } catch (Exception ex) {
                if (!shuttingDown) {
                  LOG.info("Lost connection to JobTracker [" +
                           jobTrackAddr + "].  Retrying...", ex);
                  try {
                    Thread.sleep(5000);
                  } catch (InterruptedException ie) {
                  }
                }
              }
            }
          } finally {
            close();
          }
          if (shuttingDown) { return; }
          LOG.warn("Reinitializing local state");
          initialize(); // initialize all members and parameters
        }
        if (denied) {
          shutdown();
        }
      } catch (IOException iex) {
        LOG.error("Got fatal exception while reinitializing TaskTracker: " +
                  StringUtils.stringifyException(iex));
        return;
      }
      catch (InterruptedException i) {
        LOG.error("Got interrupted while reinitializing TaskTracker: " +
                  i.getMessage());
        return;
      }
    }
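getUserLogManager().start() starts two threads: a user-log cleanup thread and a thread that monitors logging activity. startCleanupThreads() then starts the taskCleanupThread, which continuously watches BlockingQueue<TaskTrackerAction> tasksToCleanup (the queue holding the TaskInProgress instances to be cleaned up); its elements are of type KillJobAction or KillTaskAction. Next come two nested while loops. The inner loop keeps retrying the connection to JobTracker whenever it drops, and its body is State osState = offerService(). Once offerService() starts, it keeps running unless something unusual happens. When the inner loop exits, run() calls close() and, unless TaskTracker is shutting down, tries initialize() again. If offerService() returned DENIED, all loops exit and shutdown() is executed.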
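The control flow described above can be modeled in a few lines. This is a minimal sketch, not the Hadoop source: RunLoopSketch, its trace() method, and the scripted list of outcomes are hypothetical stand-ins for offerService(), close(), initialize(), and shutdown(), and an exhausted script is treated as a shutdown request.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified model of the nested loops in TaskTracker.run(). */
final class RunLoopSketch {
    enum State { NORMAL, STALE, INTERRUPTED, DENIED }

    /** Replays scripted offerService() results and records the calls run() would make. */
    static List<String> trace(List<State> scriptedOutcomes) {
        Deque<State> outcomes = new ArrayDeque<>(scriptedOutcomes);
        List<String> events = new ArrayList<>();
        boolean denied = false;
        boolean shuttingDown = false;
        while (!shuttingDown && !denied) {
            boolean staleState = false;
            // Inner loop: call the service loop again until it reports STALE or DENIED.
            while (!shuttingDown && !staleState && !denied) {
                State osState = outcomes.poll();
                if (osState == null) {
                    shuttingDown = true; // assumption: empty script models a shutdown request
                } else {
                    events.add("offerService->" + osState);
                    if (osState == State.STALE) {
                        staleState = true;
                    } else if (osState == State.DENIED) {
                        denied = true;
                    }
                }
            }
            events.add("close");          // the finally block always runs
            if (shuttingDown) {
                return events;
            }
            events.add("initialize");     // reinitialize local state
        }
        if (denied) {
            events.add("shutdown");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList(State.STALE, State.DENIED)));
    }
}
```

Note that in this model, as in the listing above, a DENIED result still passes through close() and initialize() once before the outer loop ends and shutdown() is called.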
    /**
     * Main service loop.  Will stay in this loop forever.
     */
    State offerService() throws Exception {
      long lastHeartbeat = System.currentTimeMillis(); // time the last heartbeat was sent
      // This loop mainly adjusts the heartbeat interval according to the number
      // of finished tasks. The TaskTracker process stays alive throughout.
      while (running && !shuttingDown) {
        try {
          long now = System.currentTimeMillis(); // current time

          // accelerate to account for multiple finished tasks up-front
          // (the heartbeat interval is adjusted dynamically by the finished-task count)
          long remaining =
            (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
          while (remaining > 0) {
            // sleeps for the wait time or
            // until there are *enough* empty slots to schedule tasks
            synchronized (finishedCount) {
              finishedCount.wait(remaining);

              // Recompute
              now = System.currentTimeMillis();
              remaining =
                (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;

              if (remaining <= 0) {
                // Reset count
                finishedCount.set(0);
                break;
              }
            }
          }

          // If the TaskTracker is just starting up:
          // 1. Verify the buildVersion
          // 2. Get the system directory & filesystem
          if(justInited) {
            String jobTrackerBV = jobClient.getBuildVersion();
            if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
              String msg = "Shutting down. Incompatible buildVersion." +
              "\nJobTracker's: " + jobTrackerBV +
              "\nTaskTracker's: "+ VersionInfo.getBuildVersion();
              LOG.error(msg);
              try {
                jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
              } catch(Exception e ) {
                LOG.info("Problem reporting to jobtracker: " + e);
              }
              return State.DENIED;
            }

            String dir = jobClient.getSystemDir();
            if (dir == null) {
              throw new IOException("Failed to get system directory");
            }
            systemDirectory = new Path(dir);
            systemFS = systemDirectory.getFileSystem(fConf);
          }

          now = System.currentTimeMillis();
          if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
            localStorage.checkDirs();
            lastCheckDirsTime = now;
            int numFailures = localStorage.numFailures();
            // Re-init the task tracker if there were any new failures
            if (numFailures > lastNumFailures) {
              lastNumFailures = numFailures;
              return State.STALE;
            }
          }

          // Send the heartbeat and process the jobtracker's directives
          // In transmitHeartBeat(), TaskTracker creates a new TaskTrackerStatus
          // object recording the current state of task execution, and checks the
          // number of running Tasks, local disk usage, and so on. If it can accept
          // a new Task, it sets the askForNewTask argument of heartbeat() to true.
          // It then calls JobTracker's heartbeat() method through the IPC interface;
          // the returned response carries an array of TaskTrackerAction.
          HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

          // Note the time when the heartbeat returned, use this to decide when to send the
          // next heartbeat
          lastHeartbeat = System.currentTimeMillis();

          // Check if the map-event list needs purging
          Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
          if (jobs.size() > 0) {
            synchronized (this) {
              // purge the local map events list
              for (JobID job : jobs) {
                RunningJob rjob;
                synchronized (runningJobs) {
                  rjob = runningJobs.get(job);
                  if (rjob != null) {
                    synchronized (rjob) {
                      FetchStatus f = rjob.getFetchStatus();
                      if (f != null) {
                        f.reset();
                      }
                    }
                  }
                }
              }

              // Mark the reducers in shuffle for rollback
              synchronized (shouldReset) {
                for (Map.Entry<TaskAttemptID, TaskInProgress> entry
                     : runningTasks.entrySet()) {
                  if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
                    this.shouldReset.add(entry.getKey());
                  }
                }
              }
            }
          }
          // Call getActions() on the HeartbeatResponse to obtain all commands sent
          // by JobTracker, i.e. an array of TaskTrackerAction.
          TaskTrackerAction[] actions = heartbeatResponse.getActions();
          if(LOG.isDebugEnabled()) {
            LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
                      heartbeatResponse.getResponseId() + " and " +
                      ((actions != null) ? actions.length : 0) + " actions");
          }
          if (reinitTaskTracker(actions)) {
            return State.STALE;
          }

          // resetting heartbeat interval from the response.
          heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
          justStarted = false;
          justInited = false;
          if (actions != null){
            // Iterate over the array. A new-task command (LaunchTaskAction) is passed
            // to addToTaskQueue and joins the pending queue; kill commands such as
            // KillJobAction or KillTaskAction go to the tasksToCleanup queue, which is
            // handled by the taskCleanupThread.
            for(TaskTrackerAction action: actions) {
              if (action instanceof LaunchTaskAction) {
                // a new Task to run: add the action to the TaskLauncher thread's queue
                addToTaskQueue((LaunchTaskAction)action);
              } else if (action instanceof CommitTaskAction) {
                CommitTaskAction commitAction = (CommitTaskAction)action;
                if (!commitResponses.contains(commitAction.getTaskID())) {
                  LOG.info("Received commit task action for " +
                            commitAction.getTaskID());
                  commitResponses.add(commitAction.getTaskID());
                }
              } else { // kill a task or a job
                tasksToCleanup.put(action);
              }
            }
          }
          // kill tasks that have not reported progress for a certain period
          markUnresponsiveTasks();
          // when free disk space drops below mapred.local.dir.minspacekill (default 0),
          // find a suitable task and kill it to free space
          killOverflowingTasks();

          //we've cleaned up, resume normal operation
          if (!acceptNewTasks && isIdle()) {
            acceptNewTasks=true;
          }
          //The check below may not be required every iteration but we are
          //erring on the side of caution here. We have seen many cases where
          //the call to jetty's getLocalPort() returns different values at
          //different times. Being a real paranoid here.
          checkJettyPort(server.getPort());
        } catch (InterruptedException ie) {
          LOG.info("Interrupted. Closing down.");
          return State.INTERRUPTED;
        } catch (DiskErrorException de) {
          String msg = "Exiting task tracker for disk error:\n" +
            StringUtils.stringifyException(de);
          LOG.error(msg);
          synchronized (this) {
            jobClient.reportTaskTrackerError(taskTrackerName,
                                             "DiskErrorException", msg);
          }
          return State.STALE;
        } catch (RemoteException re) {
          String reClass = re.getClassName();
          if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
            LOG.info("Tasktracker disallowed by JobTracker.");
            return State.DENIED;
          }
        } catch (Exception except) {
          String msg = "Caught exception: " +
            StringUtils.stringifyException(except);
          LOG.error(msg);
        }
      }

      return State.NORMAL;
    }
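The method contains a loop that keeps running unless something unusual happens. Each iteration first blocks for the heartbeat interval, which is dynamic and continually adjusted. If TaskTracker has just started, it first verifies that its build version matches JobTracker's, returning State.DENIED immediately on a mismatch, and then obtains the system directory and filesystem. Once the disk-check interval has elapsed, it checks the local disks for damage and then examines localStorage.numFailures(); if that value exceeds lastNumFailures, the method returns State.STALE so that TaskTracker is reinitialized.
transmitHeartBeat(now) sends the heartbeat to JobTracker and returns the response, heartbeatResponse, which has two parts. The first is the job set recoveredJobs: the jobs that were running when JobTracker was last shut down, whose state must be restored after JobTracker restarts (provided the user has enabled job recovery). On receiving this set, TaskTracker resets the FetchStatus of the Reduce Tasks belonging to those jobs, forcing them to copy their data from the Map Tasks again. The second part is the list of commands to execute.
heartbeatResponse.getActions() returns the array of commands sent by JobTracker. The first check is whether any action asks TaskTracker to reinitialize; if so, the method returns State.STALE and TaskTracker is reinitialized. Next, the heartbeat interval is updated from the response. The method then iterates over the actions array and places each action into a data structure chosen by its type. A LaunchTaskAction goes to mapLauncher if action.getTask().isMapTask()==true, and to reduceLauncher otherwise. Both launchers are TaskLauncher instances; TaskLauncher is an inner class of TaskTracker whose data member is a queue of TaskTracker.TaskInProgress. Note that every TaskInProgress mentioned inside the TaskTracker class is TaskTracker's own inner class, written here as TaskTracker.TaskInProgress; it must not be confused with the TaskInProgress class in the mapred package, which is written simply as TaskInProgress. A CommitTaskAction is recorded in commitResponses via commitResponses.add(commitAction.getTaskID()). Every other type is queued for cleanup with tasksToCleanup.put(action), for example KillJobAction or KillTaskAction.
markUnresponsiveTasks() iterates over runningTasks and kills every task that has not reported progress for a certain period (through purgeTask(tip, true)). killOverflowingTasks() applies when free disk space drops below mapred.local.dir.minspacekill (default 0): it looks for a suitable task to kill in order to free space, and no new tasks are accepted in the meantime (acceptNewTasks=false). It uses findTaskToKill(null), which iterates over runningTasks and prefers to kill reduce tasks, to pick a TaskInProgress killMe, and then calls purgeTask(killMe, false). Once a task has been killed for lack of space, TaskTracker accepts no new tasks until all tasks have finished and all cleanup tasks have completed, which is when the isIdle() check sets acceptNewTasks back to true. It still sends heartbeats to JobTracker as usual, although their content changes accordingly.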
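The routing of actions by type can be illustrated with a small standalone model. This is only a sketch under stated assumptions: the Action, LaunchTask, CommitTask, KillTask, and KillJob classes below are hypothetical stand-ins for the TaskTrackerAction hierarchy, plain queues stand in for the two TaskLauncher instances, and add() is used where the real code calls put().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical model of how offerService() routes the actions in a heartbeat response. */
final class ActionDispatchSketch {
    static class Action {
        final String id;
        Action(String id) { this.id = id; }
    }
    static final class LaunchTask extends Action {
        final boolean isMapTask;
        LaunchTask(String id, boolean isMapTask) { super(id); this.isMapTask = isMapTask; }
    }
    static final class CommitTask extends Action { CommitTask(String id) { super(id); } }
    static final class KillTask extends Action { KillTask(String id) { super(id); } }
    static final class KillJob extends Action { KillJob(String id) { super(id); } }

    // Stand-ins for mapLauncher / reduceLauncher (TaskLauncher threads in the real class).
    final BlockingQueue<LaunchTask> mapLauncher = new LinkedBlockingQueue<>();
    final BlockingQueue<LaunchTask> reduceLauncher = new LinkedBlockingQueue<>();
    final List<String> commitResponses = new ArrayList<>();
    final BlockingQueue<Action> tasksToCleanup = new LinkedBlockingQueue<>();

    void dispatch(List<? extends Action> actions) {
        for (Action action : actions) {
            if (action instanceof LaunchTask) {
                LaunchTask launch = (LaunchTask) action;
                // Map tasks and reduce tasks wait in separate launcher queues.
                (launch.isMapTask ? mapLauncher : reduceLauncher).add(launch);
            } else if (action instanceof CommitTask) {
                // Record each task id at most once, as the contains() check does.
                if (!commitResponses.contains(action.id)) {
                    commitResponses.add(action.id);
                }
            } else {
                // Everything else (kill task, kill job) is left to the cleanup thread.
                tasksToCleanup.add(action);
            }
        }
    }

    public static void main(String[] args) {
        ActionDispatchSketch tracker = new ActionDispatchSketch();
        tracker.dispatch(Arrays.asList(
            new LaunchTask("m_000001", true),
            new CommitTask("m_000000"),
            new KillJob("job_0001")));
        System.out.println(tracker.mapLauncher.size() + " map launch, "
            + tracker.commitResponses.size() + " commit, "
            + tracker.tasksToCleanup.size() + " cleanup");
    }
}
```

Keeping launches, commits, and kills in separate structures lets the heartbeat loop return quickly while dedicated threads do the slow work.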
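transmitHeartBeat(now) builds the heartbeat message and sends it to JobTracker. Its main job is to construct a TaskTrackerStatus, the payload to be sent, which contains the running status of the tasks, TaskTracker resource information, and health status. If new tasks can be accepted, askForNewTask is set to true. TaskTracker asks JobTracker to assign it a new task when acceptNewTasks==true and it has spare capacity, that is, the number of map tasks it is currently running is below the maximum number of map tasks it may run, or the number of reduce tasks it is currently running is below the maximum for reduce tasks. If askForNewTask==true, some fields of the TaskTrackerStatus instance status are set. Then the healthStatus obtained from healthStatus = status.getHealthStatus() is filled in. The heartbeat is then sent to JobTracker and the response received: HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId). This is an RPC call; see the JobTracker.heartbeat method for details, which will be covered later. After a successful call, heartbeatResponseId is updated.
The method then walks through the status (TaskStatus) of every task. For those in the SUCCEEDED, FAILED, or KILLED state, it updates some statistics, decrements mapTotal or reduceTotal according to the task type, and removes the task from runningTasks. It then clears the status information of every TaskInProgress in runningTasks with getStatus().clearStatus(). This status information is transient and is sent only once, so status is set to null; it was generated by cloneAndResetRunningTaskStatuses(sendCounters) when the TaskTrackerStatus was constructed. Finally the heartbeat response heartbeatResponse is returned.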
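Two pieces of this bookkeeping are easy to show in isolation: the decision to ask for a new task and the pruning of finished tasks after a successful heartbeat. The sketch below is illustrative only. HeartbeatBookkeepingSketch, its TaskStatus class, and the add() helper are hypothetical, and combining the map and reduce slot checks with "or" reflects my reading of the Hadoop 1.x source rather than anything the listing above shows.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of two bookkeeping steps in transmitHeartBeat(). */
final class HeartbeatBookkeepingSketch {
    enum RunState { RUNNING, SUCCEEDED, FAILED, KILLED }

    static final class TaskStatus {
        final String id;
        final boolean isMap;
        final RunState state;
        TaskStatus(String id, boolean isMap, RunState state) {
            this.id = id;
            this.isMap = isMap;
            this.state = state;
        }
    }

    int mapTotal;
    int reduceTotal;
    final Map<String, TaskStatus> runningTasks = new LinkedHashMap<>();

    /** Assumption: a free map slot OR a free reduce slot is enough, given acceptNewTasks. */
    static boolean askForNewTask(int occupiedMapSlots, int maxMapSlots,
                                 int occupiedReduceSlots, int maxReduceSlots,
                                 boolean acceptNewTasks) {
        boolean hasFreeSlot = occupiedMapSlots < maxMapSlots
            || occupiedReduceSlots < maxReduceSlots;
        return hasFreeSlot && acceptNewTasks;
    }

    /** Hypothetical helper that registers a task and bumps the matching counter. */
    void add(TaskStatus status) {
        runningTasks.put(status.id, status);
        if (status.isMap) {
            mapTotal++;
        } else {
            reduceTotal++;
        }
    }

    /** After a successful heartbeat, drop tasks that reached a terminal state. */
    void pruneFinished() {
        Iterator<TaskStatus> it = runningTasks.values().iterator();
        while (it.hasNext()) {
            TaskStatus status = it.next();
            if (status.state != RunState.RUNNING) {
                if (status.isMap) {
                    mapTotal--;
                } else {
                    reduceTotal--;
                }
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        HeartbeatBookkeepingSketch tracker = new HeartbeatBookkeepingSketch();
        tracker.add(new TaskStatus("m_000001", true, RunState.RUNNING));
        tracker.add(new TaskStatus("r_000001", false, RunState.SUCCEEDED));
        tracker.pruneFinished();
        System.out.println(tracker.runningTasks.keySet());
    }
}
```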
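In summary, offerService() loops continuously: it blocks for the heartbeat interval, obtains JobTracker's heartbeat response, places each action into the appropriate data structure according to its type, and performs a few checks that keep TaskTracker running sensibly, over and over again.
That concludes the walkthrough of the TaskTracker startup process. Corrections are welcome.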
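The timing of that wait can be sketched as pure arithmetic. The listing only shows that the wait is lastHeartbeat + getHeartbeatInterval(finishedCount) - now; the body of getHeartbeatInterval is not shown, so the damper-based division below is an assumption, and HeartbeatTimingSketch with its parameters is hypothetical.

```java
/** Hypothetical model of how long offerService() waits before the next heartbeat. */
final class HeartbeatTimingSketch {

    /**
     * Assumed shape of the interval: the base interval shrinks as more tasks
     * finish, scaled by a damper factor. This formula is an assumption.
     */
    static long heartbeatInterval(long baseIntervalMs, int finishedTasks, int damper) {
        return baseIntervalMs / ((long) finishedTasks * damper + 1);
    }

    /** Mirrors the expression from the listing: lastHeartbeat + interval - now. */
    static long remaining(long lastHeartbeatMs, long nowMs, long baseIntervalMs,
                          int finishedTasks, int damper) {
        return lastHeartbeatMs + heartbeatInterval(baseIntervalMs, finishedTasks, damper) - nowMs;
    }

    public static void main(String[] args) {
        long base = 3000L;
        // No finished tasks: wait out the rest of the base interval.
        System.out.println(remaining(1000L, 2000L, base, 0, 1_000_000));
        // One finished task with a large damper: the wait is already over.
        System.out.println(remaining(1000L, 2000L, base, 1, 1_000_000));
    }
}
```

Under this assumption, a finished task makes remaining non-positive almost immediately, so the loop breaks out of finishedCount.wait() and sends a heartbeat early instead of waiting out the full interval.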
Original: http://www.cnblogs.com/lxf20061900/p/3745283.html