public static void main(String[] args) { String host = "127.0.0.1"; int port = 8886; EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new MyClientInitializer()); // 启动客户端. ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyClientHandler()); } } public class MyClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(ctx.channel().remoteAddress()); System.out.println("msg:"+msg); ctx.writeAndFlush("from client:" + LocalDateTime.now()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("from client:" + "我主动发第一条数据"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
public class MyServer { int port ; public MyServer(int port){ this.port = port; } public void start() throws Exception{ EventLoopGroup boss = new NioEventLoopGroup(); // selector EventLoopGroup work = new NioEventLoopGroup(); // channel try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss,work) .handler(new LoggingHandler(LogLevel.DEBUG)) .channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync(); System.out.println(" server start up on port : " + port); f.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } public static void main(String[] args) throws Exception{ MyServer server = new MyServer(8899);// 8081为启动端口 server.start(); } } public class MyServerInitializer extends ChannelInitializer<SocketChannel> { // 客户端一旦和服务端连接,这个方法就会被调用 @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyRequestHandler()); } } public class MyRequestHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String req) throws Exception { System.out.println(ctx.channel().remoteAddress()); System.out.println("req msg:"+req); ctx.writeAndFlush("from server:" + LocalDateTime.now()); } }
原文:https://www.cnblogs.com/shineipangtuo/p/13763507.html