之前写过一篇dubbo cluster–架构。因为dubbo逻辑集群的功能主要是在client端,主要侧重在client的分析。后来因为工作忙和懒癌,也就没再继续server的叙述了。最近正好在看大众点评的cat源码,其中也有rpc的模块,就借此专门来分析下rpc server的实现。
Cat server基于netty,是典型的reactor模型。
上图是网上找的reactor模型示例图。Netty的boss和workder分别映射图中的mainReactor和subReactor。由boss accept nio channel,之后交由worker read, decode以及handle等。Netty的实现和图中略微有点不一致,在netty中decode和handle是同步的。
需要说明的是netty handler也可以异步处理,netty支持线程池分发handler thread。
还有另一种方案,由应用程序自身实现延迟队列做异步处理,handler只需将消息(事件)放入队列即可,cat采用的就是这种方案。Cat是通过decoder解码消息后调用handler将消息插入延迟队列,并没有向netty注册handler再由netty在decode完毕后调用相应handler。
Cat的传输数据对象为MessageTree,MessageCodec对传输消息编码和解码,MessageHanlder将消息放入队列。下图是cat server的静态结构。
CatHomeModule就是cat server,它包含两个逻辑模块,一个是reactor,另一个是延时队列(period),分别对应上图的左右半边。
MessageConsumer和TcpSocketReceiver均被CatHomeModule依赖,其实是在receiver初始化工程中也相应初始化了这两个重要组件。同时MessageConsumer也是MessageHandler聚合属性,而handler则是receiver的一个内部属性。结构上看起来有点混乱,但其实module是启动了receiver的初始化,然后receiver在初始化过程中依赖了handler,而handler又依赖了consumer。而Module和consumer之间的依赖关系是一种很弱的引用,只是为了注册虚拟机的shutdownhook。
MessageTree是经由网络传递的消息报文对象,由MessageCodec进行解码和编码。在服务端被解码生成后作为方法参数被MessageConsumer消费,最终放入MessageQueue等待MessageAnalyzer处理。
MessageQueue被聚合在PeriodTask内,后者是个daemon线程,不断轮询MessageQueue,当有队列里有消息时就调用MessageAnalyzer处理,每个task只对应一个analyzer,analyzer就是队列的消费者。
一个Period代表一个周期,每个周期对应一个持续时间(duration),默认是一小时,且周期是整点时间段,例如1:00-2:00,2:00-3:00,而不是1:01-2:01。每个周期对应多个task,每条消息相应的也会被拷贝成多分分发给每个task。
周期由PeriodManager参考周期策略(PeriodStrategy)的结果生成或结束。
介绍完cat server的实体概念后,再从动态层面看下它的初始化过程以及消息的就绪和消费过程。
在静态结构视图里提到了CatHomeModule依赖两个实体,分别为MessageCosumer和TcpSocketReceiver,它们分别承担了reaactor以及延迟队列的功能,由CatHomeModule的initialize和setup阶段被初始化。
首先看下period的初始化过程。
上节提到策略结果会开始一个新的周期或者结束一个老的周期。上图的步骤5,策略会基于持续时间,提前开始时间(假设为a)以及延迟结束时间(假设为b)生成一个结果(a和b在cat中均默认为3分钟)。策略结果如果为正则start新周期,为负则end老周期,为0则不做任何动作。Cat默认超过duration*2+b的周期会被清理,相应的新周期也会提前a开始。
再看周期启动周期任务的过程(8-12),周期先回从应用上下文中获取所有的MessageAnalyzer(类似spring的getBeansOfType),再循环每个analyzer为每个analyzer生成一个或多个周期任务(视analyzer#getAnalyzerCount值而定)。
再看下reactor模块的初始化过程。
TcpSocketReciver在Server启动过程中被从上下文中lookup出来,并且执行初始化过程,在初始化的过程中通过依赖注入将MessageConsumer注入了MessageHandler中。相应的也将MessageHandler和MessageCodec注入给自己作为聚合属性。
TcpSocketReciver是个netty server,它监听socket请求,由关联的MessageDecoder解析生成MessageTree,再交由MessageHandler处理。MessageHandler将MessageTree交由MessageConsumer(延迟队列)消费,consumer基于MessageTree的时间戳找到相应的Period,将其放入相应的PeriodTask中。Cat延时队列不支持主题,所以每条消息会默认被所有task消费。
注意下步骤10,在上节提过有的analyzer会对应多个task,select的过程就是对这种重复task通过MessageTree#domain进行hash取模选出1个的过程。之所以要支持analyzer和task1对多的关系模型是因为有些analyzer处理会比较多,针对这种就需要设定多个task,以保证处理效率。
消费过程相对简单,周期任务通过守护线程不断使用MessageAnalyzer对MessageQueue进行分析,如果MessageQueue#poll数据不为空则对poll出的MessageTree进行业务处理。每种MessageAnalyzer针对具体应用场景做相应处理,用户也可以自定义analyzer,只需要被容器感知就行。
原文:http://blog.csdn.net/szwandcj/article/details/50992580