在很久之前写过一篇Kafka相关的文章,你需要知道的Kafka,那个时候在业务上更多的是使用的是Kafka,而现在换了公司之后,更多的使用的是Rocketmq,本篇文章会尽力全面的介绍RocketMQ和Kafka各个关键点的比较,希望大家读完能有所收获。
RocketMQ前身叫做MetaQ, 在MeataQ发布3.0版本的时候改名为RocketMQ,其本质上的设计思路和Kafka类似,但是和Kafka不同的是其使用Java进行开发,由于在国内的Java受众群体远远多于Scala,所以RocketMQ是很多以Java语言为主的公司的首选。同样的RocketMQ和Kafka都是Apache基金会中的顶级项目,他们社区的活跃度都非常高,项目更新迭代也非常快。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
直接定义好一个producer,创建好Message,调用send方法即可。
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
对于RocketMQ先抛出几个问题:
对于RocketMQ的架构图,在大体上来看和Kafka并没有太多的差别,但是在很多细节上是有很多差别的,接下来会一一进行讲述。
在3.1的架构中我们有多个Producer,多个主Broker,多个从Broker,每个Producer可以对应多个Topic,每个Consumer也可以消费多个Topic。
Broker信息会上报至NameServer,Consumer会从NameServer中拉取Broker和Topic的信息。
很多朋友都在问什么是无状态呢?状态的有无实际上就是数据是否会做存储,有状态的话数据会被持久化,无状态的服务可以理解就是一个内存服务,NameServer本身也是一个内存服务,所有数据都存储在内存中,重启之后都会丢失。
在RocketMQ中的每一条消息,都有一个Topic,用来区分不同的消息。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生产者写入的新消息。
在Topic中有分为了多个Queue,这其实是我们发送/读取消息通道的最小单位,我们发送消息都需要指定某个写入某个Queue,拉取消息的时候也需要指定拉取某个Queue,所以我们的顺序消息可以基于我们的Queue维度保持队列有序,如果想做到全局有序那么需要将Queue大小设置为1,这样所有的数据都会在Queue中有序。
在上图中我们的Producer会通过一些策略进行Queue的选择:
我们同一组Consumer也会根据一些策略来选Queue,常见的比如平均分配或者一致性Hash分配。
要注意的是当Consumer出现下线或者上线的时候,这里需要做重平衡,也就是Rebalance,RocketMQ的重平衡机制如下:
由于重平衡是定时做的,所以这里有可能会出现某个Queue同时被两个Consumer消费,所以会出现消息重复投递。
Kafka的重平衡机制和RocketMQ不同,Kafka的重平衡是通过Consumer和Coordinator联系来完成的,当Coordinator感知到消费组的变化,会在心跳过程中发送重平衡的信号,然后由一个ConsumerLeader进行重平衡选择,然后再由Coordinator将结果通知给所有的消费者。
在RocketMQ中Queue被分为读和写两种,在最开始接触RocketMQ的时候一直以为读写队列数量配置不一致不会出现什么问题的,比如当消费者机器很多的时候我们配置很多读的队列,但是实际过程中发现会出现消息无法消费和根本没有消息消费的情况。
这个功能在RocketMQ在我看来明显没什么用,因为基本上都会设置为读写队列大小一样,这个为啥不直接将其进行统一,反而容易让用户配置不一样出现错误。
这个问题在RocketMQ的Issue里也没有收到好的答案。
一般来说消息队列的消费模型分为两种,基于推送的消息(push)模型和基于拉取(poll)的消息模型。
基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。比如当我们把已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,如果我们在消费代理将其标记为已消费,这个消息就永久丢失了。如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种不可取。
用过RocketMQ的同学肯定不禁会想到,在RocketMQ中不是提供了两种消费者吗?
MQPullConsumer和MQPushConsumer,其中MQPushConsumer不就是我们的推模型吗?其实这两种模型都是客户端主动去拉消息,其中的实现区别如下:
消费模式我们分为两种,集群消费,广播消费:
在Kafka中使用的原生的socket实现网络通信,而RocketMQ使用的是Netty网络框架,现在越来越多的中间件都不会直接选择原生的socket,而是使用的Netty框架,主要得益于下面几个原因:
选择框架是一方面,而想要保证网络通信的高效,网络线程模型也是一方面,我们常见的有1+N(1个Acceptor线程,N个IO线程),1+N+M(1个acceptor线程,N个IO线程,M个worker线程)等模型,RocketMQ使用的是1+N1+N2+M的模型,如下图所示:
1个acceptor线程,N1个IO线程,N2个线程用来做Shake-hand,SSL验证,编解码;M个线程用来做业务处理。这样的好处将编解码,和SSL验证等一些可能耗时的操作放在了一个单独的线程池,不会占据我们业务线程和IO线程。
做为一个好的消息系统,高性能的存储,高可用都不可少。
RocketMQ和Kafka的存储核心设计有很大的不同,所以其在写入性能方面也有很大的差别,这是16年阿里中间件团队对RocketMQ和Kafka不同Topic下做的性能测试:
从图上可以看出:
那RocketMQ为什么在多Topic的情况下,依然还能很好的保持较多的吞吐量呢?我们首先来看一下RocketMQ中比较关键的文件:
这里有四个目录(这里的解释就直接用RocketMQ官方的了):
我们发现我们的消息主体数据并没有像Kafka一样写入多个文件,而是写入一个文件,这样我们的写入IO竞争就非常小,可以在很多Topic的时候依然保持很高的吞吐量。有同学说这里的ConsumeQueue写是在不停的写入呢,并且ConsumeQueue是以Queue维度来创建文件,那么文件数量依然很多,在这里ConsumeQueue的写入的数据量很小,每条消息只有20个字节,30W条数据也才6M左右,所以其实对我们的影响相对Kafka的Topic之间影响是要小很多的。我们整个的逻辑可以如下:
Producer不断的再往CommitLog添加新的消息,有一个定时任务ReputService会不断的扫描新添加进来的CommitLog,然后不断的去构建ConsumerQueue和Index。
注意:这里指的都是普通的硬盘,在SSD上面多个文件并发写入和单个文件写入影响不大。
读取消息
Kafka中每个Partition都会是一个单独的文件,所以当消费某个消息的时候,会很好的出现顺序读,我们知道OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取,将数据放入PageCache,所以Kafka的读取消息性能比较好。
RocketMQ读取流程如下:
ConsumerQueue也是每个Queue一个单独的文件,并且其文件体积小,所以很容易利用PageCache提高性能。而CommitLog,由于同一个Queue的连续消息在CommitLog其实是不连续的,所以会造成随机读,RocketMQ对此做了几个优化:
我们首先需要选择一种集群模式,来适应我们可忍耐的可用程度,一般来说分为三种:
一般来说投入生产环境的话都会选择第四种,来保证最高的可用性。
当我们选择好了集群模式之后,那么我们需要关心的就是怎么去存储和复制这个数据,rocketMQ对消息的刷盘提供了同步和异步的策略来满足我们的,当我们选择同步刷盘之后,如果刷盘超时会给返回FLUSH_DISK_TIMEOUT,如果是异步刷盘不会返回刷盘相关信息,选择同步刷盘可以尽最大程度满足我们的消息不会丢失。
除了存储有选择之后,我们的主从同步提供了同步和异步两种模式来进行复制,当然选择同步可以提升可用性,但是消息的发送RT时间会下降10%左右。
我们上面对于master-slave部署模式已经做了很多分析,我们发现,当master出现问题的时候,我们的写入怎么都会不可用,除非恢复master,或者手动将我们的slave切换成master,导致了我们的Slave在多数情况下只有读取的作用。RocketMQ在最近的几个版本中推出了Dleger-RocketMQ,使用Raft协议复制CommitLog,并且自动进行选主,这样master宕机的时候,写入依然保持可用。
有关Dleger-RocketMQ的信息更多的可以查看这篇文章:Dledger-RocketMQ 基于Raft协议的commitlog存储库。
定时消息和延时消息在实际业务场景中使用的比较多,比如下面的一些场景:
在开源版本的RocketMQ中延时消息并不支持任意时间的延时,需要设置几个固定的延时等级,目前默认设置为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,从1s到2h分别对应着等级1到18,而阿里云中的版本(要付钱)是可以支持40天内的任何时刻(毫秒级别)。我们先看下在RocketMQ中定时任务原理图:
可以看见延时消息是利用新建单独的Topic和Queue来实现的,如果我们要实现40天之内的任意时间度,基于这种方案,那么需要402460601000个queue,这样的成本是非常之高的,那阿里云上面的支持任意时间是怎么实现的呢?这里猜测是持久化二级TimeWheel时间轮,二级时间轮用于替代我们的ConsumeQueue,保存Commitlog-Offset,然后通过时间轮不断的取出当前已经到了的时间,然后再次投递消息。具体的实现逻辑需要后续会单独写一篇文章。
事务消息同样的也是RocketMQ中的一大特色,其可以帮助我们完成分布式事务的最终一致性,有关分布式事务相关的可以看我以前的很多文章都有很多详细的介绍,这里直接关注公众号:咖啡拿铁。
具体使用事务消息步骤如下:
事务消息的使用整个流程相对之前几种消息使用比较复杂,下面是事务消息实现的原理图:
我们发现RocketMQ实现事务消息也是通过修改原Topic信息,和延迟消息一样,然后模拟成消费者进行消费,做一些特殊的业务逻辑。当然我们还可以利用这种方式去做RocketMQ更多的扩展。
这里让我们在回到文章中提到的几个问题:
想必读完这篇文章,你心中已经有答案。这篇文章主要讲了RocketMQ全面的设计架构,如果你还没有看够,那么就请关注我的公众号吧。
如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O:
原文:https://blog.51cto.com/14980978/2544646