什么是TCP拆包、粘包?
在网络通信中,数据在底层都是以字节流形式在流动,那么发送方和接受方理应有一个约定(协议),只有这样接受方才知道需要接受多少数据,哪些数据需要在一起处理;如果没有这个约定,就会出现本应该一起处理的数据,被TCP划分为多个包发给接收方进行处理,如下图:
看一个TCP拆包、粘包的实例
客户端Handler:
服务端Handler:
运行结果:
上面的程序本意是CLIENT发送3次消息给SERVER,SERVER端理应处理3次,可是结果SERVER却将3条消息一次处理了。
那么如何解决TCP拆包、粘包问题呢?其实思路不外乎有3种:
第一种:发定长数据
接收方拿固定长度的数据,发送方发送固定长度的数据即可。但是这样的缺点也是显而易见的:如果发送方的数据长度不足,需要补位,浪费空间。
第二种:在包尾部增加特殊字符进行分割
发送方发送数据时,增加特殊字符;在接收方以特殊字符为准进行分割
第三种:自定义协议
类似于HTTP协议中的HEAD信息,比如我们也可以在HEAD中,告诉接收方数据的元信息(数据类型、数据长度等)
Netty如何解决TCP拆包、粘包问题?
在《Java通信实战:编写自定义通信协议实现FTP服务》中,涉及到了JAVA SOCKET这方面的处理,大家可以参考。接下来,我们来看Netty这个框架是如何帮助我们解决这个问题的。本篇博客的代码在《Netty实践(一):轻松入门》基础上进行。
方式一:定长消息
Server启动类:
Client Handler:
运行结果:
利用FixedLengthFrameDecoder,加入到管道流处理中,长度够了接收方才能收到。
方式二:自定义分隔符
Server启动类:
Client Handler:
运行结果:
方式三:自定义协议
下面我们将简单实现一个自定义协议:
HEAD信息中包含:数据长度、数据版本
数据内容
MyHead
public class MyHead {
//数据长度
private int length;
//数据版本
private int version;
public MyHead(int length, int version) {
this.length = length;
this.version = version;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
}MyMessage
public class MyMessage {
//消息head
private MyHead head;
//消息body
private String content;
public MyMessage(MyHead head, String content) {
this.head = head;
this.content = content;
}
public MyHead getHead() {
return head;
}
public void setHead(MyHead head) {
this.head = head;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return String.format("[length=%d,version=%d,content=%s]",head.getLength(),head.getVersion(),content);
}
}编码器
/**
* Created by Administrator on 17-1-9.
* 编码器 将自定义消息转化成ByteBuff
*/
public class MyEncoder extends MessageToByteEncoder<MyMessage> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception {
int length = myMessage.getHead().getLength();
int version = myMessage.getHead().getVersion();
String content = myMessage.getContent();
byteBuf.writeInt(length);
byteBuf.writeInt(version);
byteBuf.writeBytes(content.getBytes(Charset.forName("UTF-8")));
}
}解码器
/**
* Created by Administrator on 17-1-9.
* 解码器 将ByteBuf数据转化成自定义消息
*/
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int length = byteBuf.readInt();
int version = byteBuf.readInt();
byte[] body = new byte[length];
byteBuf.readBytes(body);
String content = new String(body, Charset.forName("UTF-8"));
MyMessage myMessage = new MyMessage(new MyHead(length,version),content);
list.add(myMessage);
}
}Server启动类
public class Main {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup(); // (2)
int port = 8867;
try {
ServerBootstrap b = new ServerBootstrap(); // (3)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (4)
.childHandler(new ChannelInitializer<SocketChannel>() { // (5)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyEncoder())
.addLast(new MyDecoder())
.addLast(new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (6)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (7)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (8)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
System.out.println("start server....");
f.channel().closeFuture().sync();
System.out.println("stop server....");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("exit server....");
}
}
}Server Handler
public class ServerHandler extends ChannelHandlerAdapter {
//每当从客户端收到新的数据时,这个方法会在收到消息时被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyMessage in = (MyMessage) msg;
try {
// Do something with msg
System.out.println("server get :" + in);
} finally {
//ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
//or ((ByteBuf)msg).release();
ReferenceCountUtil.release(msg);
}
}
//exceptionCaught()事件处理方法是当出现Throwable对象才会被调用
//当Netty由于IO错误或者处理器在处理事件时抛出的异常时
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}Client启动类
public class Client {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new MyDecoder());
p.addLast(new MyEncoder());
p.addLast(new ClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect("127.0.0.1", 8867).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}Client Handler
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(new MyMessage(new MyHead("abcd".getBytes("UTF-8").length,1),"abcd"));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
// Do something with msg
System.out.println("client get :" + in.toString(CharsetUtil.UTF_8));
ctx.close();
} finally {
//ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
//or ((ByteBuf)msg).release();
ReferenceCountUtil.release(msg);
}
}
}运行结果
到这里,你会发现Netty处理TCP拆包、粘包问题很简单,通过编解码技术支持,让我们编写自定义协议也很方便,在后续的Netty博客中,我将继续为大家介绍Netty在实际中的一些应用(比如实现心跳检测),See You~
本文出自 “学海无涯 心境无限” 博客,请务必保留此出处http://zhangfengzhe.blog.51cto.com/8855103/1890577
原文:http://zhangfengzhe.blog.51cto.com/8855103/1890577