首页 > 其他 > 详细

rabbitMQ基础知识--消息确认机制

时间:2019-10-26 22:08:15      阅读:93      评论:0      收藏:0      [点我收藏+]

一:确认种类

RabbitMQ的消息确认有两种。

一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。

第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html

这里我们重点研究下接收确认的情况。

为了保证消息从队列可靠的到达消费者,RabbitMQ提供了消息 确认机制,消费者在订阅队列的时候,可以指定autoAck参数,当autoACK等于false时,RabbitMQ会等待显示的恢复确认信号之后才从内存或者磁盘中移除消息(实质上是先打上删除标记,之后再删除)

       当autoAck属性为true的时候,RabbitMQ会自动把发送出去的消息标记为确认,然后从内存或者磁盘中移除,而不管消费者有没有收到消息。

      采用了这个机制后不用担心消费者接受不到消息的问题因为,消费者就算是挂了,消息会一直存在,消费者服务启动的时候消息就会自动的进行推送,因为RabbitMQ会一直等待持有消息直到消费者显示的调用Basic.ack命令为止。

       当开启消息确认机制以后,对于rabbitmq服务端而言,队列中的消息分成了两个部分,一部分是等待投递给消费者的消息;一部分是已经投递给消费者的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且消费者此消息的消费者已经断开连接,则RabbitMQ会安排重新入队列,等待投递给下一个消费者,当然也有可能是原来的消费者。

     RabbitMQ除了提供了消息确认机制以外,在2.0以后的版本引入了Basic.Reject这个命令,意思是明确拒绝当前消息而不是确认。

单独的给一个消息接收者设置ack。

 

配置文件如下: 

rabbitmq:
  host: 192.168.1.203
  port: 5672
  username: admin
  password: admin
  publisher-confirms: true    #  消息发送到交换机确认机制,是否确认回调
  listener:
    acknowledge-mode :manual   # 每条消息必须手工确认

MQ接收的服务写法:

@Component
public class FirstConsumer1 {
    @Autowired
    private FirstSender firstSender;
    @RabbitListener(queues="test_testQueue" )
    public void handleMessage(Message message ,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        channel.basicAck(tag, false);   //手动确认消息已收到
        String  recvStr = new String(message.getBody());
        // 处理消息
        System.out.println("得到消息为"+recvStr);
        
        //根据业务逻辑登记判重表,在业务层面避免消息被二次消费

    }
}

 

 

acknowledgeMode有三值:

    A、NONE = no acks will be sent (incompatible with channelTransacted=true).

          RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.

    B、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().

    C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.

Note that acknowledgeMode is complementary to channelTransacted - if the channel is transacted then the broker requires a commit notification in addition to the ack. This is the default mode. See also txSize.

 

为什么接收的确认这么关键呢?现在大多数服务都是多台集群(比如3台),在这种场景下,如果没有确认的机制,那么这个队列的消息可能被三台服务器同时取到。

如何处理呢? 接收服务器在接收到消息后立即给MQ回复确认,这样其他服务器就无法获取到这个消息了。【这里其实还是不是太严谨,】

 

rabbitMQ基础知识--消息确认机制

原文:https://www.cnblogs.com/zhaoyansheng163/p/11745654.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!