/**
 * Starts this composite service: every child returned by {@code getServices()} is started in
 * list order, and only afterwards does the parent-class start logic run.
 *
 * <p>Per the upstream comment, a child whose {@code start()} fails is stopped and an exception
 * is raised; since nothing here catches it, the exception propagates out of this method and
 * neither the remaining children nor {@code super.serviceStart()} are reached.
 */
protected void serviceStart() throws Exception {
  final List<Service> children = getServices();
  final boolean debugOn = LOG.isDebugEnabled();
  if (debugOn) {
    LOG.debug(getName() + ": starting services, size=" + children.size());
  }
  for (final Service child : children) {
    child.start(); // start the corresponding child service
  }
  super.serviceStart();
}
这段代码循环启动如下服务
//LocalDirsHandlerService.serviceStart() line: 154 @Override protected void serviceStart() throws Exception { if (isDiskHealthCheckerEnabled) { dirsHandlerScheduler = new Timer("DiskHealthMonitor-Timer", true); dirsHandlerScheduler.scheduleAtFixedRate(monitoringTimerTask, diskHealthCheckInterval, diskHealthCheckInterval); } super.serviceStart(); }目录检测类构建如下:
public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException { localDirs = new DirectoryCollection( validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS))); logDirs = new DirectoryCollection( validatePaths(conf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))); localDirsAllocator = new LocalDirAllocator( YarnConfiguration.NM_LOCAL_DIRS); logDirsAllocator = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS); }检测函数如下,因为是对文件系统的检测,无非是读写执行这些权限,所以这里就不继续贴下去了
/**
 * Performs one disk-health pass over the local and log directory collections.
 *
 * <p>Both collections are always checked. If either check returns {@code true} (presumably
 * "a new failure was detected" — confirm against {@code DirectoryCollection.checkDirs}),
 * {@code updateDirsAfterFailure()} is invoked. The wall-clock time of this pass (ms) is then
 * stored in {@code lastDisksCheckTime}.
 */
private void checkDirs() {
  // Evaluate both checks unconditionally: folding them into a single short-circuit ||
  // expression would skip the log-dir check whenever the local-dir check reports a failure.
  final boolean localDirsFailed = localDirs.checkDirs();
  final boolean logDirsFailed = logDirs.checkDirs();
  if (localDirsFailed || logDirsFailed) {
    updateDirsAfterFailure();
  }
  lastDisksCheckTime = System.currentTimeMillis();
}
ContainerManager服务:该服务会启动一个RPC服务器,协议为:ContainerManagementProtocol用于AM与NM之间的通信,handler线程数量由yarn.nodemanager.container-manager.thread-count控制
/**
 * Starts the container-manager RPC server, publishes this node's id, then starts the
 * child services.
 *
 * <p>The server serves {@code ContainerManagementProtocol}, through which ApplicationMasters
 * control containers. Its handler thread count comes from
 * {@code NM_CONTAINER_MGR_THREAD_COUNT}. New container requests are blocked before the server
 * is started.
 */
@Override
protected void serviceStart() throws Exception {
  final Configuration nmConf = getConfig();

  // The RPC server gets its own copy of the configuration so its authentication method can
  // always be forced to token-based without modifying the NodeManager's configuration object.
  final Configuration rpcServerConf = new Configuration(nmConf);
  rpcServerConf.set(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
      SaslRpcServer.AuthMethod.TOKEN.toString());

  final YarnRPC yarnRpc = YarnRPC.create(nmConf);
  final InetSocketAddress bindAddress = nmConf.getSocketAddr(
      YarnConfiguration.NM_ADDRESS,
      YarnConfiguration.DEFAULT_NM_ADDRESS,
      YarnConfiguration.DEFAULT_NM_PORT);

  // RPC server for ContainerManagementProtocol requests coming from ApplicationMasters.
  server = yarnRpc.getServer(ContainerManagementProtocol.class, this, bindAddress,
      rpcServerConf, this.context.getNMTokenSecretManager(),
      nmConf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
          YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));

  // Optional service-level authorization.
  final boolean authorizationEnabled = nmConf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    refreshServiceAcls(nmConf, new NMPolicyProvider());
  }

  // Refuse client container requests while the server is still starting
  // (the matching unblock is not part of this excerpt).
  LOG.info("Blocking new container-requests as container manager rpc"
      + " server is still starting.");
  this.setBlockNewContainerRequests(true);
  server.start();

  // Derive this node's id from the address the started server is reachable at.
  final InetSocketAddress boundAddress = NetUtils.getConnectAddress(server);
  final String nodeHost = boundAddress.getAddress().getCanonicalHostName();
  final int nodePort = boundAddress.getPort();
  final NodeId nodeId = NodeId.newInstance(nodeHost, nodePort);
  ((NodeManager.NMContext) context).setNodeId(nodeId);
  this.context.getNMTokenSecretManager().setNodeId(nodeId);
  this.context.getContainerTokenSecretManager().setNodeId(nodeId);
  LOG.info("ContainerManager started at " + boundAddress);

  super.serviceStart();
}
ContainerManager子服务之ResourceLocalizationService
/**
 * Starts the resource localization service.
 *
 * <p>Schedules {@code CacheCleanup} on {@code cacheCleanup} with a fixed delay of
 * {@code cacheCleanupPeriod} milliseconds (also used as the initial delay). It then creates
 * and starts the localizer server and records the address returned by
 * {@code updateConnectAddr} for {@code NM_LOCALIZER_ADDRESS}. Finally it logs the port and
 * starts the parent service.
 */
