首页 > 其他 > 详细

发送消息超时,咋办

时间:2020-01-17 19:30:20      阅读:95      评论:0      收藏:0      [点我收藏+]

同步发送,最多发送 3 次

这里指的是发送成功,等待 broker 的响应时发生超时,客户端有理由认为是网络不好,数据没有到达 broker,因此重复发送消息,也就是这种情况会导致 broker 存在重复消息。当发生 RemotingException 或 MQClientException 时,会重复发送。

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
for (; times < timesTotal; times++) {
    try {
        SendResult sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    } catch (RemotingException e) {
        endTimestamp = System.currentTimeMillis();
        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
        log.warn(msg.toString());
        exception = e;
        continue;
    } catch (MQClientException e) {
        endTimestamp = System.currentTimeMillis();
        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
        log.warn(msg.toString());
        exception = e;
        continue;
    } catch (OtherException e) {
        break;
    ]

}

 

异步发送,只发送 1 次。

 

我本来打算,在 SendMessageProcessor#processRequest 方法中 sleep 一段时间,模拟网络延迟,结果发现 broker 有个自我保护,如果线程池队列中的请求自创建起 200 ms 时间没有处理完,则表明压力大,丢弃该请求,并通知客户端

// org.apache.rocketmq.broker.latency.BrokerFastFailure#cleanExpiredRequestInQueue

发送消息超时,咋办

原文:https://www.cnblogs.com/allenwas3/p/12207075.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!