消息由PullMessageService类采用单线程的方式从每个消息队列中轮询拉取。
一个PullRequest对应一个消息队列。消费者启动时,RebalanceImpl类会根据订阅的Topic信息自动创建PullRequest并放入请求队列中;如果分配到的消息队列发生变化,则会相应地进行更新。
1 // PullMessageService类 2 public void run() { 3 log.info(this.getServiceName() + " service started"); 4 5 while (!this.isStopped()) { 6 try { 7 PullRequest pullRequest = this.pullRequestQueue.take(); 8 //不同的请求对应不同的MessageQueue 9 this.pullMessage(pullRequest); 10 } catch (InterruptedException ignored) { 11 } catch (Exception e) { 12 log.error("Pull Message Service Run Method exception", e); 13 } 14 } 15 16 log.info(this.getServiceName() + " service end"); 17 }
流控逻辑在DefaultMQPushConsumerImpl类中进行处理。
流控主要分为两个维度,一个是待处理的消息数量,一个是待处理的消息大小。
1 //DefaultMQPushConsumerImpl类 2 public void pullMessage(final PullRequest pullRequest) { 3 final ProcessQueue processQueue = pullRequest.getProcessQueue(); 4 if (processQueue.isDropped()) { 5 log.info("the pull request[{}] is dropped.", pullRequest.toString()); 6 return; 7 } 8 9 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); 10 11 try { 12 this.makeSureStateOK(); 13 } catch (MQClientException e) { 14 log.warn("pullMessage exception, consumer state not ok", e); 15 this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); 16 return; 17 } 18 19 if (this.isPause()) { 20 log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); 21 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); 22 return; 23 } 24 25 long cachedMessageCount = processQueue.getMsgCount().get(); 26 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); 27 28 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
//待处理消息数量大于临界值时,则发起流控。默认值是1000条 29 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); 30 if ((queueFlowControlTimes++ % 1000) == 0) { 31 log.warn( 32 "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", 33 this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); 34 } 35 return; 36 } 37 38 if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
//待处理消息大小大于临界值时,则发起流控。默认值是100M 39 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); 40 if ((queueFlowControlTimes++ % 1000) == 0) { 41 log.warn( 42 "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", 43 this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); 44 } 45 return; 46 } 47 //省略代码,感兴趣去查找源文件 48 }
流控处理则是将当前PullRequest延迟timeDelay毫秒后,再放入到pullRequestQueue队列中等待下次拉取处理。
// PullMessageService类 public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { if (!isStopped()) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { PullMessageService.this.executePullRequestImmediately(pullRequest); } }, timeDelay, TimeUnit.MILLISECONDS); } else { log.warn("PullMessageServiceScheduledThread has shutdown"); } }
可以通过设置pullInterval参数,使消费者在拉取到待消费的消息后,延迟一段时间再进行下次拉取。
1 PullCallback pullCallback = new PullCallback() { 2 @Override 3 public void onSuccess(PullResult pullResult) { 4 if (pullResult != null) { 5 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, 6 subscriptionData); 7 8 switch (pullResult.getPullStatus()) { 9 case FOUND: 10 long prevRequestOffset = pullRequest.getNextOffset(); 11 pullRequest.setNextOffset(pullResult.getNextBeginOffset()); 12 long pullRT = System.currentTimeMillis() - beginTimestamp; 13 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), 14 pullRequest.getMessageQueue().getTopic(), pullRT); 15 16 long firstMsgOffset = Long.MAX_VALUE; 17 if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { 18 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); 19 } else { 20 firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); 21 22 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), 23 pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); 24 25 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); 26 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( 27 pullResult.getMsgFoundList(), 28 processQueue, 29 pullRequest.getMessageQueue(), 30 dispatchToConsume); 31 32 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { 33 // 控制拉取频率,默认值是0 34 DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 35 DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); 36 } else { 37 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); 38 } 39 } 40 41 if (pullResult.getNextBeginOffset() < prevRequestOffset 42 || firstMsgOffset < prevRequestOffset) { 43 log.warn( 44 "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} 
prevRequestOffset: {}", 45 pullResult.getNextBeginOffset(), 46 firstMsgOffset, 47 prevRequestOffset); 48 } 49 50 break;
默认的流控值
/**
 * Flow control threshold on queue level: each message queue will cache at most 1000 messages by default.
 * Considering the {@code pullBatchSize}, the instantaneous value may exceed the limit.
 */
private int pullThresholdForQueue = 1000;

/**
 * Limit on the cached message size on queue level: each message queue will cache at most 100 MiB of
 * messages by default. Considering the {@code pullBatchSize}, the instantaneous value may exceed the limit.
 *
 * <p>
 * The size of a message is only measured by its message body, so it's not accurate.
 */
private int pullThresholdSizeForQueue = 100;

/**
 * Message pull interval. The default of 0 means no extra delay is configured.
 * NOTE(review): presumably in milliseconds - confirm against the caller that schedules the next pull.
 */
private long pullInterval = 0;
拉取到的消息都放到消费线程池中进行处理。可以通过控制消费线程池大小来达到流控作用,降低对消费端的消耗。
//ConsumeMessageConcurrentlyService类 public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { //消息的处理交到消费线程池进行处理; this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }
/**
 * Creates the concurrent consume service: stores the collaborators, builds the consume
 * thread pool, and creates two single-threaded scheduled executors.
 *
 * @param defaultMQPushConsumerImpl the push consumer implementation; also the source of the
 *                                  consumer configuration object read below
 * @param messageListener           the listener stored for use by this service
 */
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    // Created without a capacity argument, i.e. an unbounded work queue.
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    // Core size = consumeThreadMin, maximum size = consumeThreadMax, keep-alive = 60 000 ms.
    // NOTE(review): per the JDK ThreadPoolExecutor documentation, threads beyond the core size
    // are only created when the work queue is full; with an unbounded queue the maximum size
    // would therefore have no effect - confirm.
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));

    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
配置参数
/**
 * Minimum number of consumer threads (default 20).
 * NOTE(review): presumably used as the core size of the consume thread pool - confirm.
 */
private int consumeThreadMin = 20;

/**
 * Maximum number of consumer threads (default 20).
 * NOTE(review): presumably used as the maximum size of the consume thread pool - confirm.
 */
private int consumeThreadMax = 20;

/**
 * Threshold for dynamically adjusting the size of the thread pool (default 100000).
 */
private long adjustThreadPoolNumsThreshold = 100000;
源码参考:RocketMQ版本4.6.0 https://github.com/apache/rocketmq
原文:https://www.cnblogs.com/whroid/p/14629045.html