@Override
public void serviceStart() throws Exception {
  final long cleanupPeriodMs = cacheCleanupPeriod;
  cacheCleanup.scheduleWithFixedDelay(
      new CacheCleanup(dispatcher), cleanupPeriodMs, cleanupPeriodMs, TimeUnit.MILLISECONDS);

  server = createServer();
  server.start();

  // Presumably writes the server's listener address into the configuration under the
  // localizer key and returns the resulting connect address — confirm in Configuration.
  localizationServerAddress = getConfig().updateConnectAddr(
      YarnConfiguration.NM_LOCALIZER_ADDRESS, server.getListenerAddress());
  final int localizerPort = server.getPort();
  LOG.info("Localizer started on port " + localizerPort);

  super.serviceStart();
}
ContainerManager子服务之ContainersMonitor:
/**
 * Main loop of the containers-monitor thread (excerpt, as quoted in the article).
 *
 * <p>Each pass:
 * <ol>
 *   <li>moves entries queued in {@code containersToBeAdded} into {@code trackingContainers};
 *   <li>removes ids queued in {@code containersToBeRemoved} from {@code trackingContainers};
 *   <li>for every tracked container, refreshes its process tree and compares virtual and
 *       physical memory use with the container's limits. It dispatches a
 *       {@code ContainerKillEvent} and stops tracking the container when a limit is exceeded.
 *       An exception while handling one container is logged and the loop moves on to the next;
 *   <li>sleeps for {@code monitoringInterval} ms.
 * </ol>
 * The loop only ends when that sleep is interrupted.
 */
