上一篇博客:netty 使用 protobuf 序列化。本篇将自定义编码器和解码器,对数据传输过程进行“入站解码,出站编码”。
编码器 MyLongToByteEncoder
package com.oy.inboundandoutbound;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Outbound encoder: turns every {@link Long} written to the channel into
 * bytes by writing it to the outgoing {@link ByteBuf} as a long.
 */
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {

    /**
     * Prints a trace line for the value and then writes it to {@code out}.
     *
     * @param ctx context of this handler in the pipeline
     * @param msg the value to encode
     * @param out buffer that receives the encoded value
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        final String trace = "MyLongToByteEncoder encoder 被调用. msg: " + msg;
        System.out.println(trace);

        final long value = msg.longValue();
        out.writeLong(value);
    }
}
解码器 MyByteToLongDecoder
package com.oy.inboundandoutbound; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class MyByteToLongDecoder extends ByteToMessageDecoder { /** * decode() 会根据接收的数据,被调用多次,知道确定没有新的元素添加到list, * 或者是 ByteBuf 没有更多的可读字节为止。 * 如果 list 不为空,就会将 list 的内容传递给下一个 handler * @param ctx 上下文对象 * @param in 入站后的 ByteBuf * @param out 将解码后的数据传递给下一个 handler * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // long 类型 为 8 字节 if (in.readableBytes() >= 8) { out.add(in.readLong()); } } }
Server
// ---------- Server.java ----------
package com.oy.inboundandoutbound.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Entry point of the demo server: binds to {@link #PORT} and blocks until the
 * server channel is closed.
 */
public class Server {

    /** TCP port the server listens on. */
    private static final int PORT = 8004;

    /**
     * Starts the server with one boss thread and a default-sized worker group,
     * then waits for the server channel to close. Both event loop groups are
     * always shut down on exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerChannelInitializer());

            ChannelFuture future = serverBootstrap.bind(PORT).sync();
            System.out.println("server started and listen " + PORT);
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

// ---------- MyServerChannelInitializer.java ----------
package com.oy.inboundandoutbound.server;

import com.oy.inboundandoutbound.MyByteToLongDecoder;
import com.oy.inboundandoutbound.MyLongToByteEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
 * Builds the pipeline of every accepted connection: decoder, encoder, then
 * the business handler.
 */
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Adds the handlers to the channel's pipeline.
     *
     * @param socketChannel the newly accepted channel
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // Inbound handler: decodes received bytes.
        pipeline.addLast("decoder", new MyByteToLongDecoder());
        // Outbound handler: encodes the data being written.
        pipeline.addLast("encoder", new MyLongToByteEncoder());
        // Custom business handler.
        pipeline.addLast("MyServerHandler", new MyServerHandler());
    }
}

// ---------- MyServerHandler.java ----------
package com.oy.inboundandoutbound.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Business handler of the server: prints every received {@code Long} and
 * answers with a fixed long value.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {

    /**
     * Prints the value read from the client.
     *
     * @param ctx context of this handler
     * @param msg the received value
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("从客户端读到的数据:" + msg);
    }

    /**
     * Sends a long value back to the client.
     *
     * @param ctx context of this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // The server replies with a long.
        ctx.writeAndFlush(654321L);
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of this handler
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause before closing; otherwise the connection drops with no trace of why.
        cause.printStackTrace();
        ctx.close();
    }
}
Client
// ---------- Client.java ----------
package com.oy.inboundandoutbound.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Entry point of the demo client: connects to {@link #HOST}:{@link #PORT} and
 * blocks until the channel is closed.
 */
public class Client {

    /** Address of the server to connect to. */
    private static final String HOST = "127.0.0.1";

    /** TCP port of the server. */
    private static final int PORT = 8004;

    /**
     * Connects to the server and waits for the channel to close. The event
     * loop group is always shut down on exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new MyClientChannelInitializer());

            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

// ---------- MyClientChannelInitializer.java ----------
package com.oy.inboundandoutbound.client;

import com.oy.inboundandoutbound.MyByteToLongDecoder;
import com.oy.inboundandoutbound.MyLongToByteEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Builds the client pipeline: decoder, encoder, then the business handler.
 */
public class MyClientChannelInitializer extends ChannelInitializer<NioSocketChannel> {

    /**
     * Adds the handlers to the channel's pipeline.
     *
     * @param ch the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // Inbound handler: decodes received bytes.
        pipeline.addLast("decoder", new MyByteToLongDecoder());
        // Outbound handler: encodes the data being written.
        pipeline.addLast("encoder", new MyLongToByteEncoder());
        // Custom handler containing the business logic.
        pipeline.addLast(new MyClientHandler());
    }
}

// ---------- MyClientHandler.java ----------
package com.oy.inboundandoutbound.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Business handler of the client: sends one long when the channel becomes
 * active and prints every {@code Long} received from the server.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {

    /**
     * Prints the long value sent by the server.
     *
     * @param channelHandlerContext context of this handler
     * @param msg                   the received value
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long msg) throws Exception {
        System.out.println("客户端读取服务器发送的, msg:" + msg);
    }

    /**
     * Sends a long value to the server once the channel is active.
     *
     * @param ctx context of this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // The client sends a long.
        ctx.writeAndFlush(123456L);
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of this handler
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause before closing; otherwise the connection drops with no trace of why.
        cause.printStackTrace();
        ctx.close();
    }
}
启动服务器和客户端程序, 控制台打印结果:
---
原文:https://www.cnblogs.com/xy-ouyang/p/12827228.html