首页 > 其他 > 详细

RocketMq总结(四) -- 消费者拉取消息

时间:2021-08-10 23:20:00      阅读:19      评论:0      收藏:0      [点我收藏+]

一 ProcessQueue

  ProcessQueue 是 MessageQueue 在消费端的重现、快照。 PullMessageService 从消息服务器默认每次拉取 32 条消息,按消息的队列偏移 顺序存放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后 ProcessQueue中移除。

private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
  消息存储容器 键为消息在 onsumeQueue 中的偏移 MessageExt :消息实体
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
  消息临时存储容器, 为消息在 ConsumeQueue 中的偏移量, MessageExt 为消息实体,该结构用于处理顺序消息,消息消费线程从 ProcessQueue msgTreeMap 中取出消息前,先将消息临时存储在 consumingMsgOrderlyTreeMap
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
  读写锁,控制多线程并发修改 msgTreeMap consumingMsgOrderlyTreeMap
private final AtomicLong msgCount = new AtomicLong();
  ProcessQueue 中总消息数
private volatile long queueOffsetMax = 0L;
最大的偏移量
private volatile boolean dropped = false;//该队列是否被丢弃
private volatile long lastPullTimestamp = System.currentTimeMillis();//上次的拉取时间戳
private volatile long lastConsumeTimestamp = System.currentTimeMillis();//上次的消费时间戳

二  DefaultMQPushConsumerImpl#pullMessage

  拉取消息的核心逻辑在网络通信层

try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );

  pullAPIWrapper#pullKernelImpl

FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);

  根据 brokerName BrokerId从MQClientlnstance 中获取 Broker 地址,在整个RocketMQBroker 的部署结构中,相同名称的 Broker 构成主从结构,其 BrokerId 会不

样,在每次拉取消息后,会给出一个建议,下次拉取从主节点还是从节点拉取。
 
public class FindBrokerResult {
    private final String brokerAddr;
    private final boolean slave;
    private final int brokerVersion;

  最终是通过netty的Channel进行发送

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

  2 处理返回的消息

  其实看懂了很简单,就是Netty客户端的常见写法

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

  

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

  收到消息就是解码的过程,这部分不详细展开了。

  处理拉取到的消息逻辑在发送前的PullCallBack里,主要的代码逻辑是

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());//先把消息存到ProcessQueue里
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);//把消息包到一个ConsumeRequest里对消费线程池发送消费Request

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);//再次把拉取请求放到拉取线程池队列里
                                }

总结下就是三件事

1 把消息先缓存在ProcessQueue里

2 把ProcessQueue包成一个ConsumeRequest,对线程池提交,异步执行

3 把下一次的拉取请求放入拉取队列

  

 

  

 

RocketMq总结(四) -- 消费者拉取消息

原文:https://www.cnblogs.com/juniorMa/p/15125718.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!