public void run() {
  while (true) {
    // Print the processTrees for debugging. .....
    // (the debug-printing code was elided by the article author)

    // Add newly registered containers to the tracked set.
    // NOTE(review): presumably producers add to this map under the same lock — not shown here.
    synchronized (containersToBeAdded) {
      for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
          .entrySet()) {
        ContainerId containerId = entry.getKey();
        ProcessTreeInfo processTreeInfo = entry.getValue();
        LOG.info("Starting resource-monitoring for " + containerId);
        trackingContainers.put(containerId, processTreeInfo);
      }
      containersToBeAdded.clear();
    }

    // Remove containers that have finished.
    synchronized (containersToBeRemoved) {
      for (ContainerId containerId : containersToBeRemoved) {
        trackingContainers.remove(containerId);
        LOG.info("Stopping resource-monitoring for " + containerId);
      }
      containersToBeRemoved.clear();
    }

    // Check every tracked container and kill it if its memory usage exceeds the limit.
    // NOTE(review): these two totals are accumulated below but not read within this excerpt.
    long vmemStillInUsage = 0;
    long pmemStillInUsage = 0;
    // Iterate over tracked containers; an explicit Iterator is used so that it.remove()
    // can drop a killed container during iteration.
    for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
        trackingContainers.entrySet().iterator(); it.hasNext();) {

      Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
      ContainerId containerId = entry.getKey();
      ProcessTreeInfo ptInfo = entry.getValue();
      try {
        String pId = ptInfo.getPID();

        // PID not known yet: try to initialize the process tree for this container.
        if (pId == null) {
          // get pid from ContainerId
          pId = containerExecutor.getProcessId(ptInfo.getContainerId());
          if (pId != null) {
            // pId will be null, either if the container is not spawned yet
            // or if the container's pid is removed from ContainerExecutor
            LOG.debug("Tracking ProcessTree " + pId
                + " for the first time");

            ResourceCalculatorProcessTree pt =
                ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
            ptInfo.setPid(pId);
            ptInfo.setProcessTree(pt);
          }
        }
        // End of initializing any uninitialized processTrees

        // Still no PID after the attempt above: skip this container on this pass.
        if (pId == null) {
          continue; // processTree cannot be tracked
        }

        LOG.debug("Constructing ProcessTree for : PID = " + pId
            + " ContainerId = " + containerId);
        ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
        pTree.updateProcessTree(); // update process-tree
        // Current memory usage of the process tree: virtual and physical (RSS).
        long currentVmemUsage = pTree.getCumulativeVmem();
        long currentPmemUsage = pTree.getCumulativeRssmem();
        // as processes begin with an age 1, we want to see if there
        // are processes more than 1 iteration old.
        long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
        long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1);
        // Per-container memory limits.
        long vmemLimit = ptInfo.getVmemLimit();
        long pmemLimit = ptInfo.getPmemLimit();
        LOG.info(String.format(
            "Memory usage of ProcessTree %s for container-id %s: ",
            pId, containerId.toString()) +
            formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));

        boolean isMemoryOverLimit = false;
        String msg = "";
        // Decide whether memory usage exceeds a limit. Because of the else-if, the
        // physical-memory check only runs when the virtual-memory branch did not fire.
        if (isVmemCheckEnabled()
            && isProcessTreeOverLimit(containerId.toString(),
                currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
          // Container (the root process) is still alive and overflowing
          // memory.
          // Dump the process-tree and then clean it up.
          msg = formatErrorMessage("virtual",
              currentVmemUsage, vmemLimit,
              currentPmemUsage, pmemLimit,
              pId, containerId, pTree);
          isMemoryOverLimit = true;
        } else if (isPmemCheckEnabled()
            && isProcessTreeOverLimit(containerId.toString(),
                currentPmemUsage, curRssMemUsageOfAgedProcesses,
                pmemLimit)) {
          // Container (the root process) is still alive and overflowing
          // memory.
          // Dump the process-tree and then clean it up.
          msg = formatErrorMessage("physical",
              currentVmemUsage, vmemLimit,
              currentPmemUsage, pmemLimit,
              pId, containerId, pTree);
          isMemoryOverLimit = true;
        }

        // Over a limit: kill the container and stop tracking it.
        if (isMemoryOverLimit) {
          // Virtual or physical memory over limit. Fail the container and
          // remove
          // the corresponding process tree
          LOG.warn(msg);
          // warn if not a leader
          if (!pTree.checkPidPgrpidForMatch()) {
            LOG.error("Killed container process with PID " + pId
                + " but it is not a process group leader.");
          }
          // kill the container
          eventDispatcher.getEventHandler().handle(
              new ContainerKillEvent(containerId, msg));
          it.remove();
          LOG.info("Removed ProcessTree with root " + pId);
        } else {
          // Accounting the total memory in usage for all containers that
          // are still
          // alive and within limits.
          vmemStillInUsage += currentVmemUsage;
          pmemStillInUsage += currentPmemUsage;
        }
      } catch (Exception e) {
        // Log the exception and proceed to the next container.
        LOG.warn("Uncaught exception in ContainerMemoryManager "
            + "while managing memory of " + containerId, e);
      }
    }

    // Sleep until the next monitoring pass; an interrupt terminates the loop (and the thread).
    try {
      Thread.sleep(monitoringInterval);
    } catch (InterruptedException e) {
      LOG.warn(ContainersMonitorImpl.class.getName()
          + " is interrupted. Exiting.");
      break;
    }
  }
}
NodeStatusUpdater服务:接收来自RM命令,并执行相应操作,类似MR1中TT的主循环
/**
 * Creates the status-updater task that repeatedly heartbeats to the ResourceManager
 * (excerpt — the article stops after the task's {@code run()} method; the closing of the
 * anonymous {@code Runnable} and the rest of this method are not shown).
 *
 * <p>Each loop iteration sends a {@code NodeHeartbeatRequest} through
 * {@code resourceTracker.nodeHeartbeat} and takes the next heartbeat interval from the
 * response. It then reacts to the node action the RM returns:
 * <ul>
 *   <li>{@code SHUTDOWN}: dispatch a shutdown event and leave the loop;
 *   <li>{@code RESYNC}: invalidate the RM identifier, dispatch a resync event, and leave the loop;
 *   <li>otherwise: dispatch cleanup events for the containers and applications the RM lists.
 * </ul>
 * Between iterations it waits on {@code heartbeatMonitor} for up to
 * {@code nextHeartBeatInterval} ms.
 */
