直接进入 NameServer 主函数 org.apache.rocketmq.namesrv.NamesrvStartup#main
/**
 * JVM entry point of the NameServer.
 *
 * <p>Does no work of its own; it hands the command-line arguments to
 * {@code main0} and discards the controller that {@code main0} returns.
 *
 * @param args command-line arguments, forwarded unchanged
 */
public static void main(String[] args) {
    main0(args);
}
空壳方法,调用 NamesrvStartup#main0 方法启动 NameServer
进入 NamesrvStartup#main0 方法
public static NamesrvController main0(String[] args) { try { //创建 NamesrvController NamesrvController controller = createNamesrvController(args); //启动 NamesrvController start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
NamesrvStartup#main0 方法启动 NameServer 主要分为两步:1. 调用 NamesrvStartup#createNamesrvController 方法创建 NamesrvController;2. 调用 NamesrvStartup#start 方法启动该 controller。
先进入 NamesrvStartup#createNamesrvController 方法,看看 NameServer 的创建流程
/** * 创建 NamesrvController * @param args * @return * @throws IOException * @throws JoranException */ public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson(); Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } //创建 NamesrvController 的两大核心配置 NamesrvConfig 和 NettyServerConfig final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); // -c 配置文件路径 -- 通过配置文件加载配置 if (commandLine.hasOption(‘c‘)) { String file = commandLine.getOptionValue(‘c‘); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // -p 属性名=属性值 -- 直接指定属性值 if (commandLine.hasOption(‘p‘)) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } //通过控制台指定属性值 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator 
= new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); //创建 NamesrvController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); //controller 添加配置 // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); return controller; }
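NamesrvStartup#createNamesrvController 方法创建 NamesrvController 主要有以下几步:1. 解析命令行参数;2. 创建 NamesrvConfig 和 NettyServerConfig 两个配置对象,并把监听端口设为 9876;3. 如果指定了 -c,则从配置文件加载属性并填充这两个配置对象;4. 如果指定了 -p,则打印全部配置项后直接退出;5. 用命令行参数填充 NamesrvConfig,并校验 rocketmqHome 已设置;6. 加载 conf/logback_namesrv.xml 初始化日志;7. 用两个配置对象创建 NamesrvController,并把 properties 注册到 controller 的配置中。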
NamesrvConfig 属性
/**
 * RocketMQ home directory. Read from the system property named by
 * MixAll.ROCKETMQ_HOME_PROPERTY, falling back to the environment variable
 * named by MixAll.ROCKETMQ_HOME_ENV.
 */
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
/**
 * Path where the NameServer persists its KV configuration;
 * defaults to {user.home}/namesrv/kvConfig.json.
 */
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
/**
 * Default path of the NameServer configuration file;
 * defaults to {user.home}/namesrv/namesrv.properties.
 */
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
/** Product environment name; handed to ClusterTestRequestProcessor when clusterTest is enabled. */
private String productEnvName = "center";
/** When true, NamesrvController#registerProcessor registers ClusterTestRequestProcessor instead of DefaultRequestProcessor. */
private boolean clusterTest = false;
/**
 * Whether ordered messages are supported.
 */
private boolean orderMessageEnable = false;
NettyServerConfig 属性
//NameServer监听端口,该值默认会被初始化为9876 private int listenPort = 8888; //Netty 业务线程池线程个数 private int serverWorkerThreads = 8; //Netty public 任务线程池线程个数,Netty网格设计,根据业务类型会创建不同的线程池, //比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由public线程池执行 private int serverCallbackExecutorThreads = 0; //IO 线程池个数,主要是NameServer、Broker端解析请求,返回相应的线程个数,这类线程主要是处理网路请求的, //解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方 private int serverSelectorThreads = 3; //send oneway 消息请求并发数(Broker端参数) private int serverOnewaySemaphoreValue = 256; //异步消息发送最大并发度 private int serverAsyncSemaphoreValue = 64; //异步消息发送最大的空闲时间,默认120s private int serverChannelMaxIdleTimeSeconds = 120; //网络socket发送缓冲区大小 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; //网络接收端缓存区大小 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; //ByteBuffer是否开启缓存 private boolean serverPooledByteBufAllocatorEnable = true;
进入 NamesrvStartup#start 方法,查看 NamesrvController 启动流程
/** * 启动 NamesrvController * @param controller * @return * @throws Exception */ public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } //controller 初始化 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //添加关闭的钩子函数 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); //启动 controller controller.start(); return controller; }
NamesrvStartup#start 方法会先调用 NamesrvController#initialize 方法初始化 controller,然后添加一个服务关闭的钩子函数,最后调用真正的启动方法 NamesrvController#start 并返回 controller
进入 NamesrvController#initialize 方法
public boolean initialize() { //加载本地 KV 配置 this.kvConfigManager.load(); //创建 NettyServer 服务端对象 remotingServer this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //创建处理响应的线程池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注册 remotingServer 的请求处理器 this.registerProcessor(); //启动检测 broker 活跃状态的定时任务, 启动五秒后每隔十秒扫描未活跃的 broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //打印本地 kv 配置的定时任务 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); //SSL证书相关 if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can‘t load the certificate dynamically"); } } return true; }
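NamesrvController#initialize 方法初始化 controller 主要有以下任务:1. 加载本地 KV 配置;2. 创建 NettyRemotingServer 对象 remotingServer;3. 创建处理请求的线程池 remotingExecutor;4. 注册 remotingServer 的请求处理器;5. 启动定时任务,启动 5 秒后每隔 10 秒扫描一次不活跃的 broker;6. 启动定时任务,启动 1 分钟后每隔 10 分钟打印一次 KV 配置;7. 如果 TLS 未被禁用,则创建 FileWatchService 监听证书文件变化,以便重新加载 SslContext。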
进入 NamesrvController#registerProcessor 方法
/**
 * Registers the default request processor on the remoting server, to be run
 * on {@code remotingExecutor}.
 *
 * <p>A {@code DefaultRequestProcessor} is used unless
 * {@code namesrvConfig.isClusterTest()} is true, in which case a
 * {@code ClusterTestRequestProcessor} built with the configured product
 * environment name is used instead.
 */
private void registerProcessor() {
    // Normal mode: plain default processor.
    if (!namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        return;
    }

    // Cluster-test mode.
    this.remotingServer.registerDefaultProcessor(
        new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
        this.remotingExecutor);
}
NamesrvController#registerProcessor 方法给 remotingServer 注册默认的请求处理器,并指定由 remotingExecutor 线程池来处理请求:默认注册 DefaultRequestProcessor 实例,当 namesrvConfig 的 clusterTest 开启时则注册 ClusterTestRequestProcessor 实例
进入 controller 启动方法 NamesrvController#start, 查看 controller 启动流程
/**
 * Starts the remoting server and, if one was created during initialization,
 * the file watch service.
 *
 * @throws Exception if either service fails to start
 */
public void start() throws Exception {
    this.remotingServer.start();

    // The watcher only exists when initialize() created it.
    if (null == this.fileWatchService) {
        return;
    }
    this.fileWatchService.start();
}
NamesrvController#start 方法就是启动了 remotingServer (也就是 NettyRemotingServer);如果 fileWatchService (SSL证书相关服务)不为空,也会一并启动
以上, RocketMQ 的 NameServer 启动完成,整个启动流程如图
RocketMQ 源码解析(1) -- NameServer 启动流程
原文:https://www.cnblogs.com/programmlover/p/12871583.html