Dubbo默认的底层网络通讯使用的是Netty,服务提供方NettyServer使用两级线程池,其中 EventLoopGroup(boss) 主要用来接受客户端的链接请求,并把接受的请求分发给 EventLoopGroup(worker) 来处理,boss和worker线程组我们称之为IO线程。
如果服务提供方的逻辑能迅速完成,并且不会发起新的IO请求,那么直接在IO线程上处理会更快,因为这减少了线程池调度。
但如果处理逻辑很慢,或者需要发起新的IO请求,比如需要查询数据库,则IO线程必须派发请求到新的线程池进行处理,否则IO线程会阻塞,将导致不能接收其它请求。
Dubbo提供的线程模型
根据请求的消息类被IO线程处理还是被业务线程池处理,Dubbo提供了下面几种线程模型:
其中AllDispatcher对应的handler代码如下:
public class AllChannelHandler extends WrappedChannelHandler{ public AllChannelHandler(ChannelHandler handler , URL url){ super(handler,url); } // 链接事件,交给业务线程池处理 public void connected(Channel channel) throws RemotingExcecption{ ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CONNECTED)); }catch(Throwable t){ throw new ExecutionException("connect event" , channel , getClass() + "error when process connected event.",t); } } // 链接断开事件,交给业务线程池处理 public void disconnected(Channel channel) throws RemotingException{ ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.DISCONNECTED)); }catch(Throwable t){ throw new ExecutionException("disconnect event",channel,getClass()+" error when process disconnected event.",t); } } // 请求响应事件,交给业务线程池处理 public void received(Channel channel , Object message) throws RemotingException{ ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.RECEIVED,message)); }catch(Throwable t){ // TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构 // fix 线程池满了拒绝调用不返回,导致消费者一直等待超时 if(message instanceof Request && t instanceof RejectedExecutionException ){ ... } throw new ExecutionException(message , channel ,getClass() + " error when process received event.",t); } } // 异常处理事件,交给业务线程池处理 public void caught(Channel channel , Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CAUGHT,exception)); }catch(Throwable t){ throw new ExecutionException("caught event",channel,getClass() + " error when process caught event ."); } } ... }
可知所有事件都直接交给业务线程池进行处理了。
Dubbo提供了常用的线程池模型,这些模型可以满足我们绝大多数的需求,但是您可以根据自己的需要进行扩展定制。在服务提供者启动线程时,我们会看到什么时候加载的线程模型的实现。
Dubbo提供的线程池策略
扩展接口 ThreadPool 的SPI实现有如下几种:
其中fixed策略对应扩展实现类是FixedThreadPool,代码如下:
public class FixedThreadPool implements ThreadPool{ public Executor getExecutor(URL url){ String name = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY,Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY,Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads , threads , 0 , TimeUnit.MILLISECONDS , queues==0 ? new SynchronousQueue<Runnable>() : (queue < 0 ? new LinkedBlockingQueue<Runnable>(queues)) , new NamedThreadFactory(name,true) , new AbortPolicyWithReport(name,url)); } }
可知使用ThreadPoolExecutor创建了核心线程数=最大线程池数=threads的线程池。
Dubbo线程池扩展,这些扩展可以满足我们绝大多数的需求,但是您可以根据自己的需要进行扩展定制。在服务提供者启动流程时,我们会看到什么时候加载的线程池扩展实现。
原文:https://www.cnblogs.com/cnndevelop/p/12186967.html