首先我们通过一个时序图,直观看下Dubbo服务提供方启动的流程:
public synchronized void export(){ ... // 这里是延迟发布 if(delay != null && delay > 0){ delayExportExecutor.schedule(new Runnable(){ public void run(){ doExport(); } } , delay , TimeUnit.MILLISECONDS); }else{ // 直接发布 doExport(); } }
private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter" , true));
如上代码,可知Dubbo的延迟发布通过使用ScheduledExecutorService来实现的,可以通过调用ServiceConfig的setDelay(Integer delay)来设置延迟发布时间。如果没有设置延迟时间则直接调用代码(2)doExport()方法发布服务,延迟发布最后也是调用的该方法。
Invoker<?> invoker = proxyFactory.getInvoker(ref , (Class)interfaceClass , url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker , this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter);
其中proxyFactory和protocol的定义如下:
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdativeExtension(); private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
可知它们是扩展接口的适配器类。
JavassistProxyFactory的getInvoker代码如下:
public <T> Invoker<T> getInvoker(T proxy , Class<T> type , URL url){ // final Wrapper c = Wrapper.getWrapper(proxy.getClass().getName().indexOf(‘$‘) < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy , type , url){ @Override protected Object doInvoke(T proxy , String methodName , Class<?>[] parameterTypes , Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy,methodName,parameterTypes,arguments); } }; }
这里首先把服务实现类转换为Wrapper类,是为了减少反射的调用。代码(6)具体返回的是AbstractProxyInvoker对象,其内部重写doInvoke方法,具体委托给Wrapper实现具体功能。到这里完成了《Dubbo具体架构分析》一文中讲解的服务提供方实现类到Invoker的转换。
下面我们再来看doLocalExport是如何启动NettyServer的,doLocalExport内部主要调用DubboProtocol的export方法,下面看下时序图:
由于DubboProtocol也被Wrapper类增强了,所以也是一层层调用后,执行代码(7)调用DubboProtocol的export方法,export代码如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException{ URL url = invoker.getUrl(); // invoker到exporter转换 String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker , key , exporterMap); exporterMap.put(key,exporter); ... openServer(url); return exporter; }
这里将invoker转换到DubboExporter对象,然后执行代码(9),接着一步步最后会执行到代码(14)NettyTransporter的bind方法,其代码如下:
public Server bind(URL url,ChannelHandler listener) throws RemotingException{ return new NettyServer(url,listener); }
有上面代码可知,其内部创建了NettyServer,而NettyServer的构造函数内部又调用了NettyServer的doOpen方法启动了服务监听。
public NettyServer(URL url,ChannelHandler handler) throws RemotingException{ super(url,ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url,SERVER_THREAD_POOL_NAME))); }
这里我们主要看 ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url,SERVER_THREAD_POOL_NAME))这个代码,改代码里面加载了具体的线程模型,通过ChannelHandlers的wrapInternal方法完成了加载:
protected ChannelHandler wrapInternal(ChannelHandler handler,URL url){ return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler,url))); }
可知根据url里面的线程模型选择具体的Dispatcher实现类,这里再重新提下Dubbo提供的Dispatcher实现类如下,默认是all:
all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
到这里线程模型加载地方讲解完了,还有一个点就是线程模型中线程池SPI扩展什么时候加载的呢?这里以线程模型AllDispatcher为例,它对应的是AllChannelHandler,其构造函数如下:
public AllChannelHandler(ChannelHandler handler,URL url){ super(handler , url); // 父类WrappedChannelHandler构造方法 }
public WrappedChannelHandler(ChannelHandler handler,URL url){ ... executor = (ExecutorService)ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); ... }
可知WrapperChannelHandler的构造函数里根据url获取了具体的扩展接口ThreadPool的SPI实现类,这里再提下ThreadPool的扩展接口如下,默认是fixed:
fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool
原文:https://www.cnblogs.com/xhj123/p/9118930.html