1、Java的编解码
在Java中编码(Encode)称为序列化, 它将对象序列化为字节数组,?于?络传输、数据持久化或者其它?途。解码(Decode)称为反序列化,它把从?络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷?),以?便后续的业务逻辑操作。
java序列化对象只需要实现java.io.Serializable接?并?成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
java序列化?的是为了?络传输和对象持久化。java序列化存在一系列的缺点,例如?法跨语?、序列化后码流太?、序列化性能太低等问题。同时java序列化仅仅是Java编解码技术的?种,由于它的种种缺陷,衍?出了多种编解码技术和框架,这些编解码框架实现了消息的?效序列化。
2、Netty编解码器基本说明
在Netty中,ChannelHandler 充当了处理?站和出站数据的应?程序逻辑的容器。例如,实现ChannelInboundHandler 接?(或 ChannelInboundHandlerAdapter),就可以接收?站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。业务逻辑通常写在?个或者多个 ChannelInboundHandler 中。ChannelOutboundHandler 原理?样,只不过它是?来处理出站数据的。
ChannelPipeline 提供了 ChannelHandler 链的容器。如果事件的运动?向是从客户端到服务端的,如果我们站在客户端的角度,那么就是出站事件,即客户端发送给服务端的数据会通过pipeline 中的?系列 ChannelOutboundHandler,并被这些 Handler 处理,反之则称为?站的。
在?络应?中需要实现某种编解码器,将原始字节数据与?定义的消息对象进?互相转换。?络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进?解码。
netty提供了强?的编解码器框架,使得我们编写?定义的编解码器很容易,也容易封装重?。同时Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是?种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在?起,以实现复杂的转换逻辑。对于Netty??,编解码器由两部分组成:编码器、解码器。
解码器:负责将消息从字节或其他序列形式转成指定的消息对象。负责处理?站 InboundHandler数据;
编码器:将消息对象转成字节或其他序列形式在?络上传输。负责处理出站 OutboundHandler”数据;
不论解码器 handler 还是编码器 handler ,接收的消息类型必须与待处理的消息类型?致,否则该 handler 不会被执?。Netty提供了很多编解码器,例如:
StringEncoder字符串编码器
StringDecoder字符串解码器
ObjectEncoder 对象编码器
ObjectDecoder 对象解码器
FixedLengthFrameDecoder 固定?度的解码器
LineBasedFrameDecoder 以换?符为结束标识的解码器
DelimiterBasedFrameDecoder 指定消息分隔符的解码器
LengthFieldBasedFrameDecoder基于?度通?解码器
解码器负责解码“?站”数据。从?种格式到另?种格式,解码器处理?站数据是抽象ChannelInboundHandler的实现。实践中使?解码器很简单,就是将?站数据转换格式后传递到ChannelPipeline中的下?个ChannelInboundHandler进?处理。
对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder
抽象解码器:
1) ByteToMessageDecoder: ?于将字节转为消息,需要检查缓冲区是否有?够的字节
2) ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有?够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都?持。选择:项?复杂性?则使?ReplayingDecoder,否则使? ByteToMessageDecoder
3)MessageToMessageDecoder: ?于从?种消息解码为另外?种消息(例如POJO到POJO)
?于将接收到的?进制数据(Byte)解码,得到完整的请求报?(Message)。ByteToMessageDecoder是?种ChannelInboundHandler,可以称为解码器,负责将byte字节流(ByteBuf)转换成?种Message,Message是应?可以??定义的?种java对象。
下?列出了ByteToMessageDecoder三个主要?法:
protected abstract void decode(ChannelHandlerContext var1, ByteBuf var2, List<Object> var3) throws Exception; final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ...... this.decode(ctx, in, out); ...... } protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.isReadable()) { this.decodeRemovalReentryProtection(ctx, in, out); } }
通过源码可以看到,decodeLast方法调用的额是decodeRemovalReentryProtection方法,而该方法又最终调用了decode方法,也就是说最终的实现逻辑是在decode方法中。同时decode方法也是该抽象类中唯一一个需要自己实现的方法。
decodeLast方法参数的作?如下:
BytuBuf in:需要解码的?进制数据。
List out:解码后的有效报?列表,我们需要将解码后的报?添加到这个List中。之所以使??个List表示,是因为考虑到粘包问题,因此?参的in中可能包含多个有效报?。
当然,也有可能发?了拆包,in中包含的数据还不?以构成?个有效报?,此时不往List中添加元素即可。另外特别要注意的是,在解码时,不能直接调?ByteBuf的readXXX?法来读取数据,?是应该?先要判断能否构成?个有效的报?。
案例,假设协议规定传输的数据都是int类型的整数,因为int类型占用四个字节,因此我们需要判断当需要转码的二进制数据大于等于4个字节时,才进行解码操作。
上图中显式输?的ByteBuf中包含4个字节,每个字节的值分别为:1,2,3,4。我们?定义?个ToIntegerDecoder进?解码,尽管这?我看到了4个字节刚好可以构成?个int类型整数,但是在真正解码之前,我们并不知道ByteBuf包含的字节数能否构成完成的有效报?,因此需要?先判断ByteBuf中剩余可读的字节,是否?于等于4,如下:
public class ToIntegerDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(in.readInt()); } } }
只有在可读字节数>=4的情况下,我们才进?解码,即读取?个int,并添加到List中。在可读字节数?于4的情况下,我们并没有做任何处理,假设剩余可读字节数为3,不?以构成1个int。那么?类ByteToMessageDecoder发现这次解码List中的元素没有变化,则会对in中的剩余3个字节进?缓存,等待下1个字节的到来,之后再回到调?ToIntegerDecoder的decode?法。
另外需要注意: 在ToIntegerDecoder的decode?法中,每次最多只读取?个1个int。如果ByteBuf中的字节数很多,例如为16,那么可以构成4个int,?这?只读取了1个int,那么剩余12字节怎么办?ByteToMessageDecoder再每次回调?类的decode?法之后,都会判断输?的ByteBuf中是否还有剩余字节可读,如果还有,会再次回调?类的decode?法,直到某个回调decode?法List中的元素个数没有变化时才停?,元素个数没有变化,实际上意味着?类已经没有办法从剩余的字节中读取?个有效报?。由于存在剩余可读字节时,ByteToMessageDecoder会?动再次回调?类decode?法,在实现ByteToMessageDecoder时,decode?法每次只解析?个有效报?即可,没有必要?次全部解析出来。
ByteToMessageDecoder提供的?些常?的实现类:
FixedLengthFrameDecoder:定?协议解码器,我们可以指定固定的字节数算?个完整的报?
LineBasedFrameDecoder: 换?分隔符解码器,遇到\n或者\r\n,则认为是?个完整的报?
DelimiterBasedFrameDecoder: 分隔符解码器,与LineBasedFrameDecoder类似,只不过分隔符可以??指定
LengthFieldBasedFrameDecoder:?度编码解码器,将报?划分为报?头/报?体,根据报?头中的Length字段确定报?体的?度,因此报?提的?度是可变的
JsonObjectDecoder:json格式解码器,当检测到匹配数量的"{" 、”}”或”[””]”时,则认为是?个完整的json对象或者json数组。
这些实现类,都只是将接收到的?进制数据,解码成包含完整报?信息的ByteBuf实例后,就直接交给了之后的ChannelInboundHandler处理。
ReplayingDecoder是byteToMessage的子类,他与byteToMessage最大的不同就是不需要检查缓冲区是否有足够的的字节;若ByteBuf中有?够的字节,则会正常读取;若没有?够的字节则会停?解码。
ReplayingDecoder 使??便,但它也有?些局限性:
1) 不是所有的操作都被ByteBuf?持,如果调??个不?持的操作会抛出DecoderException。
2) ByteBuf.readableBytes()?部分时间不会返回期望值
3)ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如?络缓慢并且消息格式复杂时,消息会被拆成了多个碎?,速度变慢
在满?需求的情况下推荐使?ByteToMessageDecoder,因为它的处理?较简单,没有ReplayingDecoder实现的那么复杂。ReplayingDecoder继承于ByteToMessageDecoder,所以他们提供的接?是相同的。下?代码是ReplayingDecoder的实现:
/** * Integer解码器,ReplayingDecoder实现 */ public class ToIntegerReplayingDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("ToIntegerReplayingDecoder 被调?"); //在 ReplayingDecoder 不需要判断数据是否?够读取,内部会进?处理判断 out.add(in.readInt()); } }
ByteToMessageDecoder是将?进制流进?解码后,得到有效报?。?MessageToMessageDecoder则是将?个本身就包含完整报?信息的对象转换成另?个java对象。前?介绍了ByteToMessageDecoder的部分?类解码后,会直接将包含了报?完整信息的ByteBuf实例交由之后的ChannelInboundHandler处理,此时,你可以在ChannelPipeline中,再添加?个MessageToMessageDecoder,将ByteBuf中的信息解析后封装到Java对象中,简化之后的ChannelInboundHandler的操作。另外,在?些场景下,有可能你的报?信息已经封装到了Java对象中,但是还要继续转成另外的Java对象,因此?个MessageToMessageDecoder后?可能还跟着另?个MessageToMessageDecoder。?个?较容易的理解的类?案例是java Web编程,通常客户端浏览器发送过来的?进制数据,已经被web容器(如tomcat)解析成了?个HttpServletRequest对象,但是我们还是需要将HttpServletRequest中的数据提取出来,封装成我们??的POJO类,也就是从?个java对象(HttpServletRequest)转换成另?个Java对象(我们的POJO类)。
MessageToMessageDecoder的类声明如下:
/** * 其中泛型参数I表示我们要解码的消息类型。例前?,我们在ToIntegerDecoder中,把?进制字节 流转换成了?个int类型的整数。 */ public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
类似的,MessageToMessageDecoder也有?个decode?法需要覆盖 ,如下:
/** * 参数msg,需要进?解码的参数。例如ByteToMessageDecoder解码后的得到的包含完整报?信息 ByteBuf * List<Object> out参数:将msg经过解析后得到的java对象,添加到放到List<Object> out中 */ protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
例如,现在我们想编写?个IntegerToStringDecoder,把前?编写的ToIntegerDecoder输出的int参数转换成字符串,此时泛型I就应该是Integer类型。
integerToStringDecoder源码如下所示:
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> { @Override public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
此时我们应该按照如下顺序组织ChannelPipieline中ToIntegerDecoder和IntegerToStringDecoder 的关系:也就是说,前?个ChannelInboudHandler输出的参数类型,就是后?个ChannelInboudHandler的输?类型。特别注意,如果我们指定MessageToMessageDecoder的泛型参数为ByteBuf,表示其可以直接针对ByteBuf进?解码,那么其是否能替代ByteToMessageDecoder呢?
答案是不可以的。因为ByteToMessageDecoder除了进?解码,还要会对不?以构成?个完整数据的报?拆包数据(拆包)进?缓存。?MessageToMessageDecoder则没有这样的逻辑。因此通常的使?建议是,使??个ByteToMessageDecoder进?粘包、拆包处理,得到完整的有效报?的ByteBuf实例,然后交由之后的?个或者多个MessageToMessageDecoder对ByteBuf实例中的数据进?解析,转换成POJO类。
通过继承ByteToMessageDecoder?定义解码器在解码器进?数据解码时,需判断缓存区(ByteBuf)的数据是否?够,否则收到结果与期望结果可能不?致
/** * todo ?定义解码器 */ public class ByteToLongDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("ByteToLongDecoder decode 被调?"); //todo 因为long占8个字节, 需要判断?于等于8个字节时才能读取?个long if(in.readableBytes() >= 8) { out.add(in.readLong()); } } }
Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,?者都实现ChannelOutboundHandler接?。
相对来说,编码器?解码器的实现要更加简单,原因在于解码器除了要按照协议解析数据,还要要处理粘包、拆包问题;?编码器只要将数据转换成协议规定的?进制格式发送即可。
MessageToByteEncoder也是?个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息转换成?进制流放?ByteBuf中。?类通过覆写其抽象?法encode来实现编码,如下所示:
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { .... protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; }
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> { @Override protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg);//将Integer转成?进制字节流写?ByteBuf中 } }
MessageToMessageEncoder同样是?个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息放到?个List中。?类通过覆写其抽象?法encode,来实现编码,如下所示:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter { ... protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception; ... }
与MessageToByteEncoder不同的,MessageToMessageEncoder编码后的结果放到的out参数类型是?个List中。例如,你?次发送2个报?,因此msg参数中实际上包含了2个报?,因此应该解码出两个报?对象放到List中。
MessageToMessageEncoder提供的常??类包括:
LineEncoder:按?编码,给定?个CharSequence(如String),在其之后添加换?符\n或者\r\n,并封装到ByteBuf进?输出,与LineBasedFrameDecoder相对应。
Base64Encoder:给定?个ByteBuf,得到对其包含的?进制数据进?Base64编码后的新的ByteBuf进?输出,与Base64Decoder相对应。
LengthFieldPrepender:给定?个ByteBuf,为其添加报?头Length字段,得到?个新的ByteBuf进?输出。Length字段表示报??度,与LengthFieldBasedFrameDecoder相对应。
StringEncoder:给定?个CharSequence(如:StringBuilder、StringBuffer、String等),将其转换成ByteBuf进?输出,与StringDecoder对应。
这些MessageToMessageEncoder实现类最终输出的都是ByteBuf,因为最终在?络上传输的都要是?进制数据。
通过继承MessageToByteEncoder?定义编码器
public class LongToByteEncoder extends MessageToByteEncoder<Long> { @Override protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System.out.println("LongToByteEncoder encode被调?"); System.out.println("msg=" + msg); out.writeLong(msg); } }
编码解码器同时具有编码与解码功能,特点是同时实现了ChannelInboundHandler和ChannelOutboundHandler接?,因此在数据输?和输出时都能进?处理。
Netty提供提供了?个ChannelDuplexHandler适配器类,编码解码器的抽象基类ByteToMessageCodec 、MessageToMessageCodec都继承与此类。
ByteToMessageCodec内部维护了?个ByteToMessageDecoder和?个MessageToByteEncoder实例,可以认为是?者的功集合,泛型参数I是接受的编码类型:
public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler { private final TypeParameterMatcher outboundMsgMatcher; private final MessageToByteEncoder<I> encoder; private final ByteToMessageDecoder decoder = new ByteToMessageDecoder(){…} ... protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception; ... }
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler { private final MessageToMessageEncoder<Object> encoder= ... private final MessageToMessageDecoder<Object> decoder =… ... protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out) throws Exception; protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out) throws Exception; }
其他编解码?式:
使?编解码器来充当编码器和解码器的组合失去了单独使?编码器或解码器的灵活性,编解码器是要么都有要么都没有。你可能想知道是否有解决这个僵化问题的?式,还可以让编码器和解码器在ChannelPipeline中作为?个逻辑单元。幸运的是,Netty提供了?种解决?案,使?CombinedChannelDuplexHandler。如何使?CombinedChannelDuplexHandler来结合解码器和编码器呢?下?我们从两个简单的例?看了解。
/** * 解码器,将byte转成char */ public class ByteToCharDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while(in.readableBytes() >= 2){ out.add(Character.valueOf(in.readChar())); } }
/** * 编码器,将char转成byte */ public class CharToByteEncoder extends MessageToByteEncoder<Character> { @Override protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception { out.writeChar(msg); } }
/** * 继承CombinedChannelDuplexHandler,?于绑定解码器和编码器 */ public class CharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> { public CharCodec(){ super(new ByteToCharDecoder(), new CharToByteEncoder()); } }
从上?代码可以看出,使?CombinedChannelDuplexHandler绑定解码器和编码器很容易实现,?使?Codec更灵活。
TCP是个流协议,所谓流,就是没有界限的?串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进?包的划分,所以在业务上认为,?个完整的包可能会被TCP拆分成多个包进?发送,也有可能把多个?的包封装成?个?的数据包发送,这就是所谓的TCP粘包和拆包问题。
TCP粘包:把多个?的包封装成?个?的数据包发送,发送?发送的若?数据包到接收?时粘成?个包
TCP拆包:把?个完整的包拆分为多个?包进?发送,发送?发送?个数据包到接收?时被拆分为若?个?包
如上图所示,就是粘包拆包的各个场景演示。假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端?次读取到字节数是不确定的,故可能存在以下四种情况:
1. 服务端分两次读取到了两个独?的数据包,分别是 D1 和 D2 ,没有粘包和拆包
2. 服务端?次接受到了两个数据包, D1 和 D2 粘合在?起,称之为 TCP 粘包
3. 服务端分两次读取到了数据包,第?次读取到了完整的 D1 包和 D2 包的部分内容,第?次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
4. 服务端分两次读取到了数据包,第?次读取到了 D1 包的部分内容 D1_1 ,第?次读取到了 D1包的剩余部分内容 D1_2 和完整的 D2 包。
发?TCP粘包、拆包主要是由于下??些原因:
应?程序写?的数据?于缓冲区??,这将会发?拆包。
应?程序写?数据?于字缓冲区??,?卡将应?多次写?的数据发送到?络上,这将会发?粘包。
进?MSS(最?报??度)??的TCP分段,当TCP报??度-TCP头部?度>MSS的时候将发?拆包。
接收?法不及时读取套接字缓冲区数据,这将发?粘包。
MSS: 是Maximum Segement Size缩写,表示TCP报?中data部分的最??度,是TCP协议在OSI五层?络模型中传输层对?次可以发送的最?数据的限制。
MTU: 最?传输单元,是Maxitum Transmission Unit的简写,是OSI五层?络模型中链路层(datalink layer)对?次可以发送的最?数据的限制。
当需要传输的数据?于MSS或者MTU时,数据会被拆分成多个包进?传输。由于MSS是根据MTU计算出来的,因此当发送的数据满?MSS时,必然满?MTU。
发送端的字节流都会先传?缓冲区,再通过?络传?到接收端的缓冲区中,最终由接收端获取。当我们发送两个完整包到接收端的时候,正常情况会接收到两个完整的报?。但也有可能接收到的是?个报?,它是由发送的两个报?组成的,这样对于应?程序来说就很难处理了(这样称为粘包);还有可能出现上?这样的虽然收到了两个包,但是??的内容却是互相包含,对于应?来说依然?法解析(拆包)。
拆包解决思路:
基本思路就是不断的从TCP缓冲区中读取数据,每次读取完都需要判断是否是?个完整的数据包。
若当前读取的数据不?以拼接成?个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到?个完整的数据包
若当前读到的数据加上已经读取的数据?够拼接成?个数据包,那就将已经读取的数据拼接上本次读取的数据,构成?个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接
关键点是如何判断是?个完整的数据包,解决策略:
(1)设置消息边界(分隔符,对应Netty提供的LineBasedFrameDecoder、DelimiterBasedFrameDecoder解码器),例如换行符或者自定义的分隔符。
(2)设置定?消息(对应Netty提供的FixedLengthFrameDecoder解码器),例如对Integer指定4个字节。
(3)使?带消息头的协议,消息头存储消息开始标识及消息的?度信息Header+Body(对应Netty提供的LengthFieldBasedFrameDecoder解码器),例如http形式的处理,使用报文头中的长度截取报文体。
(4)发送消息?度,?定义消息解码器
LineBasedFrameDecoder是回?换?解码器,如果?户发送的消息以回?换?符作为消息结束的标识,则可以直接使?Netty的LineBasedFrameDecoder对消息进?解码,只需要在初始化Netty服务端或者客户端时将LineBasedFrameDecoder正确的添加到ChannelPipeline中即可,不需要??重新实现?套换?解码器。
LineBasedFrameDecoder的?作原理是它依次遍历ByteBuf中的可读字节,判断是否有“\n”或“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了??。它是以换?符为结束标志的解码器,?持携带结束符或不携带结束符两种解码?式,同时?持配置单?的最??度。如果连接读取到最??度后仍然没有发现换?符,就会抛出异常,同时忽略掉之前读到的异常码流。防?由于数据报没有携带换?符导致接收到 ByteBuf ?限制积压,引起系统内存溢出。
通常LineBasedFrameDecoder会和StringDecoder搭配使?。StringDecoder的功能?常简单,就是将接收到的对象转换成字符串,然后继续调?后?的Handler。LineBasedFrameDecoder+StringDecoder组合就是按?切换的?本解码器,?来?持TCP的粘包和拆包。
对于?本类协议的解析,?本换?解码器?常实?,例如对 HTTP 消息头的解析、FTP 协议消息的解析等。
LineBasedFrameDecoder使?起来?分简单,只需要在ChannelPipeline 中添加即可,如下所示
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LineBasedFrameDecoder(1024)); p.addLast(new StringDecoder()); p.addLast(new StringEncoder()); p.addLast(new LineServerHandler()); } });
DelimiterBasedFrameDecoder是分隔符解码器,?户可以指定消息结束的分隔符,它可以?动完成以分隔符作为码流结束标识的消息的解码。回?换?解码器实际上是?种特殊的DelimiterBasedFrameDecoder解码器。
?先将分隔符转换成 ByteBuf 对象,作为参数构造 DelimiterBasedFrameDecoder,将其添加到ChannelPipeline 中,然后依次添加字符串解码器(通常?于?本解码)和?户 Handler。
DelimiterBasedFrameDecoder 原理分析:解码时,判断当前已经读取的 ByteBuf 中是否包含分隔符 ByteBuf,如果包含,则截取对应的 ByteBuf 返回。
示例:发消息时自定义使用&_作为分隔符,解码时也使用其作为分隔符。
// 客户端发送数据 public class ClientHandler extends ChannelInboundHandlerAdapter { /** * 当客户端连接服务器完成就会触发该?法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String message = "aaaaaaaaaaaaaaaa&_bbbbbbbbbbbbbbbbbb&_ccccccccccc&_"; ByteBuf byteBuf = Unpooled.buffer(message.getBytes().length); byteBuf.writeBytes(message.getBytes()); ctx.writeAndFlush(byteBuf); } }
// 服务端添加分隔符解码器 public class EchoServer { public static void main(String[] args) { // 设置两个线程组 serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 向pipeline加?分隔符解码器 ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,true,delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ServerHandler()); } }); } }
FixedLengthFrameDecoder是固定?度解码器,它能够按照指定的?度对消息进??动解码,开发者不需要考虑TCP的粘包/拆包等问题,?常实?。对于定?消息,如果消息实际?度?于定?,则往往会进?补位操作,它在?定程度上导致了空间和资源的浪费。但是它的优点也是?常明显的,编解码?较简单。
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO))//配置?志输出 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new FixedLengthFrameDecoder(10)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } });
?多数的协议(私有或者公有),协议头中会携带?度字段,?于标识消息体或者整包消息的?度,例如SMPP、HTTP协议等。由于基于?度解码需求的通?性,以及为了降低?户的协议开发难度,Netty提供了LengthFieldBasedFrameDecoder,?动屏蔽TCP底层的拆包和粘包问题,只需要传?正确的参数,即可轻松解决“读半包“问题。
源码的构造函数:
public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); }
参数解释:
* <pre> * lengthFieldOffset = 0 * lengthFieldLength = 2 * lengthAdjustment = 0 * <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field) * BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes) * +--------+----------------+ +----------------+ * | Length | Actual Content |----->| Actual Content | * | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" | * +--------+----------------+ +----------------+ * </pre> lengthFieldOffset = 0,?度字段偏移位置为0表示从包的第?个字节开始读取; lengthFieldLength = 2,?度字段?为2,从包的开始位置往后2个字节的?度为?度字段; lengthAdjustment = 0 ,解析的时候?需跳过任何?度; initialBytesToStrip = 2,去掉当前数据包的开头2字节,去掉 header。 0x000C 转为 int = 12。
//协议包 public class MessageProtocol { private int len; //关键 private byte[] content; public int getLen() { return len; } public void setLen(int len) { this.len = len; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } }
服务端示例
public class MyProtocolServer { private int port; public MyProtocolServer(int port) { this.port = port; } public void start(){ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap() .group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer()); try { ChannelFuture future = server.bind(port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { MyProtocolServer server = new MyProtocolServer(7788); server.start(); } }
客户端示例
public class MyProtocolClient { private int port; private String address; public MyProtocolClient(int port, String address) { this.port = port; this.address = address; } public void start(){ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ClientChannelInitializer()); try { ChannelFuture future = bootstrap.connect(address,port).sync(); future.channel().writeAndFlush("Hello world, i‘m online"); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally { group.shutdownGracefully(); } } public static void main(String[] args) { MyProtocolClient client = new MyProtocolClient(7788,"127.0.0.1"); client.start(); } }
原文:https://www.cnblogs.com/liconglong/p/15220296.html