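In this part, we start from the launch scripts and work our way, step by step, into the ResourceManager source code.
The official Hadoop documentation gives the ResourceManager launch command as: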
Usage: `yarn resourcemanager [-format-state-store]`

| COMMAND_OPTIONS | Description |
|---|---|
| `-format-state-store` | Formats the RMStateStore. This will clear the RMStateStore and is useful if past applications are no longer needed. This should be run only when the ResourceManager is not running. |
| `-remove-application-from-state-store <appId>` | Remove the application from RMStateStore. This should be run only when the ResourceManager is not running. |
Open the source file hadoop-yarn-project > hadoop-yarn > bin > start-yarn.sh:
```bash
# start resourceManager
# Check the configuration to see whether ResourceManager HA is enabled
HARM=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.enabled 2>&-)
if [[ ${HARM} = "false" ]]; then
  # ResourceManager HA is not enabled
  echo "Starting resourcemanager"
  hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
      --config "${HADOOP_CONF_DIR}" \
      --daemon start \
      resourcemanager
  (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
else
  # ResourceManager HA is enabled
  # yarn.resourcemanager.ha.rm-ids lists the logical RM ids, separated by commas
  logicals=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey yarn.resourcemanager.ha.rm-ids 2>&-)
  logicals=${logicals//,/ }   # replace commas with spaces to split into individual RM ids
  for id in ${logicals}
  do
      rmhost=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -confKey "yarn.resourcemanager.hostname.${id}" 2>&-)
      RMHOSTS="${RMHOSTS} ${rmhost}"   # RMHOSTS ends up as a space-separated list of hostnames
  done
  echo "Starting resourcemanagers on [${RMHOSTS}]"
  # Run the yarn command on every RM host
  hadoop_uservar_su yarn resourcemanager "${HADOOP_YARN_HOME}/bin/yarn" \
      --config "${HADOOP_CONF_DIR}" \
      --daemon start \
      --workers \
      --hostnames "${RMHOSTS}" \
      resourcemanager
  (( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))   # accumulate the previous command's exit status
fi
```
First, here is how the shell syntax for splitting a string works:
```bash
$ aa='1,2,3'; for i in ${aa//,/ }; do echo $i; done
1
2
3
```
The sample configuration from the official documentation makes this easier to follow. It enables HA with the RM ids rm1 and rm2. The hostname of rm1 is master1 and the hostname of rm2 is master2:
```xml
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>master1</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>master2</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>
```
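The script's HA branch does three things: it reads `yarn.resourcemanager.ha.rm-ids`, splits the value on commas, and looks up `yarn.resourcemanager.hostname.<id>` for each id. The following rough Java equivalent runs that logic against the sample configuration above. It is a sketch, not Hadoop code. The configuration is modeled as a plain `Map` instead of Hadoop's `Configuration`, and `RmHostResolver` is a hypothetical name. Unlike the shell loop, it skips ids that have no hostname configured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the HA branch of start-yarn.sh; the Map stands in for Hadoop's Configuration
class RmHostResolver {

    // Split yarn.resourcemanager.ha.rm-ids on commas and resolve each logical id
    static List<String> resolveRmHosts(Map<String, String> conf) {
        List<String> hosts = new ArrayList<>();
        String ids = conf.getOrDefault("yarn.resourcemanager.ha.rm-ids", "");
        for (String id : ids.split(",")) {
            String trimmed = id.trim();
            if (trimmed.isEmpty()) {
                continue; // e.g. an empty value or a trailing comma
            }
            String host = conf.get("yarn.resourcemanager.hostname." + trimmed);
            if (host != null) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(
            "yarn.resourcemanager.ha.enabled", "true",
            "yarn.resourcemanager.ha.rm-ids", "rm1,rm2",
            "yarn.resourcemanager.hostname.rm1", "master1",
            "yarn.resourcemanager.hostname.rm2", "master2");
        // Join with spaces, like the RMHOSTS shell variable
        System.out.println(String.join(" ", resolveRmHosts(conf))); // master1 master2
    }
}
```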
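Reading this together with the yarn script shows that the entry class of resourcemanager is org.apache.hadoop.yarn.server.resourcemanager.ResourceManager. It is launched with the options --config "${HADOOP_CONF_DIR}" --daemon start --workers --hostnames "${RMHOSTS}", plus argument values passed along by shell functions (not analyzed in detail here).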
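Next, we finally reach the entry class org.apache.hadoop.yarn.server.resourcemanager.ResourceManager, which lives in the hadoop-yarn-server-resourcemanager submodule.
Start with the class declaration. ResourceManager extends the service class CompositeService, which means the RM is a composite service made up of child services. It also implements Recoverable, and it implements the ResourceManagerMXBean interface so that it can be managed through JMX: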
```java
public class ResourceManager extends CompositeService
    implements Recoverable, ResourceManagerMXBean
```
Next, find the main method:
```java
public static void main(String argv[]) {
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
  try {
    Configuration conf = new YarnConfiguration();
    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); // parse the generic options
    // Arguments left over after the generic options (such as -D key=value) are consumed;
    // a normal startup passes no extra arguments
    argv = hParser.getRemainingArgs();
    // If -format-state-store, then delete RMStateStore; else startup normally
    if (argv.length >= 1) {
      if (argv[0].equals("-format-state-store")) {
        deleteRMStateStore(conf);
      } else if (argv[0].equals("-remove-application-from-state-store")
          && argv.length == 2) {
        removeApplication(conf, argv[1]);
      } else {
        printUsage(System.err);
      }
    } else {
      // Create the RM instance. The superclass constructor sets the service name to
      // "ResourceManager" and instantiates the state model field stateModel, whose
      // initial state is Service.STATE.NOTINITED (covered in detail later)
      ResourceManager resourceManager = new ResourceManager();
      // Register a shutdown hook that stops the composite service
      ShutdownHookManager.get().addShutdownHook(
          new CompositeServiceShutdownHook(resourceManager),
          SHUTDOWN_HOOK_PRIORITY);
      resourceManager.init(conf);  // initialize the RM service
      resourceManager.start();     // start the RM service
    }
  } catch (Throwable t) {
    LOG.fatal("Error starting ResourceManager", t);
    System.exit(-1);
  }
}
```
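The branching in main() comes down to a small decision on the arguments left after GenericOptionsParser has done its work. The following sketch isolates that decision. The `RmCommandDispatch` class and its `Action` enum are hypothetical names used only for illustration.

```java
// Sketch of the argument handling in ResourceManager.main(); names are hypothetical
class RmCommandDispatch {

    enum Action { START, FORMAT_STATE_STORE, REMOVE_APPLICATION, PRINT_USAGE }

    static Action decide(String[] remainingArgs) {
        if (remainingArgs.length == 0) {
            return Action.START; // no extra arguments: normal startup
        }
        if (remainingArgs[0].equals("-format-state-store")) {
            return Action.FORMAT_STATE_STORE;
        }
        if (remainingArgs[0].equals("-remove-application-from-state-store")
                && remainingArgs.length == 2) {
            return Action.REMOVE_APPLICATION; // remainingArgs[1] is the appId
        }
        return Action.PRINT_USAGE; // anything else is a usage error
    }

    public static void main(String[] args) {
        System.out.println(decide(args)); // START when run without arguments
    }
}
```

The RM therefore only starts when no extra arguments remain. Both state-store commands run and return without starting any services.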
resourceManager.init(conf) is inherited from AbstractService:

```java
@Override // defined in the parent class AbstractService
public void init(Configuration conf) {
  if (conf == null) {
    throw new ServiceStateException("Cannot initialize service "
        + getName() + ": null configuration");
  }
  if (isInState(STATE.INITED)) {
    return;
  }
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) { // the service had not been initialized before
      setConfig(conf);  // store the conf object
      try {
        serviceInit(config);  // initialize the service
        if (isInState(STATE.INITED)) {  // the service initialized successfully
          //if the service ended up here during init,
          //notify the listeners
          notifyListeners();  // notify the listeners
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}
```
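init() is a template method. AbstractService handles the state check, locking, and failure handling, and each concrete service fills in serviceInit(). The stripped-down sketch below shows that shape. `MiniService` and `CountingService` are hypothetical. Hadoop's version also notifies listeners, records failures, and validates transitions through its state model.

```java
// Hypothetical simplification of AbstractService's init() template method
abstract class MiniService {

    enum State { NOTINITED, INITED, STARTED, STOPPED }

    private final Object stateChangeLock = new Object();
    private volatile State state = State.NOTINITED;

    State getState() {
        return state;
    }

    final void init() {
        if (state == State.INITED) {
            return; // already initialized: init() is idempotent
        }
        synchronized (stateChangeLock) {
            if (state == State.INITED) {
                return; // another thread finished init first
            }
            if (state != State.NOTINITED) {
                throw new IllegalStateException("Cannot init a service in state " + state);
            }
            state = State.INITED; // enter the new state before running the hook
            try {
                serviceInit(); // subclass-specific initialization
            } catch (Exception e) {
                state = State.STOPPED; // roughly what stopQuietly() leads to
                throw new RuntimeException(e);
            }
        }
    }

    protected abstract void serviceInit() throws Exception;
}

class CountingService extends MiniService {
    int initCalls = 0;

    @Override
    protected void serviceInit() {
        initCalls++;
    }

    public static void main(String[] args) {
        CountingService s = new CountingService();
        s.init();
        s.init(); // returns early, serviceInit() is not called again
        System.out.println(s.getState() + " " + s.initCalls); // INITED 1
    }
}
```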
The serviceInit method is implemented in the ResourceManager class:
```java
@Override
protected void serviceInit(Configuration conf) throws Exception {
  this.conf = conf;
  // 1. Initialize the service context.
  // RMContextImpl holds two kinds of service context:
  // - serviceContext: "Always On" services that keep running regardless of HA state
  // - activeServiceContext: services that only run on the Active RM
  this.rmContext = new RMContextImpl();
  rmContext.setResourceManager(this);

  // 2. Set up the configuration provider
  this.configurationProvider =
      ConfigurationProviderFactory.getConfigurationProvider(conf);
  this.configurationProvider.init(this.conf);
  rmContext.setConfigurationProvider(configurationProvider);

  // 3. Load core-site.xml
  loadConfigurationXml(YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);

  // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml
  // Or use RM specific configurations to overwrite the common ones first
  // if they exist
  RMServerUtils.processRMProxyUsersConf(conf);
  ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);

  // 4. Load yarn-site.xml
  loadConfigurationXml(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);

  // 5. Validate the configuration
  validateConfigs(this.conf);

  // 6. Login
  // Set HA configuration should be done before login
  this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
  if (this.rmContext.isHAEnabled()) {
    // RM HA is enabled: verify and set the HA configuration
    HAUtil.verifyAndSetConfiguration(this.conf);
  }

  // Set UGI and do login
  // If security is enabled, use login user
  // If security is not enabled, use current user
  // (with security such as Kerberos enabled, the Kerberos login user is used)
  this.rmLoginUGI = UserGroupInformation.getCurrentUser();
  try {
    doSecureLogin();
  } catch (IOException ie) {
    throw new YarnRuntimeException("Failed to login", ie);
  }

  // register the handlers for all AlwaysOn services using setupDispatcher().
  // 7. Set up the dispatcher with the event handlers of the Always On services
  rmDispatcher = setupDispatcher();
  addIfService(rmDispatcher);
  rmContext.setDispatcher(rmDispatcher);

  // The order of services below should not be changed as services will be
  // started in same order
  // As elector service needs admin service to be initialized and started,
  // first we add admin service then elector service

  // 8. Create the AdminService
  adminService = createAdminService();
  addService(adminService);
  rmContext.setRMAdminService(adminService);

  // elector must be added post adminservice
  if (this.rmContext.isHAEnabled()) {
    // If the RM is configured to use an embedded leader elector,
    // initialize the leader elector.
    if (HAUtil.isAutomaticFailoverEnabled(conf)
        && HAUtil.isAutomaticFailoverEmbedded(conf)) {
      EmbeddedElector elector = createEmbeddedElector();
      addIfService(elector);
      rmContext.setLeaderElectorService(elector);
    }
  }

  // 9. Set the YARN configuration
  rmContext.setYarnConfiguration(conf);

  // 10. Create and initialize the Active services
  createAndInitActiveServices(false);

  // 11. Get the YARN web app address
  webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
      YarnConfiguration.RM_BIND_HOST,
      WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));

  // 12. Create the RMApplicationHistoryWriter service
  RMApplicationHistoryWriter rmApplicationHistoryWriter =
      createRMApplicationHistoryWriter();
  addService(rmApplicationHistoryWriter);
  rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

  // initialize the RM timeline collector first so that the system metrics
  // publisher can bind to it
  // 13. Create the RM timeline collector
  if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
    RMTimelineCollectorManager timelineCollectorManager =
        createRMTimelineCollectorManager();
    addService(timelineCollectorManager);
    rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
  }

  // 14. Set up the SystemMetricsPublisher
  SystemMetricsPublisher systemMetricsPublisher =
      createSystemMetricsPublisher();
  addIfService(systemMetricsPublisher);
  rmContext.setSystemMetricsPublisher(systemMetricsPublisher);

  // 15. Register with JMX
  registerMXBean();

  // 16. Call the parent class's serviceInit
  super.serviceInit(this.conf);
}
```
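Below we go through the initialization sub-steps one by one.

Step 1, the service context. The RMContextImpl constructor creates both kinds of context:

```java
public RMContextImpl() {
  // context for the Always On services
  this.serviceContext = new RMServiceContext();
  // context for the services that run only on the Active RM
  this.activeServiceContext = new RMActiveServiceContext();
}
```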
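Step 2, the configuration provider. A factory, driven by configuration, supplies a default ConfigurationProvider, and users can implement ConfigurationProvider themselves to plug in a custom provider.
Providers show up often elsewhere in the Hadoop source as well. Here, a provider can run some internal initialization, return an InputStream for a configuration file, close those streams, and so on. A class that parses configuration only needs an input stream.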
```java
// ConfigurationProviderFactory is a factory class
/**
 * Creates an instance of {@link ConfigurationProvider} using given
 * configuration.
 * @param bootstrapConf
 * @return configurationProvider
 */
@SuppressWarnings("unchecked")
public static ConfigurationProvider
    getConfigurationProvider(Configuration bootstrapConf) {
  Class<? extends ConfigurationProvider> defaultProviderClass;
  try {
    // The default provider class is org.apache.hadoop.yarn.LocalConfigurationProvider
    defaultProviderClass = (Class<? extends ConfigurationProvider>)
        Class.forName(
          YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS);
  } catch (Exception e) {
    throw new YarnRuntimeException(
        "Invalid default configuration provider class"
            + YarnConfiguration.DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS, e);
  }
  // The provider can be set with yarn.resourcemanager.configuration.provider-class.
  // ReflectionUtils fetches the class's constructor from its cache and
  // creates the provider instance reflectively.
  ConfigurationProvider configurationProvider =
      ReflectionUtils.newInstance(bootstrapConf.getClass(
          YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
          defaultProviderClass, ConfigurationProvider.class),
          bootstrapConf);
  return configurationProvider;
}
```
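The factory above reduces to one rule: use the configured class name if there is one, otherwise use the default, then instantiate the class reflectively. The following self-contained sketch shows that rule and deliberately avoids Hadoop. The `ConfigProvider` types and the `provider-class` key are hypothetical. Plain `Class.forName` with a no-arg constructor stands in for `Configuration.getClass` and `ReflectionUtils.newInstance`.

```java
import java.util.Map;

// Hypothetical stand-ins for ConfigurationProvider and its implementations
interface ConfigProvider {
    String name();
}

class LocalConfigProvider implements ConfigProvider {
    public String name() { return "local"; }
}

class RemoteConfigProvider implements ConfigProvider {
    public String name() { return "remote"; }
}

class ConfigProviderFactory {
    static final String PROVIDER_CLASS_KEY = "provider-class"; // hypothetical key

    static ConfigProvider getProvider(Map<String, String> conf) {
        // Fall back to the default class when nothing is configured
        String className = conf.getOrDefault(PROVIDER_CLASS_KEY, LocalConfigProvider.class.getName());
        try {
            Class<?> clazz = Class.forName(className);
            if (!ConfigProvider.class.isAssignableFrom(clazz)) {
                throw new IllegalStateException(className + " is not a ConfigProvider");
            }
            // Instantiate through the no-arg constructor
            return (ConfigProvider) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Invalid provider class " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(getProvider(Map.of()).name()); // local
        System.out.println(getProvider(Map.of(PROVIDER_CLASS_KEY,
            RemoteConfigProvider.class.getName())).name()); // remote
    }
}
```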
Step 3, loading core-site.xml through the provider:

```java
private void loadConfigurationXml(String configurationFile)
    throws YarnException, IOException {
  InputStream configurationInputStream =
      this.configurationProvider.getConfigurationInputStream(this.conf,
          configurationFile);
  if (configurationInputStream != null) {
    this.conf.addResource(configurationInputStream, configurationFile);
  }
}
```
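A tiny sketch shows why the parsing side only needs an input stream. Two assumptions are made for illustration: `java.util.Properties` stands in for Hadoop's `Configuration`, and an in-memory map stands in for the provider. As in loadConfigurationXml(), a null stream is simply skipped.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

// Sketch only: Properties replaces Configuration, a Map of file name -> contents replaces the provider
class StreamConfigLoader {

    private final Map<String, String> files;

    StreamConfigLoader(Map<String, String> files) {
        this.files = files;
    }

    // Like a provider: return a stream for the named file, or null if it is absent
    InputStream getConfigurationInputStream(String name) {
        String content = files.get(name);
        if (content == null) {
            return null;
        }
        return new ByteArrayInputStream(content.getBytes(StandardCharsets.ISO_8859_1));
    }

    // Like loadConfigurationXml(): merge the stream only when the provider returned one
    void loadInto(Properties conf, String name) {
        InputStream in = getConfigurationInputStream(name);
        if (in == null) {
            return; // nothing to merge, existing values are kept
        }
        try (InputStream stream = in) {
            conf.load(stream); // in this sketch, later loads overwrite earlier keys
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StreamConfigLoader loader = new StreamConfigLoader(Map.of(
            "core-site.properties", "a=1\nb=2\n",
            "yarn-site.properties", "b=3\n"));
        Properties conf = new Properties();
        loader.loadInto(conf, "core-site.properties");
        loader.loadInto(conf, "yarn-site.properties");
        loader.loadInto(conf, "missing.properties"); // null stream, skipped
        System.out.println(conf.getProperty("a") + " " + conf.getProperty("b")); // 1 3
    }
}
```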
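Step 4 (loading yarn-site.xml) works the same way as step 3.
Step 5 validates the configuration. It checks that the maximum number of AM attempts is a positive integer and that the NodeManager expiry interval is no shorter than the NodeManager heartbeat interval.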
```java
protected static void validateConfigs(Configuration conf) {
  // validate max-attempts
  int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
  if (globalMaxAppAttempts <= 0) {
    throw new YarnRuntimeException("Invalid global max attempts configuration"
        + ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
        + "=" + globalMaxAppAttempts + ", it should be a positive integer.");
  }

  // validate expireIntvl >= heartbeatIntvl
  long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
      YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
  long heartbeatIntvl =
      conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
  if (expireIntvl < heartbeatIntvl) {
    throw new YarnRuntimeException("Nodemanager expiry interval should be no"
        + " less than heartbeat interval, "
        + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl
        + ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "="
        + heartbeatIntvl);
  }
}
```
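The second check exists for a reason. If the expiry interval were shorter than the heartbeat interval, a healthy NodeManager could be declared lost between two of its own heartbeats. The following sketch restates both checks over plain values. `RmConfigValidator` is a hypothetical name, and it throws `IllegalArgumentException` instead of `YarnRuntimeException`.

```java
// Sketch of the two checks in validateConfigs(), taking plain values instead of a Configuration
class RmConfigValidator {

    static void validate(int maxAppAttempts, long nmExpiryIntervalMs, long nmHeartbeatIntervalMs) {
        // the global AM max-attempts setting must be a positive integer
        if (maxAppAttempts <= 0) {
            throw new IllegalArgumentException("Invalid global max attempts configuration: "
                + maxAppAttempts + ", it should be a positive integer.");
        }
        // a NodeManager must get at least one heartbeat interval before it can expire
        if (nmExpiryIntervalMs < nmHeartbeatIntervalMs) {
            throw new IllegalArgumentException("Nodemanager expiry interval should be no less than"
                + " heartbeat interval: " + nmExpiryIntervalMs + " < " + nmHeartbeatIntervalMs);
        }
    }

    // Convenience wrapper for callers that only need a yes/no answer
    static boolean isValid(int maxAppAttempts, long nmExpiryIntervalMs, long nmHeartbeatIntervalMs) {
        try {
            validate(maxAppAttempts, nmExpiryIntervalMs, nmHeartbeatIntervalMs);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(2, 600_000L, 1_000L)); // true
        System.out.println(isValid(0, 600_000L, 1_000L)); // false: attempts must be > 0
        System.out.println(isValid(2, 500L, 1_000L));     // false: expiry < heartbeat
    }
}
```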
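Step 6, login, happens in three parts. First, check whether HA is enabled. If it is, the HA settings must be verified and set before logging in, because every RM node logs in on its own: doSecureLogin() below uses this node's bind address, which in HA mode depends on the node's RM id.
Second, get the current user. UserGroupInformation.getCurrentUser() returns the user of the Subject attached to the current access control context if there is one. Otherwise it returns the login user, which is the Kerberos-authenticated user when Kerberos is enabled and the OS user running the process when it is not.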
```java
@InterfaceAudience.Public
@InterfaceStability.Evolving
public static UserGroupInformation getCurrentUser() throws IOException {
  AccessControlContext context = AccessController.getContext();
  Subject subject = Subject.getSubject(context);
  if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
    return getLoginUser();
  } else {
    return new UserGroupInformation(subject);
  }
}
```
Third, call the security API to log in, and take the login user when security is enabled:

```java
protected void doSecureLogin() throws IOException {
  InetSocketAddress socAddr = getBindAddress(conf);
  SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
      YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName());

  // if security is enable, set rmLoginUGI as UGI of loginUser
  if (UserGroupInformation.isSecurityEnabled()) {
    this.rmLoginUGI = UserGroupInformation.getLoginUser();
  }
}
```
Step 7, setting up the dispatcher:

```java
private Dispatcher setupDispatcher() {
  // create the dispatcher
  Dispatcher dispatcher = createDispatcher();
  // register RMFatalEventDispatcher with the dispatcher
  // as the handler of RMFatalEventType events
  dispatcher.register(RMFatalEventType.class,
      new ResourceManager.RMFatalEventDispatcher());
  return dispatcher;
}

protected Dispatcher createDispatcher() {
  return new AsyncDispatcher("RM Event dispatcher");
}
```
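Internally, AsyncDispatcher holds a blocking event queue and a thread that keeps running. When an event is put into the queue, the thread takes it out and determines its type. It then looks up the matching EventHandler in the handler registry, a Map<Class<? extends Enum>, EventHandler>, and calls that handler's handle method. That completes one asynchronous event dispatch.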
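To make the description concrete, here is a minimal, self-contained sketch of the same idea. It is not Hadoop's AsyncDispatcher. The class `MiniAsyncDispatcher`, its `Event` type, and the two enums are hypothetical. Handlers are plain `Consumer`s instead of `EventHandler`s, and shutdown is simplified: pending events are not drained.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch: one blocking queue, one worker thread, handlers keyed by event-type enum class
class MiniAsyncDispatcher {

    static final class Event {
        final Enum<?> type;   // the enum's class selects the handler
        final String payload;

        Event(Enum<?> type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    enum NodeEventType { STARTED, LOST }
    enum AppEventType { SUBMITTED }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, Consumer<Event>> handlers = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::runLoop, "mini-dispatcher");

    void register(Class<? extends Enum<?>> typeClass, Consumer<Event> handler) {
        handlers.put(typeClass, handler);
    }

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    // Producers only enqueue and return at once; handling happens on the worker thread
    void dispatch(Event event) {
        queue.add(event);
    }

    void stop() throws InterruptedException {
        worker.interrupt();
        worker.join();
    }

    private void runLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Event event = queue.take(); // blocks until an event is available
                // getDeclaringClass() also works for enum constants with bodies
                Consumer<Event> handler = handlers.get(event.type.getDeclaringClass());
                if (handler != null) {
                    handler.accept(event);
                }
            }
        } catch (InterruptedException e) {
            // stop() interrupted the worker: leave the loop
        }
    }

    // Usage demo: returns what the handlers saw, in handling order
    static List<String> demo() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        MiniAsyncDispatcher dispatcher = new MiniAsyncDispatcher();
        dispatcher.register(NodeEventType.class, e -> { seen.add("node:" + e.payload); done.countDown(); });
        dispatcher.register(AppEventType.class, e -> { seen.add("app:" + e.payload); done.countDown(); });
        dispatcher.start();
        dispatcher.dispatch(new Event(NodeEventType.STARTED, "nm1"));
        dispatcher.dispatch(new Event(AppEventType.SUBMITTED, "app1"));
        dispatcher.dispatch(new Event(NodeEventType.LOST, "nm2"));
        try {
            done.await(5, TimeUnit.SECONDS);
            dispatcher.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [node:nm1, app:app1, node:nm2]
    }
}
```

A single worker drains the queue, so events are handled in the order they were dispatched, while dispatch() itself never waits for a handler.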
Step 8, creating the AdminService:

```java
protected AdminService createAdminService() {
  return new AdminService(this);
}
```

Step 9, setting the YARN configuration on the context:

```java
rmContext.setYarnConfiguration(conf);

// which calls
public void setYarnConfiguration(Configuration yarnConfiguration) {
  serviceContext.setYarnConfiguration(yarnConfiguration);
}
```
Step 10, creating and initializing the Active services:

```java
protected void createAndInitActiveServices(boolean fromActive) {
  activeServices = new RMActiveServices(this);
  activeServices.fromActive = fromActive;
  activeServices.init(conf);
}

// activeServices.init(conf) is the same AbstractService.init() shown earlier.
// The serviceInit() it calls is shown below and analyzed in detail later:
@Override
protected void serviceInit(Configuration configuration) throws Exception {
  standByTransitionRunnable = new StandByTransitionRunnable();

  rmSecretManagerService = createRMSecretManagerService();
  addService(rmSecretManagerService);

  containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
  addService(containerAllocationExpirer);
  rmContext.setContainerAllocationExpirer(containerAllocationExpirer);

  AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
  addService(amLivelinessMonitor);
  rmContext.setAMLivelinessMonitor(amLivelinessMonitor);

  AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
  addService(amFinishingMonitor);
  rmContext.setAMFinishingMonitor(amFinishingMonitor);

  RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor();
  addService(rmAppLifetimeMonitor);
  rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor);

  RMNodeLabelsManager nlm = createNodeLabelManager();
  nlm.setRMContext(rmContext);
  addService(nlm);
  rmContext.setNodeLabelManager(nlm);

  AllocationTagsManager allocationTagsManager =
      createAllocationTagsManager();
  rmContext.setAllocationTagsManager(allocationTagsManager);

  PlacementConstraintManagerService placementConstraintManager =
      createPlacementConstraintManager();
  addService(placementConstraintManager);
  rmContext.setPlacementConstraintManager(placementConstraintManager);

  // add resource profiles here because it's used by AbstractYarnScheduler
  ResourceProfilesManager resourceProfilesManager =
      createResourceProfileManager();
  resourceProfilesManager.init(conf);
  rmContext.setResourceProfilesManager(resourceProfilesManager);

  RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
      createRMDelegatedNodeLabelsUpdater();
  if (delegatedNodeLabelsUpdater != null) {
    addService(delegatedNodeLabelsUpdater);
    rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
  }

  recoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED,
      YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);

  RMStateStore rmStore = null;
  if (recoveryEnabled) {
    rmStore = RMStateStoreFactory.getStore(conf);
    boolean isWorkPreservingRecoveryEnabled =
        conf.getBoolean(
          YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
          YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
    rmContext
        .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
  } else {
    rmStore = new NullRMStateStore();
  }

  try {
    rmStore.setResourceManager(rm);
    rmStore.init(conf);
    rmStore.setRMDispatcher(rmDispatcher);
  } catch (Exception e) {
    // the Exception from stateStore.init() needs to be handled for
    // HA and we need to give up master status if we got fenced
    LOG.error("Failed to init state store", e);
    throw e;
  }
  rmContext.setStateStore(rmStore);

  if (UserGroupInformation.isSecurityEnabled()) {
    delegationTokenRenewer = createDelegationTokenRenewer();
    rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
  }

  // Register event handler for NodesListManager
  nodesListManager = new NodesListManager(rmContext);
  rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
  addService(nodesListManager);
  rmContext.setNodesListManager(nodesListManager);

  // Initialize the scheduler
  scheduler = createScheduler();
  scheduler.setRMContext(rmContext);
  addIfService(scheduler);
  rmContext.setScheduler(scheduler);

  schedulerDispatcher = createSchedulerEventDispatcher();
  addIfService(schedulerDispatcher);
  rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

  // Register event handler for RmAppEvents
  rmDispatcher.register(RMAppEventType.class,
      new ApplicationEventDispatcher(rmContext));

  // Register event handler for RmAppAttemptEvents
  rmDispatcher.register(RMAppAttemptEventType.class,
      new ApplicationAttemptEventDispatcher(rmContext));

  // Register event handler for RmNodes
  rmDispatcher.register(
      RMNodeEventType.class, new NodeEventDispatcher(rmContext));

  nmLivelinessMonitor = createNMLivelinessMonitor();
  addService(nmLivelinessMonitor);

  resourceTracker = createResourceTrackerService();
  addService(resourceTracker);
  rmContext.setResourceTrackerService(resourceTracker);

  MetricsSystem ms = DefaultMetricsSystem.initialize("ResourceManager");
  if (fromActive) {
    JvmMetrics.reattach(ms, jvmMetrics);
    UserGroupInformation.reattachMetrics();
  } else {
    jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
  }

  JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
  addService(pauseMonitor);
  jvmMetrics.setPauseMonitor(pauseMonitor);

  // Initialize the Reservation system
  if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
      YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
    reservationSystem = createReservationSystem();
    if (reservationSystem != null) {
      reservationSystem.setRMContext(rmContext);
      addIfService(reservationSystem);
      rmContext.setReservationSystem(reservationSystem);
      LOG.info("Initialized Reservation system");
    }
  }

  masterService = createApplicationMasterService();
  createAndRegisterOpportunisticDispatcher(masterService);
  addService(masterService);
  rmContext.setApplicationMasterService(masterService);

  applicationACLsManager = new ApplicationACLsManager(conf);

  queueACLsManager = createQueueACLsManager(scheduler, conf);

  rmAppManager = createRMAppManager();
  // Register event handler for RMAppManagerEvents
  rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);

  clientRM = createClientRMService();
  addService(clientRM);
  rmContext.setClientRMService(clientRM);

  applicationMasterLauncher = createAMLauncher();
  rmDispatcher.register(AMLauncherEventType.class,
      applicationMasterLauncher);

  addService(applicationMasterLauncher);
  if (UserGroupInformation.isSecurityEnabled()) {
    addService(delegationTokenRenewer);
    delegationTokenRenewer.setRMContext(rmContext);
  }

  if (HAUtil.isFederationEnabled(conf)) {
    String cId = YarnConfiguration.getClusterId(conf);
    if (cId.isEmpty()) {
      String errMsg =
          "Cannot initialize RM as Federation is enabled"
              + " but cluster id is not configured.";
      LOG.error(errMsg);
      throw new YarnRuntimeException(errMsg);
    }
    federationStateStoreService = createFederationStateStoreService();
    addIfService(federationStateStoreService);
    LOG.info("Initialized Federation membership.");
  }

  new RMNMInfo(rmContext, scheduler);

  if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
      false)) {
    SystemServiceManager systemServiceManager = createServiceManager();
    addIfService(systemServiceManager);
  }

  super.serviceInit(conf);
}
```
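Step 11, computing the web app address:

```java
// If yarn.resourcemanager.bind-host is set, it overrides the host part
// of the address the RM web app binds to
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
    YarnConfiguration.RM_BIND_HOST,
    WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
```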
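As we read getWebAppBindURL(), when the bind-host property is set it replaces the host part of the configured web app address and keeps the port. That lets the web server listen on, for example, 0.0.0.0 (all interfaces) while clients still use the advertised hostname. The sketch below captures only that assumption. `WebAppBindSketch.bindUrl` is a hypothetical helper, not the Hadoop method, so check WebAppUtils in your Hadoop version for the exact behavior.

```java
// Hypothetical sketch of the bind-host override; not the WebAppUtils implementation
class WebAppBindSketch {

    // Swap in bindHost (when set) and keep the ":port" suffix of the configured address
    static String bindUrl(String bindHost, String webAppAddressWithoutScheme) {
        if (bindHost == null || bindHost.trim().isEmpty()) {
            return webAppAddressWithoutScheme; // no bind-host: use the address as configured
        }
        int colon = webAppAddressWithoutScheme.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException(
                "web app address must include a port: " + webAppAddressWithoutScheme);
        }
        return bindHost.trim() + webAppAddressWithoutScheme.substring(colon);
    }

    public static void main(String[] args) {
        System.out.println(bindUrl("0.0.0.0", "master1:8088")); // 0.0.0.0:8088
        System.out.println(bindUrl(null, "master1:8088"));      // master1:8088
    }
}
```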
Step 12, creating the RMApplicationHistoryWriter service:

```java
protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() {
  return new RMApplicationHistoryWriter();
}

// in serviceInit():
RMApplicationHistoryWriter rmApplicationHistoryWriter =
    createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
```

Step 13, creating the RM timeline collector (only when timeline service v2 is enabled):

```java
private RMTimelineCollectorManager createRMTimelineCollectorManager() {
  return new RMTimelineCollectorManager(this);
}

// in serviceInit():
if (YarnConfiguration.timelineServiceV2Enabled(this.conf)) {
  RMTimelineCollectorManager timelineCollectorManager =
      createRMTimelineCollectorManager();
  addService(timelineCollectorManager);
  rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
}
```
Step 14, creating the SystemMetricsPublisher:

```java
protected SystemMetricsPublisher createSystemMetricsPublisher() {
  List<SystemMetricsPublisher> publishers =
      new ArrayList<SystemMetricsPublisher>();
  // timeline service v1
  if (YarnConfiguration.timelineServiceV1Enabled(conf)) {
    SystemMetricsPublisher publisherV1 = new TimelineServiceV1Publisher();
    publishers.add(publisherV1);
  }
  // timeline service v2
  if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
    // we're dealing with the v.2.x publisher
    LOG.info("system metrics publisher with the timeline service V2 is "
        + "configured");
    SystemMetricsPublisher publisherV2 = new TimelineServiceV2Publisher(
        rmContext.getRMTimelineCollectorManager());
    publishers.add(publisherV2);
  }
  // If no publisher is configured, fall back to a no-op publisher.
  // This is the Null Object pattern, which avoids null pointer problems.
  if (publishers.isEmpty()) {
    LOG.info("TimelineServicePublisher is not configured");
    SystemMetricsPublisher noopPublisher = new NoOpSystemMetricPublisher();
    publishers.add(noopPublisher);
  }

  for (SystemMetricsPublisher publisher : publishers) {
    addIfService(publisher);
  }

  SystemMetricsPublisher combinedPublisher =
      new CombinedSystemMetricsPublisher(publishers);
  return combinedPublisher;
}
```
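The pairing of a composite with a null object can be sketched without Hadoop. The `AppMetricsPublisher` interface and its implementations below are hypothetical. They only mimic the structure of SystemMetricsPublisher, NoOpSystemMetricPublisher, and CombinedSystemMetricsPublisher: callers always get one non-null publisher, no matter how many timeline versions are enabled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SystemMetricsPublisher
interface AppMetricsPublisher {
    void appCreated(String appId);
}

// Null Object: safe to call, does nothing
class NoOpAppMetricsPublisher implements AppMetricsPublisher {
    @Override
    public void appCreated(String appId) {
        // intentionally empty
    }
}

// A test double that records what it was told
class RecordingAppMetricsPublisher implements AppMetricsPublisher {
    final List<String> events = new ArrayList<>();

    @Override
    public void appCreated(String appId) {
        events.add(appId);
    }
}

class CombinedAppMetricsPublisher implements AppMetricsPublisher {
    private final List<AppMetricsPublisher> publishers;

    private CombinedAppMetricsPublisher(List<AppMetricsPublisher> publishers) {
        this.publishers = publishers;
    }

    // Mirrors "if (publishers.isEmpty()) add a no-op publisher"
    static CombinedAppMetricsPublisher fromEnabled(List<AppMetricsPublisher> enabled) {
        List<AppMetricsPublisher> publishers = new ArrayList<>(enabled);
        if (publishers.isEmpty()) {
            publishers.add(new NoOpAppMetricsPublisher());
        }
        return new CombinedAppMetricsPublisher(publishers);
    }

    int size() {
        return publishers.size();
    }

    // Fan each call out to every wrapped publisher
    @Override
    public void appCreated(String appId) {
        for (AppMetricsPublisher publisher : publishers) {
            publisher.appCreated(appId);
        }
    }

    public static void main(String[] args) {
        CombinedAppMetricsPublisher none = fromEnabled(new ArrayList<>());
        none.appCreated("app_1"); // no null check needed
        RecordingAppMetricsPublisher v1 = new RecordingAppMetricsPublisher();
        RecordingAppMetricsPublisher v2 = new RecordingAppMetricsPublisher();
        CombinedAppMetricsPublisher both = fromEnabled(List.of(v1, v2));
        both.appCreated("app_2");
        System.out.println(none.size() + " " + both.size() + " " + v1.events + " " + v2.events);
        // 1 2 [app_2] [app_2]
    }
}
```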
Step 15, registering the MXBean with JMX:

```java
/**
 * Register ResourceManagerMXBean.
 */
private void registerMXBean() {
  MBeans.register("ResourceManager", "ResourceManager", this);
}
```
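Step 16, super.serviceInit() in CompositeService. Here, every service that was added to the service list during the earlier initialization gets initialized in turn:

```java
protected void serviceInit(Configuration conf) throws Exception {
  List<Service> services = getServices();
  if (LOG.isDebugEnabled()) {
    LOG.debug(getName() + ": initing services, size=" + services.size());
  }
  for (Service service : services) {
    service.init(conf);
  }
  super.serviceInit(conf);
}
```

Why does getServices() not return serviceList directly? The ArrayList copy constructor effectively does an Arrays.copyOf (a shallow copy), so callers cannot add or remove services in the internal list. This is a recommended practice. Returning an iterator would be another option, but only one that does not support remove(); otherwise callers could still delete services.

```java
public List<Service> getServices() {
  synchronized (serviceList) {
    return new ArrayList<Service>(serviceList);
  }
}
```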
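Both behaviors discussed above, ordered initialization of child services and a defensive copy in getServices(), fit in a short sketch. `MiniCompositeService` and its `Child` interface are hypothetical simplifications of CompositeService and Service. Iterating over a snapshot has another benefit: the lock is not held while children initialize, and a service added during the loop cannot cause a ConcurrentModificationException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of CompositeService
class MiniCompositeService {

    // A child only has an init step here; it records its name in a shared log
    interface Child {
        void init(List<String> log);
    }

    private final List<Child> serviceList = new ArrayList<>();

    void addService(Child child) {
        synchronized (serviceList) {
            serviceList.add(child);
        }
    }

    // Shallow copy: callers get a new list, but the Child objects are shared
    List<Child> getServices() {
        synchronized (serviceList) {
            return new ArrayList<>(serviceList);
        }
    }

    // Initialize children in the order they were added, iterating over a snapshot
    void init(List<String> log) {
        for (Child child : getServices()) {
            child.init(log);
        }
    }

    public static void main(String[] args) {
        MiniCompositeService rm = new MiniCompositeService();
        rm.addService(l -> l.add("dispatcher"));
        rm.addService(l -> l.add("adminService"));
        rm.getServices().clear(); // only clears the copy
        List<String> log = new ArrayList<>();
        rm.init(log);
        System.out.println(log); // [dispatcher, adminService]
    }
}
```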
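We have now walked through most of the initialization code. When later parts of the series touch other pieces of it, we will come back and examine them in detail.
YARN Analysis Series, Part 3: Analyzing ResourceManager Initialization from the Script Entry Point
Original: https://www.cnblogs.com/johnny666888/p/11055651.html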