首页 > 其他 > 详细

【mq读书笔记】消息确认

时间:2020-02-08 12:22:29      阅读:66      评论:0      收藏:0      [点我收藏+]

接上文的集群模式,监听器返回RECONSUME_LATER,需要将将这些消息发送给Broker延迟消息。如果发送ack消息失败,将延迟5s后提交线程池进行消费。

入口:ConsumeMessageConcurrentlyService#sendMessageBack

命令编码:RequestCode.CONSUMER_SEND_MSG_BACK;

MQClientAPIImpl#consumerSendMessageBack:

public void consumerSendMessageBack(
        final String addr,
        final MessageExt msg,
        final String consumerGroup,
        final int delayLevel,
        final long timeoutMillis,
        final int maxConsumeRetryTimes
    ) throws RemotingException, MQBrokerException, InterruptedException {
        ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

        requestHeader.setGroup(consumerGroup);
        requestHeader.setOriginTopic(msg.getTopic());
        requestHeader.setOffset(msg.getCommitLogOffset());
        requestHeader.setDelayLevel(delayLevel);
        requestHeader.setOriginMsgId(msg.getMsgId());
        requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }
public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
    @CFNotNull
    private Long offset;//消息物理偏移量
    @CFNotNull
    private String group;//消费组名
    @CFNotNull
    private Integer delayLevel;
    private String originMsgId;
    private String originTopic;
    @CFNullable
    private boolean unitMode = false;
    private Integer maxReconsumeTimes;

 

【mq读书笔记】消息确认

原文:https://www.cnblogs.com/lccsblog/p/12275379.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!