经过之前的分析,我们知道,一个请求显示经过层层的责任链,最后才会发出去。而决定发送到消息格式是在责任链中的一环完成的
InvokerProcessHandlerFactory # init()
public static void init() { if (!isInitialized) { if (Constants.MONITOR_ENABLE) { registerBizProcessFilter(new RemoteCallMonitorInvokeFilter()); } registerBizProcessFilter(new TraceFilter()); registerBizProcessFilter(new FaultInjectionFilter()); registerBizProcessFilter(new DegradationFilter()); registerBizProcessFilter(new ClusterInvokeFilter()); registerBizProcessFilter(new GatewayInvokeFilter()); registerBizProcessFilter(new ContextPrepareInvokeFilter()); registerBizProcessFilter(new SecurityFilter()); registerBizProcessFilter(new RemoteCallInvokeFilter()); bizInvocationHandler = createInvocationHandler(bizProcessFilters); isInitialized = true; } }
我们看这个 ClusterInvokeFilter
public class ClusterInvokeFilter extends InvocationInvokeFilter { private static final Logger logger = LoggerLoader.getLogger(ClusterInvokeFilter.class); public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable { InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig(); Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster()); if (cluster == null) { throw new IllegalArgumentException("Unsupported cluster type:" + cluster); } return cluster.invoke(handler, invocationContext); } }
这里以常见的 FailfastCluster 为例
@Override public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable { InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig(); InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig); boolean timeoutRetry = invokerConfig.isTimeoutRetry();
......
InvokerUtils
public static InvocationRequest createRemoteCallRequest(InvokerContext invokerContext, InvokerConfig<?> invokerConfig) { InvocationRequest request = invokerContext.getRequest(); if (request == null) { request = SerializerFactory.getSerializer(invokerConfig.getSerialize()).newRequest(invokerContext); invokerContext.setRequest(request); } return request; }
拿到序列化器在决定请求的类型,序列化器主要分为两大类,一个是ThiftSeralizer,一类是非ThiftSeralizer
ThriftSerializer
public InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException { return new GenericRequest(invokerContext); }
AbstractSerializer
public InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException { return InvocationUtils.newRequest(invokerContext); }
最终调用的是 DefaultInvocationBuilder
public InvocationRequest newRequest(InvokerContext invokerContext) { return new DefaultRequest(invokerContext); }
到这里就知道了,请求的类型就两大类,一类是GenericRequest 一类是 DefaultRequest
我们看客户端的channelHandlers。习惯了netty4的写法刚开始看还真不习惯 呵呵。
public class NettyClientPipelineFactory implements ChannelPipelineFactory { private NettyClient client; private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig(); public NettyClientPipelineFactory(NettyClient client) { this.client = client; } public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("framePrepender", new FramePrepender()); pipeline.addLast("frameDecoder", new FrameDecoder()); pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig)); pipeline.addLast("compressHandler", new CompressHandler(codecConfig)); pipeline.addLast("invokerDecoder", new InvokerDecoder()); pipeline.addLast("invokerEncoder", new InvokerEncoder()); pipeline.addLast("clientHandler", new NettyClientHandler(this.client)); return pipeline; } }
主要分析这个类 InvokerEncoder。继续分析最终加密的逻辑在
AbstractEncoder # encode
public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { if (msg instanceof InvocationSerializable) { InvocationSerializable _msg = (InvocationSerializable) msg; try { ChannelBuffer frame; CodecEvent codecEvent; if (msg instanceof UnifiedInvocation) {//这就是Thrift对应的请求类型 frame = _doEncode(channel, (UnifiedInvocation) _msg); codecEvent = new CodecEvent(frame, true); } else { frame = doEncode(channel, _msg);//这是一般类型 codecEvent = new CodecEvent(frame, false); }
protected ChannelBuffer _doEncode(Channel channel, UnifiedInvocation msg) throws IOException { ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH, channel.getConfig().getBufferFactory())); //magic os.write(CodecConstants._MAGIC);//(byte) 0xAB (byte) 0xBA os.writeByte(msg.getProtocolVersion());//第三个字节 //serialize byte serialize = SerializerFactory.convertToUnifiedSerialize(msg.getSerialize()); //serialize os.writeByte(serialize);//第4个字节序列化方式 //totalLength os.writeInt(Integer.MAX_VALUE);//5-8字节是消息体长度 serialize(msg.getSerialize(), os, msg, channel); ChannelBuffer frame = os.buffer(); //totalLength frame.setInt(CodecConstants._HEAD_LENGTH, frame.readableBytes() - CodecConstants._FRONT_LENGTH_);//这里会重新设置5-8字节的长度值
Pigeon源码分析(三) -- 客户端发送tcp底层源码分析
原文:https://www.cnblogs.com/juniorMa/p/14842724.html