服务端代码可以是这样的
//1.定义server启动类 ServerBootstrap serverBootstrap = new ServerBootstrap(); //2.定义工作组:boss分发请求给各个worker:boss负责监听端口请求,worker负责处理请求(读写) EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); //3.定义工作组 serverBootstrap.group(boss,worker); //4.设置通道channel serverBootstrap.channel(NioServerSocketChannel.class);//A //serverBootstrap.channelFactory(new ReflectiveChannelFactory(NioServerSocketChannel.class));//旧版本的写法,但是此过程在A中有同样过程 //5.添加handler,管道中的处理器,通过ChannelInitializer来构造 serverBootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { //此方法每次客户端连接都会调用,是为通道初始化的方法 //获得通道channel中的管道链(执行链、handler链) ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new StringDecoder()); // pipeline.addLast("serverHandler1",new ServerHandler()); // pipeline.addLast("serverHandler2",new ServerHandler2()); pipeline.addLast(new StringEncoder()); System.out.println("success to initHandler!"); } }); //6.设置参数 //设置参数,TCP参数 serverBootstrap.option(ChannelOption.SO_BACKLOG, 2048); //连接缓冲池的大小 serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//维持链接的活跃,清除死链接 serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);//关闭延迟发送 //7.绑定ip和port try { ChannelFuture channelFuture = serverBootstrap.bind("0.0.0.0", 9099).sync();//Future模式的channel对象 //7.5.监听关闭 channelFuture.channel().closeFuture().sync(); //等待服务关闭,关闭后应该释放资源 } catch (InterruptedException e) { System.out.println("server start got exception!"); e.printStackTrace(); }finally { //8.优雅的关闭资源 boss.shutdownGracefully(); worker.shutdownGracefully(); }
启动类是ServerBootstrap,有两个属性:group和childGroup,对应上面代码里的boss和worker,是两个NioEventLoopGroup的实例,关键方法是上面try里的bind,点进去看,
1、initAndRegister方法,用channelFactory新建一个channel,这个channel是上面代码中channel()方法的参数NioServerSocketChannel的实例。
然后调用group的register方法,具体实现在NioEventLoopGroup的父类MultithreadEventLoopGroup中,调用的是next().register,这个next是按轮询的方式取出group的child数组的元素,数组中存
的是NioEventLoop,具体的初始化是在NioEventLoopGroup的构造方法中,而且每一个NioEventLoop对应一个selector。register的具体实现在NioEventLoop的父类SingleThreadEventLoop中,
调用的channel的unsafe属性的register,而NioServerSocketChannel的unsafe是其父类AbstractNioMessageChannel的newSafe方法的返回值NioMessageUnsafe,其register方法在父类
AbstractUnsafe中,点进去register0方法中的doRegister方法,具体实现是在AbstractNioChannel中,这个是内部类对外部类的引用,发现有这一句:
javaChannel().register(eventLoop().unwrappedSelector(), 0, this),netty的NioServerSocketChannel其实是对Java nio的ServerSocketChannel的封装,javaChannel.register就是
取出ServerSocketChannel,调用其register方法,eventLoop().unwrappedSelector()这个就是loopGroup的loop数组中,loop--线程--selector是一对一的关系,而这几个“一”和channel是一对多
或者一对一关系,可以发现netty底层其实还是Java的nio。
2、再往下看,doBind0方法,eventloop.execute方法中有个channel.bind,调用的是pipeline.bind,pipeline是channel的属性,掌管这一个双向链表head到tail,中间可以放channel自己的handler,
这是个责任链模式,channel现在的bind以及以后读取到数据都是从这个链表的正向或者逆向流动处理一遍。
3、eventloop.execute方法会保证对应的线程启动,线程的run方法会同时执行task,以及进行对应的selector的select,像nio那样,遍历所有key,检测可读,可写,有链接到来等情况,
具体方法在processSelectedKeys。由于现在我们讨论的是NioServerSocketChannel,所以看OP_ACCEPT也就是有连接到来的情况,调用的是unsafe.read()。上面说过这是个NioMessageUnsafe,
点进去看其read方法,核心在doReadMessages(readBuf),具体实现由回到NioServerSocketChannel,发现直接调用熟悉的SocketChannel ch = SocketUtils.accept(javaChannel()),也就是
Java NIO的ServerSocketChannel的accept方法,得到socketChannel,并放在readBuf中,在下面的pipeline.fireChannelRead方法中再进行遍历,上面说过,这个是在handler链中流动,
按照Java NIO的流程,这里应该是让新得到的socketChannel注册到某个selector上,以后selector就遍历这些channel,进行实际数据的读取。那么现在pipeline中除了tail和head这两个空handler,
具体的处理在一个ServerBootstrap的内部类ServerBootstrapAcceptor中,这个ServerBootstrapAcceptor之所以能存在于pipeline中,是因为刚才说的initAndRegister方法中,channel生成之后
有一个init(channel)方法,往pipeline中添加了一个ChannelInitializer,这也是一个ChannelHandler。而在上面说的doRegister方法之后,有一个pipeline.fireChannelRegistered(),把管道中的
所有handler都调用channelRegistered方法,而ChannelInitializer的channelRegistered调用了initChannel方法, 这样就把ServerBootstrapAcceptor放入NioServerSocketChannel
的管道中,点击去看其channelRead方法,有childGroup.register(child),这个和之前的group.register道理一样,调用的是NioSocketChannel的unsafe:NioByteUnsafe的父类,同样是
AbstractUnsafe的register方法,也是从childLoopGroup这个子线程组中取出一个loop,把NioSocketChannel注册到对应的selector上,线程的run方法for循环中读取请求数据,在其pipeline中
执行channelRead方法,我们只需要把我们自己的handler放入pipelin就可以进行我们自己的逻辑。
到这里,数据流动大体上是通了。
4、具体的解码编码,粘包的处理,对内存比如buffer的管理,待完善。
原文:https://www.cnblogs.com/chuliang/p/11831610.html