首页 > 其他 > 详细

RocketMQ 怎样解决为了 实时拉取消息 而不得不一直轮询的问题

时间:2020-06-16 16:50:01      阅读:185      评论:0      收藏:0      [点我收藏+]

启动一个consumer的时候,我用的是DefaultMQPushConsumer。根据之前的   博文  ,push其实还是一次次的pullrequest,也就是拉,这里有个问题:如果需要实时性很高,也就是说broker收到一条消息,订阅的consumer就要马上收到,那么consumer就需要不停的轮询,一次pullrequest收不到消息,马上进行下一次请求,这样就非常的耗费资源。

rocketMQ是这样解决的:

1、broker这边,请求过来,如果有新消息返回,在consumer这边,异步请求的回调函数pullCallback中,判断pullResult不为null,那么把消息存到processQueue中之后,马上发起下一个请求。

2、如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。

RocketMQ 怎样解决为了 实时拉取消息 而不得不一直轮询的问题

原文:https://www.cnblogs.com/chuliang/p/13141279.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!