一 ProcessQueue
ProcessQueue 是 MessageQueue 在消费端的重现、快照。 PullMessageService 从消息服务器默认每次拉取 32 条消息,按消息的队列偏移 顺序存放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后 ProcessQueue中移除。
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final AtomicLong msgCount = new AtomicLong();
private volatile long queueOffsetMax = 0L; 最大的偏移量 private volatile boolean dropped = false;//该队列是否被丢弃 private volatile long lastPullTimestamp = System.currentTimeMillis();//上次的拉取时间戳 private volatile long lastConsumeTimestamp = System.currentTimeMillis();//上次的消费时间戳
二 DefaultMQPushConsumerImpl#pullMessage
拉取消息的核心逻辑在网络通信层
try { this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, pullCallback );
pullAPIWrapper#pullKernelImpl
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
根据 brokerName BrokerId从MQClientlnstance 中获取 Broker 地址,在整个RocketMQBroker 的部署结构中,相同名称的 Broker 构成主从结构,其 BrokerId 会不
public class FindBrokerResult { private final String brokerAddr; private final boolean slave; private final int brokerVersion;
最终是通过netty的Channel进行发送
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
2 处理返回的消息
其实看懂了很简单,就是Netty客户端的常见写法
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } pipeline.addLast( defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler()); } });
收到消息就是解码的过程,这部分不详细展开了。
处理拉取到的消息逻辑在发送前的PullCallBack里,主要的代码逻辑是
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());//先把消息存到ProcessQueue里 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);//把消息包到一个ConsumeRequest里对消费线程池发送消费Request if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);//再次把拉取请求放到拉取线程池队列里 }
总结下就是三件事
1 把消息先缓存在ProcessQueue里
2 把ProcessQueue包成一个ConsumeRequest,对线程池提交,异步执行
3 把下一次的拉取请求放入拉取队列
原文:https://www.cnblogs.com/juniorMa/p/15125718.html