启动一个consumer的时候,我用的是DefaultMQPushConsumer。根据之前的 博文 ,push其实还是一次次的pullrequest,也就是拉,这里有个问题:如果需要实时性很高,也就是说broker收到一条消息,订阅的consumer就要马上收到,那么consumer就需要不停的轮询,一次pullrequest收不到消息,马上进行下一次请求,这样就非常的耗费资源。
rocketMQ是这样解决的:
1、broker这边,请求过来,如果有新消息返回,在consumer这边,异步请求的回调函数pullCallback中,判断pullResult不为null,那么把消息存到processQueue中之后,马上发起下一个请求。
2、如果broker没有获取到新消息,并不会马上返回pullRequest,会在suspendPullRequest方法中,把当前的请求信息(主要是offset,group,topic,requestId这几个值)放到PullRequestHoldService.pullRequestTable中,而在ReputMessageService的doReput--->messageArrivingListener.arriving--->pullRequestHoldService.notifyMessageArriving--->mpr = this.pullRequestTable.get(key)--->requestList = mpr.cloneListAndClear() 把刚才存进去的所有pullRequest取出来,取到消息再返回。这样就避免了不停的轮询。
RocketMQ 怎样解决为了 实时拉取消息 而不得不一直轮询的问题
原文:https://www.cnblogs.com/chuliang/p/13141279.html