最近在写一个基于RabbitMQ的消息总线。虽然RabbitMQ提供了plugin的机制可以实现对其进行扩展,但是由于对erlang语言不熟,考虑到上手成本高,因此放弃实现plugin,转而基于Smart client + 树形拓扑路由的模型。当然这也大大降低了我们实现功能的灵活性,后面我会找个时间开篇新文章,谈谈Smart Client的限制。
预备知识
RabbitMQ对于消息的通信只提供了几个非常简单的API:Channel#basicPublish;Channel#basicConsume(Channel#basicGet)。API非常简单,几乎可以看作只提供了produce/consume这一种通信模型。但RabbitMQ的灵活性在于其多种exchange的组合。如果你不了解exchange的种类,请移步官网的说明文档。
有几个原则:RabbitMQ一切消息都发送给exchange,一切消息都从queue中获取。至于消息怎样从exchange路由到queue,这取决于exchange的类型+exchange与exchange或exchange与queue共同组合的结果(但前提是queue只能作为叶子节点,queue之间无法组合)。
树形拓扑
消息总线的整个架构采用的是SmartClient + Server端树形拓扑形式。这是非常典型的代理式拓扑结构,这种拓扑的好处是:方便隐藏内部细节、方便统一管控、方便实现拦截等。虽然是树形拓扑结构,但对于其细节的实现上还是有很大的讲究,下面谈谈在实现过程中的一些个人想法。
App在拓扑结构中的权衡
最初的实现很简单,只支持了点对点的produce/consume模型,这也是rabbitmq的原生模型。当时,一部分关注点放在它可以以非常小的成本记录所有通过总线的日志上。其树形拓扑是这样的:
(备注:图中长方形表示exchange,圆形表示queue,下同)
在实现完成之后,发现这些队列的粒度太粗,几乎是app级别的。这无法满足一个app需要有多个队列的需求,而这种需求在一个app被拆分成分布式组件之后是很常见的。因此,这里必须将队列的粒度进一步细化,几乎细化成一种队列”服务”(没错,你可以这么理解,每个队列无论在生产消息,还是在消费消息,就好像在提供一种服务)。而app是这些“服务队列”的上级。因此这些app被上提为最后一个层级的exchange,成为这些服务队列的组织者,而队列都处于同一高度上,这样也便于管控。于是拓扑图变成这样:
继续思考过后发现,将App从queue上提为最后一级的exchange,唯一好处就是能够在管控台上一目了然得看出某个queue的归属,以及对一个app下的所有的queue进行管控。其实要做到上面说的这些,只需要维护好app跟queue的逻辑关系(通过管控台界面操纵数据库)即可,没必要维护让app成为真实的exchange还使得整个路由多了一层,反而影响性能。因此决定把这个app exchange层去掉。
request/response实现的权衡
消息总线目前提供四种通信模型:
- produce/consume
- request/response
- publish/subscribe
- broadcast
从上面的拓扑图可以看出主要有两个消息路由exchange:pubsub,business。其中,business中的消息是point to point的,一个消息只能被消费一次而且有且只能有一个消费者。而pubsub的消息是one to many的,这从实现上不可复用同一套体系。回到上文,我们说提供了四中通信模型,而我们这里只有两种消息路由exchange,也就是说通信模型复用了路由exchange,它们的对应关系是:
- produce/consume -> business
- request/response -> business
- publish/subscribe -> pubsub
- broadcast ->pubsub
复用路由exchange的结果导致了队列的复用,这给 request/response的实现带来了不小的麻烦。主要有以下两个问题:
- 队列的职责不清晰:produce消息跟request消息混杂于同一个队列中而消费消息的API却是区分的,这会导致一个消费API消费它“不想”消费的消息(比如response遇到produce的消息)。这时这些它"不想"消费的消息,没有好的处理方式。
- 无限等待:这个主要跟request/response的实现机制相关。它是一种阻塞式同步等待响应的通信方式。如果它跟produce/consume模式复用队列,那么如果队列中有produce发送的大量消息未曾消费,那么request发送过来的消息将不可能得到及时消费,导致response也几乎不可能迅速给出应答,也就是request/response模式有可能根本就无法真正实现。
上面的症结主要在于不同类型的消息复用了相同的队列,但却区分了消费不同类型消息的API;而且队列里不同类型消息的顺序是不可测的。
这里就存在两种解决思路:
- 复用队列,复用消费API,针对不同的消息指定不同的处理逻辑;
- 不复用队列,区分消费API。
我几乎直接排除了第一种思路,因为区分通信模型,就是为了给client编程提供方便,封装的目的就是为了更高级别的抽象;而且第一种思路肯定无法实现request/response这种模型,因为这种模型的特征就是实时性,而复用队列,无法保证这一点。
Publish/subscribe实现的权衡
从上图你可以看到pubsub那个exchange的类型是fanout的(fanout表示扩散,即跟它绑定的exchange和queue都能收到消息)。其实实现publish/subscribe在RabbitMQ的基础上,可以有两种实现方式:
- pubsub exchange为fanout,客户端筛选
- pubsub exchange为 topic,服务端精准推送
稍微解释一下,首先你需要了解:这是一个通用的exchange,也就是凡是通信类似为publish/subscirbe类型的队列都绑定在它下面。
对于第一种方式:如果为fanout,那么客户端只需要调用publish推送一条消息,当消息被路由到pubsub,那么下面所有绑定的队列都会收到该消息(也就是这个fanout其实就是广播的方式)。那么怎么让不是订阅该channel的队列不接收到该消息呢?其实是客户端调用subscribe的时候,接收到消息在客户端内部过滤的(客户端连接着一个管控中心,可以获取订阅关系)。
对于第二种方式:正好跟第一种实现是相反的,它将pubsub的exchange定义为topic。当客户端调用publish推送一条消息时,它通过获取订阅关系,一条条发送到pubsub下的每个订阅的队列中,因此此处真实的发送次数跟该channel的订阅数一致;而这种情况下,subscribe只管消费,无需筛选,因为可以确定发送到其队列里的消息肯定是它订阅的channel发送的!
对于这两种模式,可谓各有利弊:
具体的实现方式应该取决于应用的场景。这是一种权衡,在性能跟资源之间的权衡。目前,的实现方式已从第一种方式切换成第二种方式——为了跟其他两种模型一致。因为其他两种模型都是生产者验证权限,而消费者只管消费。
broadcast实现的权衡
我们对广播的认知应该是可被所有通信参与者实时接收的消息(有时广播消息甚至可以认为是不可拒绝的,但这个取决场景)。因为consume与response也应该可以接收到广播消息。因此,它不应该跟publish/subscribe混合处理(虽然他们的从技术实现上是非常相似的,你可以认为broadcast是publish/subscribe的一种特例)。如果不混合处理,那就意味着对于广播,又得另外针对每个通信参与者独立分配一个队列?也就是说,这个系统上有多少通信参与者,就会有多少对应的广播消息队列。
从之前的讨论来看,如果想队列数减少,就必须复用队列,而如果复用队列,我们就必须合并消费该队列里不同类型消息的API并且承担接收同类型消息不及时的风险。因此我尝试了下面这样的拓扑图结构,这也是目前而言,最新的拓扑结构图:
它让每个队列都得到了复用,除了接收跟自己职责相关的消息,还会接收到广播消息。这种设计的好处就是节省了服务器的资源。它跟普通业务消息复用了队列,这导致它只能在消费API被调用时“跟随”调用。而且上面谈到的某些纯粹的消息发送方比如produce,publish,它们通常是没有队列的,因此无法接收到广播消息,这也是一种权衡设计。
谈消息总线的路由模型
原文:http://blog.csdn.net/yanghua_kobe/article/details/44424235