RabbitMQ的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html
这里我们重点研究下接收确认的情况。
为了保证消息从队列可靠的到达消费者,RabbitMQ提供了消息 确认机制,消费者在订阅队列的时候,可以指定autoAck参数,当autoACK等于false时,RabbitMQ会等待显示的恢复确认信号之后才从内存或者磁盘中移除消息(实质上是先打上删除标记,之后再删除)
当autoAck属性为true的时候,RabbitMQ会自动把发送出去的消息标记为确认,然后从内存或者磁盘中移除,而不管消费者有没有收到消息。
采用了这个机制后不用担心消费者接受不到消息的问题因为,消费者就算是挂了,消息会一直存在,消费者服务启动的时候消息就会自动的进行推送,因为RabbitMQ会一直等待持有消息直到消费者显示的调用Basic.ack命令为止。
当开启消息确认机制以后,对于rabbitmq服务端而言,队列中的消息分成了两个部分,一部分是等待投递给消费者的消息;一部分是已经投递给消费者的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且消费者此消息的消费者已经断开连接,则RabbitMQ会安排重新入队列,等待投递给下一个消费者,当然也有可能是原来的消费者。
RabbitMQ除了提供了消息确认机制以外,在2.0以后的版本引入了Basic.Reject这个命令,意思是明确拒绝当前消息而不是确认。
单独的给一个消息接收者设置ack。
配置文件如下:
rabbitmq: host: 192.168.1.203 port: 5672 username: admin password: admin publisher-confirms: true # 消息发送到交换机确认机制,是否确认回调 listener: acknowledge-mode :manual # 每条消息必须手工确认
MQ接收的服务写法:
@Component public class FirstConsumer1 { @Autowired private FirstSender firstSender; @RabbitListener(queues="test_testQueue" ) public void handleMessage(Message message ,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { channel.basicAck(tag, false); //手动确认消息已收到 String recvStr = new String(message.getBody()); // 处理消息 System.out.println("得到消息为"+recvStr); //根据业务逻辑登记判重表,在业务层面避免消息被二次消费 } }
acknowledgeMode有三值:
A、NONE = no acks will be sent (incompatible with channelTransacted=true).
RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.
B、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().
C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.
Note that acknowledgeMode is complementary to channelTransacted - if the channel is transacted then the broker requires a commit notification in addition to the ack. This is the default mode. See also txSize.
为什么接收的确认这么关键呢?现在大多数服务都是多台集群(比如3台),在这种场景下,如果没有确认的机制,那么这个队列的消息可能被三台服务器同时取到。
如何处理呢? 接收服务器在接收到消息后立即给MQ回复确认,这样其他服务器就无法获取到这个消息了。【这里其实还是不是太严谨,】
原文:https://www.cnblogs.com/zhaoyansheng163/p/11745654.html