服务端
package com.mm.chat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class NettyServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // 设置解码器,用作入站 socketChannel.pipeline().addLast(new StringDecoder()); // 设置编码器,用作出站 socketChannel.pipeline().addLast(new StringEncoder()); // 设置处理器 socketChannel.pipeline().addLast(new NettyServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(9999); channelFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("连接成功"); } else { System.out.println("连接失败"); } } }); System.out.println("服务器启动成功!!!!!!!"); Channel channel = channelFuture.channel(); channel.closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
服务端处理器NettyServerHandler
package com.mm.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Server-side chat handler: tracks connected channels and relays every inbound message to all
 * other connected channels.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Channels that are currently online, shared by all handler instances.
     *
     * <p>A copy-on-write list is used because handler callbacks for different connections may
     * run on different event-loop threads, and the list is iterated on every broadcast.
     */
    static List<Channel> channelList = new CopyOnWriteArrayList<Channel>();

    /**
     * Called when a connection becomes active; registers the channel for broadcasts.
     *
     * @param ctx channel context
     * @throws Exception on failure
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("ip[" + addressOf(channel) + "]上线");
        channelList.add(channel);
    }

    /**
     * Called when a connection becomes inactive; unregisters the channel.
     *
     * @param ctx channel context
     * @throws Exception on failure
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("ip[" + addressOf(channel) + "]下线");
        channelList.remove(channel);
    }

    /**
     * Called when an exception reaches this handler; unregisters the channel, prints the stack
     * trace and closes the connection.
     *
     * @param ctx   channel context
     * @param cause the exception that was raised
     * @throws Exception on failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("ip[" + addressOf(channel) + "]异常");
        channelList.remove(channel);
        cause.printStackTrace();
        // Close the connection: leaving it open would keep the peer connected while it is
        // no longer in the broadcast list and therefore never receives anything again.
        ctx.close();
    }

    /**
     * Handles one inbound message by logging it and forwarding it to every other channel.
     *
     * @param channelHandlerContext channel context
     * @param msg                   inbound message
     * @throws Exception on failure
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        Channel sender = channelHandlerContext.channel();
        String senderAddress = addressOf(sender);
        System.out.println(senderAddress + ":" + msg);

        // The outgoing text is the same for every recipient, so build it once.
        String broadcast = "ip[" + senderAddress + "]说:" + msg;
        for (Channel recipient : channelList) {
            if (recipient != sender) {
                recipient.writeAndFlush(broadcast);
            }
        }
    }

    /**
     * Formats the remote address of a channel for log and chat output.
     *
     * @param channel the channel whose peer address is wanted
     * @return the address text with its first character dropped (presumably the leading
     *         {@code '/'} of the socket address's string form), or {@code "unknown"} when the
     *         channel reports no remote address
     */
    private static String addressOf(Channel channel) {
        Object remote = channel.remoteAddress();
        if (remote == null) {
            return "unknown";
        }
        String text = remote.toString();
        return text.isEmpty() ? text : text.substring(1);
    }
}
客户端
package com.mm.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * Console chat client.
 *
 * <p>Connects to the server at {@code 127.0.0.1:9999}, sends every line typed on standard input
 * and lets {@link NettyClientHandler} handle whatever the server sends back.
 */
public class NettyClient {

    /**
     * Connects to the server and forwards console input until input ends or the connection
     * is no longer active, then waits for the channel to close.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the main thread is interrupted while connecting or while
     *                              waiting for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // Decoder, used for inbound data.
                            pipeline.addLast(new StringDecoder());
                            // Encoder, used for outbound data.
                            pipeline.addLast(new StringEncoder());
                            // Business handler.
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
            Channel channel = channelFuture.channel();
            System.out.println("客户端连接成功");
            System.out.println("============" + channel.localAddress().toString().substring(1) + "===============");

            Scanner scanner = new Scanner(System.in);
            // Stop prompting once the connection is gone. The check runs before blocking on
            // stdin, so at most one more line is read after a disconnect.
            while (channel.isActive() && scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg);
            }

            channel.closeFuture().sync();
        } finally {
            // Release the event-loop threads even when connecting fails; otherwise they
            // would keep the JVM alive after main has thrown.
            group.shutdownGracefully();
        }
    }
}
客户端处理器NettyClientHandler
package com.mm.chat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class NettyClientHandler extends SimpleChannelInboundHandler<String> { /** * 接受消息 * * @param channelHandlerContext 通道上下文 * @param msg 消息 * @throws Exception 异常 */ protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg); } }
原文:https://www.cnblogs.com/pastjx/p/14716322.html