protected void startStatusUpdater() {

  statusUpdaterRunnable = new Runnable() {
    @Override
    @SuppressWarnings("unchecked")
    public void run() {
      int lastHeartBeatID = 0;
      while (!isStopped) {
        // Send heartbeat
        try {
          // Perform the heartbeat and capture the RM's response.
          NodeHeartbeatResponse response = null;
          NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
          NodeHeartbeatRequest request =
              NodeHeartbeatRequest.newInstance(nodeStatus,
                  NodeStatusUpdaterImpl.this.context
                      .getContainerTokenSecretManager().getCurrentKey(),
                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
                      .getCurrentKey());
          response = resourceTracker.nodeHeartbeat(request);
          //get next heartbeat interval from response
          nextHeartBeatInterval = response.getNextHeartBeatInterval();
          updateMasterKeys(response);

          // RM asked this node to shut down.
          if (response.getNodeAction() == NodeAction.SHUTDOWN) {
            // NOTE(review): "Recieved" in the log text below is a typo for "Received".
            LOG
                .warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
                    + " hence shutting down.");
            LOG.warn("Message from ResourceManager: "
                + response.getDiagnosticsMessage());
            dispatcher.getEventHandler().handle(
                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
            // Note: breaking out of the try still runs the finally block below first.
            break;
          }
          // RM asked this node to resynchronize.
          if (response.getNodeAction() == NodeAction.RESYNC) {
            LOG.warn("Node is out of sync with ResourceManager,"
                + " hence resyncing.");
            LOG.warn("Message from ResourceManager: "
                + response.getDiagnosticsMessage());
            // Invalidate the RMIdentifier while resync
            NodeStatusUpdaterImpl.this.rmIdentifier =
                ResourceManagerConstants.RM_INVALID_IDENTIFIER;
            dispatcher.getEventHandler().handle(
                new NodeManagerEvent(NodeManagerEventType.RESYNC));
            break;
          }

          // Clean up the containers and applications the RM reports as completed.
          lastHeartBeatID = response.getResponseId();
          List<ContainerId> containersToCleanup = response
              .getContainersToCleanup();
          if (!containersToCleanup.isEmpty()) {
            dispatcher.getEventHandler().handle(
                new CMgrCompletedContainersEvent(containersToCleanup,
                    CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
          }
          List<ApplicationId> appsToCleanup =
              response.getApplicationsToCleanup();
          //Only start tracking for keepAlive on FINISH_APP
          trackAppsForKeepAlive(appsToCleanup);
          if (!appsToCleanup.isEmpty()) {
            dispatcher.getEventHandler().handle(
                new CMgrCompletedAppsEvent(appsToCleanup,
                    CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
          }
        } catch (ConnectException e) {
          //catch and throw the exception if tried MAX wait time to connect RM
          dispatcher.getEventHandler().handle(
              new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
          throw new YarnRuntimeException(e);
        } catch (Throwable e) {
          // TODO Better error handling. Thread can die with the rest of the
          // NM still running.
          LOG.error("Caught exception in status-updater", e);
        } finally {
          // Wait before the next heartbeat, falling back to the default interval when the
          // RM-provided value is non-positive. NOTE(review): presumably other code notifies
          // heartbeatMonitor to trigger an early heartbeat — not shown in this excerpt.
          synchronized (heartbeatMonitor) {
            nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?
                YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS :
                nextHeartBeatInterval;
            try {
              heartbeatMonitor.wait(nextHeartBeatInterval);
            } catch (InterruptedException e) {
              // Do Nothing
              // NOTE(review): the interrupt status is discarded here; consider restoring it
              // with Thread.currentThread().interrupt().
            }
          }
        }
      }
    }
    // (the article's excerpt ends here: the closing of the anonymous Runnable and the
    // remainder of startStatusUpdater() are not shown)
Hadoop2.x NodeManager启动之服务启动,布布扣,bubuko.com
原文:http://blog.csdn.net/lihm0_1/article/details/21741561