Creating a Netty Server

Date: 2019-11-12 00:23:39

Let's use the classic EchoServer from netty-example as our starting point.

    // Create bossGroup and workerGroup, Netty's implementation of the reactor pattern
    EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    final EchoServerHandler serverHandler = new EchoServerHandler();
    try {
        ServerBootstrap b = new ServerBootstrap();
        // Configure the server channel and the accepted channels on the ServerBootstrap
        b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .attr(AttributeKey.newInstance("attr"), "attr")
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    if (sslCtx != null) {
                        p.addLast(sslCtx.newHandler(ch.alloc()));
                    }
                    //p.addLast(new LoggingHandler(LogLevel.INFO));
                    p.addLast(serverHandler);
                }
            })
            .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
            .childAttr(AttributeKey.newInstance("childAttr"), "childAttr");

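        // bind() is what actually starts the server
        ChannelFuture f = b.bind(PORT).sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }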

First, look at how the EventLoopGroup is constructed.

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    public NioEventLoopGroup(int nThreads, Executor executor){
        this(nThreads, executor, SelectorProvider.provider());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

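Stepping through the chain of overloaded constructors, we see that the bossGroup is configured with a thread count, an executor that supplies the threads, a SelectorProvider, a select strategy factory, a rejected-execution handler, and so on. Pay particular attention to the thread count: the no-arg constructor passes 0, which is replaced by DEFAULT_EVENT_LOOP_THREADS, whose default is twice the number of available CPU cores.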
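
The thread-count rule can be sketched in a few lines of plain Java. This is an illustration only: the class and method names (`ThreadCountSketch`, `resolve`) are made up for the example, and it assumes no override of the default is configured.

```java
// Illustrative sketch only; names are hypothetical, not Netty's source.
final class ThreadCountSketch {
    // Assumed default: twice the available processors, and never less than 1.
    static final int DEFAULT_THREADS =
            Math.max(1, Runtime.getRuntime().availableProcessors() * 2);

    // A requested count of 0 means "use the default".
    static int resolve(int requested) {
        if (requested < 0) {
            throw new IllegalArgumentException("requested: " + requested);
        }
        return requested == 0 ? DEFAULT_THREADS : requested;
    }

    public static void main(String[] args) {
        System.out.println("boss threads:   " + resolve(1)); // explicit, as in new NioEventLoopGroup(1)
        System.out.println("worker threads: " + resolve(0)); // default, as in new NioEventLoopGroup()
    }
}
```
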
We then arrive at the constructor that does the real work, inherited by NioEventLoopGroup from MultithreadEventExecutorGroup.

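    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // 1: create the executor; its thread factory mainly sets the thread-name prefix.
        //    ThreadPerTaskExecutor starts one new thread per submitted task.
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 2: create each EventLoop; each one is later bound to exactly one thread from the executor
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // if (!success): shut down the children created so far (omitted in this excerpt)
            }
        }

        // 3: choose how the next NioEventLoop is picked (round-robin)
        chooser = chooserFactory.newChooser(children);

        // 4: add a termination listener to every NioEventLoop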
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
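
Step 3 above is easy to picture with a small round-robin chooser. The sketch below is not Netty's code: `ChooserSketch` and its members are hypothetical names, and `Math.floorMod` is my own choice for keeping the index non-negative after the counter overflows. It does mirror the general idea of using a bit mask when the number of children is a power of two.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative round-robin chooser; names are hypothetical, not Netty's source.
final class ChooserSketch {
    interface Chooser<T> {
        T next();
    }

    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & -n) == n;
    }

    static <T> Chooser<T> newChooser(T[] children) {
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(children.length)) {
            // A bit mask replaces the modulo when the length is a power of two.
            return () -> children[idx.getAndIncrement() & (children.length - 1)];
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        return () -> children[Math.floorMod(idx.getAndIncrement(), children.length)];
    }

    public static void main(String[] args) {
        Chooser<String> chooser = newChooser(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next()); // loop-0, loop-1, loop-2, loop-0
        }
    }
}
```
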

Step into the second marked step, where the NioEventLoop is created, and look closely at the openSelector() method.

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // The unwrapped selector, i.e. the plain JDK NIO selector
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        // If the selected-key-set optimization is disabled, return a tuple that holds only the unwrapped selector
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

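        // Fall back to the plain selector if SelectorImpl could not be loaded (check abridged in this excerpt)
        if (!(maybeSelectorImplClass instanceof Class)) {
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        // The optimized SelectedSelectionKeySet is backed by an array instead of a set;
        // it is swapped into the Selector's fields through reflection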
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException | IllegalAccessException e) {
                    return e;
                }
            }
        });

        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
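
To make the swap concrete, here is a minimal sketch of the same trick on a stand-in class. Everything in it is hypothetical (`FakeSelector`, `ArrayKeySet`, `install`); it deliberately avoids touching `sun.nio.ch.SelectorImpl`, which on recent JDKs is not open to reflection by default. It shows an array-backed set whose `add` is a plain append, installed into a private field via reflection.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Illustrative sketch; FakeSelector stands in for a real selector implementation.
final class KeySetSwapSketch {
    static final class FakeSelector {
        private Set<String> selectedKeys = new HashSet<>();

        void markReady(String key) {
            selectedKeys.add(key);
        }
    }

    // Array-backed "set": add() appends, with no hashing and no duplicate check.
    static final class ArrayKeySet extends AbstractSet<String> {
        String[] keys = new String[4];
        int size;

        @Override
        public boolean add(String key) {
            if (size == keys.length) {
                keys = Arrays.copyOf(keys, size * 2);
            }
            keys[size++] = key;
            return true;
        }

        @Override
        public int size() {
            return size;
        }

        @Override
        public Iterator<String> iterator() {
            return Arrays.asList(keys).subList(0, size).iterator();
        }

        // Clear the slots so the array can be reused for the next select round.
        void reset() {
            Arrays.fill(keys, 0, size, null);
            size = 0;
        }
    }

    // Replace the selector's private set with the array-backed one.
    static ArrayKeySet install(FakeSelector selector) throws ReflectiveOperationException {
        ArrayKeySet replacement = new ArrayKeySet();
        Field field = FakeSelector.class.getDeclaredField("selectedKeys");
        field.setAccessible(true);
        field.set(selector, replacement);
        return replacement;
    }

    public static void main(String[] args) throws Exception {
        FakeSelector selector = new FakeSelector();
        ArrayKeySet keys = install(selector);
        selector.markReady("accept");
        selector.markReady("read");
        for (int i = 0; i < keys.size; i++) {
            System.out.println(keys.keys[i]); // plain array walk, no iterator needed
        }
        keys.reset();
    }
}
```
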

At this point the group is fully created. Apart from the thread count, bossGroup and workerGroup are built in exactly the same way.

Next comes the ServerBootstrap configuration.

    // group() stores bossGroup as the parent group and workerGroup as childGroup
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        ObjectUtil.checkNotNull(childGroup, "childGroup");
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

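    // channel() installs a factory that produces instances of the given class. Judging by the factory's
    // name (Reflective) and its field (a Constructor), we can guess that it combines reflection, generics,
    // and the factory pattern to create the configured server channel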
    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }
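
The same pattern can be reproduced with nothing but the JDK. The sketch below is an assumption-laden miniature, not Netty's class: `Channel`, `DemoServerChannel`, and `ReflectiveFactory` are invented for the example. It looks up the public no-arg constructor once and reuses it for every instance.

```java
import java.lang.reflect.Constructor;

// Illustrative sketch of a reflection-based generic factory; all names are hypothetical.
final class ReflectiveFactorySketch {
    interface Channel {
        String name();
    }

    public static final class DemoServerChannel implements Channel {
        public DemoServerChannel() { }

        @Override
        public String name() {
            return "demo-server-channel";
        }
    }

    static final class ReflectiveFactory<T extends Channel> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> clazz) {
            try {
                // Look the constructor up once; newChannel() reuses it.
                this.constructor = clazz.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        clazz.getSimpleName() + " has no public no-arg constructor", e);
            }
        }

        T newChannel() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Unable to create " + constructor.getDeclaringClass(), e);
            }
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<Channel> factory = new ReflectiveFactory<>(DemoServerChannel.class);
        System.out.println(factory.newChannel().name()); // demo-server-channel
    }
}
```
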

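Besides these, six more properties are set: handler, childHandler, option, childOption, attr, and childAttr. Following the naming of the groups, we can guess that the properties without the child prefix apply to the server channel served by the bossGroup's NioEventLoop, while those with the child prefix apply to the accepted channels served by the workerGroup's NioEventLoops.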

With the ServerBootstrap configured, the real startup work begins. Step into b.bind() and follow the calls down to AbstractBootstrap.doBind(SocketAddress localAddress).

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // Initialize and register the channel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        // If registration is already done, bind right away; otherwise add a listener that binds once it completes
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener((ChannelFutureListener) future ->
                    doBind0(regFuture, channel, localAddress, promise));
            return promise;
        }
    }
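
The "bind now or bind when registration finishes" decision can be sketched with `CompletableFuture` standing in for Netty's futures and promises. The names here (`BindAfterRegisterSketch`, `bind0`, the string result) are assumptions made for the illustration only.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch; CompletableFuture stands in for ChannelFuture/ChannelPromise.
final class BindAfterRegisterSketch {
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            // Registration already finished: bind immediately.
            bind0(regFuture, port, promise);
        } else {
            // Registration still pending: bind from a completion callback.
            regFuture.whenComplete((ignored, error) -> bind0(regFuture, port, promise));
        }
        return promise;
    }

    static void bind0(CompletableFuture<Void> regFuture, int port,
                      CompletableFuture<String> promise) {
        if (regFuture.isCompletedExceptionally()) {
            promise.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            promise.complete("bound to port " + port);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> bound = doBind(registration, 8007);
        System.out.println("bound before registration: " + bound.isDone());
        registration.complete(null);
        System.out.println(bound.join());
    }
}
```
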

Start with initAndRegister(). Its first job is to create the NioServerSocketChannel, instantiating the channel through the factory.

    final ChannelFuture initAndRegister() {
        Channel channel  = channelFactory.newChannel();
        init(channel);

        ChannelFuture regFuture = config().group().register(channel);

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
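        // In short: if the promise has not failed at this point, either registration has already completed
        // (we were called from the event loop), or the registration task has been queued on the event loop.
        // register(), bind(), and connect() all run on that same thread, so calling bind() or connect() now is safe.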
        return regFuture;
    }

    // As guessed, the server channel is instantiated through the constructor obtained by reflection.
    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    // Note that OP_ACCEPT is passed in here as the interest op
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

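    // The superclass constructor stores the JDK ServerSocketChannel created via the SelectorProvider
    // together with the OP_ACCEPT interest op, and switches the channel to non-blocking mode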
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                            "Failed to close a partially initialized socket.", e2);
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

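    // The top-level abstract superclass creates the id, unsafe, and pipeline. Netty's unsafe is the class tied to
    // the socket (or other I/O component) that performs the actual I/O; like the JDK's Unsafe, it is generally
    // not something user code needs to touch.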
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

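Initialization follows. The options and attrs configured on ServerBootstrap are applied to the server channel, and a ChannelInitializer is added to its pipeline. That ChannelInitializer in turn stores childOptions, childAttrs, workerGroup, and childHandler in a handler called ServerBootstrapAcceptor. Judging by the name, this handler deals with incoming client connections. Once its subclass-implemented initChannel method has run, the ChannelInitializer removes itself from the pipeline.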

    void init(Channel channel) {
        setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
        setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions =
                childOptions.entrySet().toArray(newOptionArray(0));
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(() -> pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)));
            }
        });
    }
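
The self-removing behavior of the initializer is easier to see in a toy pipeline. The sketch below uses invented types (`Pipeline`, `Initializer`, `NamedHandler`) and a plain list instead of Netty's linked handler contexts, so treat it as a model of the idea rather than of the implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of a one-shot initializer; all types are hypothetical.
final class InitializerSketch {
    interface Handler { }

    static final class NamedHandler implements Handler {
        final String name;

        NamedHandler(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    static final class Pipeline {
        final List<Handler> handlers = new ArrayList<>();

        Pipeline addLast(Handler handler) {
            handlers.add(handler);
            return this;
        }

        // Invoked once when the channel is registered with its event loop.
        void fireRegistered() {
            for (Handler h : new ArrayList<>(handlers)) { // copy: initializers mutate the list
                if (h instanceof Initializer) {
                    ((Initializer) h).handlerRegistered(this);
                }
            }
        }
    }

    static final class Initializer implements Handler {
        private final Consumer<Pipeline> initChannel;

        Initializer(Consumer<Pipeline> initChannel) {
            this.initChannel = initChannel;
        }

        void handlerRegistered(Pipeline pipeline) {
            try {
                initChannel.accept(pipeline); // install the real handlers
            } finally {
                pipeline.handlers.remove(this); // the initializer has done its job
            }
        }

        @Override
        public String toString() {
            return "initializer";
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        pipeline.addLast(new Initializer(p ->
                p.addLast(new NamedHandler("logging")).addLast(new NamedHandler("acceptor"))));
        System.out.println(pipeline.handlers); // [initializer]
        pipeline.fireRegistered();
        System.out.println(pipeline.handlers); // [logging, acceptor]
    }
}
```
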

After initialization comes registration, which is a bit more involved.

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

    @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            AbstractChannel.this.eventLoop = eventLoop;

            // If the current thread is the one bound to this NioEventLoop, register directly;
            // otherwise submit the registration to the event loop as a task
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(() -> register0(promise));
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

    // Run the task on the NioEventLoop's thread
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

    // Check whether the current thread is the NioEventLoop's thread
    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // Register the JDK ServerSocketChannel with the selector. The interest ops are 0 here,
                // meaning the channel is not yet interested in any event
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

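register() first checks whether the channel is already registered and whether the given eventLoop is compatible with it (an NioEventLoop here); both checks are omitted from the excerpt above. It then checks whether the current thread is the NioEventLoop's thread: if so, it registers directly; otherwise it submits the registration to the event loop as a task. In execute(), the task is added to the task queue and the thread check is repeated; if the caller is not the event loop's thread, startThread() starts that thread unless it is already running. The newly started thread is, of course, the NioEventLoop's thread.
In the register0 -> doRegister call chain, Netty ends up registering the underlying JDK channel with the selector. Because the port is not bound yet, the ops (the interest set) are 0. At the same time, this (the NioServerSocketChannel) is attached to the SelectionKey so that the channel can be retrieved and operated on when select later returns events for it. Registration is now complete.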
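
A stripped-down event loop shows the two decisions described above: run inline when already on the loop's thread, otherwise queue the task and start the thread lazily. `MiniEventLoop` and its methods are hypothetical; there is no selector, shutdown, or rejection handling in this sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative single-threaded event loop; not Netty's implementation.
final class MiniEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        tasks.add(task);
        // Only an outside caller can find the loop thread not yet started.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true);
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread(); // set before the first task runs
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Same decision as register(): inline on the loop thread, queued otherwise.
    void register(Runnable register0) {
        if (inEventLoop()) {
            register0.run();
        } else {
            execute(register0);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniEventLoop loop = new MiniEventLoop();
        CountDownLatch done = new CountDownLatch(1);
        loop.register(() -> {
            System.out.println("register0 on loop thread: " + loop.inEventLoop());
            loop.register(done::countDown); // already on the loop thread, so this runs inline
        });
        done.await();
        System.out.println("main thread is loop thread: " + loop.inEventLoop());
    }
}
```
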

Back in AbstractBootstrap, doBind0 runs next. It too must be executed by the NioEventLoop, so it is also submitted to the event loop as a task.

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

    
    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        // Hand the call to the next outbound handler context; by default it ends up at headContext
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

    // At headContext the call is handed over to unsafe.
    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        unsafe.bind(localAddress, promise);
    }
    
    // AbstractUnsafe.bind (abridged)
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        boolean wasActive = isActive();
        try {
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }

        // The bind made the channel active: fire channelActive later on the event loop
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }

        safeSetSuccess(promise);
    }

    // unsafe finally calls the JDK channel to perform the bind
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

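The NioEventLoop performs the bind through the pipeline: the pipeline calls bind on tailContext, which keeps looking toward the head for the next outbound handler to run bind. By default the call reaches headContext, which hands it to the underlying unsafe. Once unsafe has completed the bind, it has the pipeline call fireChannelActive.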
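
The tail-to-head walk for outbound operations can be modeled with a tiny linked list. The sketch assumes a pipeline of head, a logging handler (treated as outbound-capable), an inbound-only acceptor, and tail; `Ctx` and `bindFromTail` are invented names, and the real contexts carry far more state.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of outbound propagation; all names are hypothetical.
final class OutboundSketch {
    static final class Ctx {
        final String name;
        final boolean outbound;
        final Ctx prev;

        Ctx(String name, boolean outbound, Ctx prev) {
            this.name = name;
            this.outbound = outbound;
            this.prev = prev;
        }

        // Walk toward the head until a context that handles outbound operations is found.
        // The head is always outbound, so the loop terminates.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }

        void bind(int port, List<String> trace) {
            Ctx next = findContextOutbound();
            trace.add(next.name);
            if (next.prev != null) {
                next.bind(port, trace); // keep travelling toward the head
            }
            // next.prev == null means next is the head, where the real code calls unsafe.bind(...)
        }
    }

    static List<String> bindFromTail(int port) {
        Ctx head = new Ctx("head", true, null);
        Ctx logging = new Ctx("logging", true, head);
        Ctx acceptor = new Ctx("acceptor", false, logging);
        Ctx tail = new Ctx("tail", false, acceptor);
        List<String> trace = new ArrayList<>();
        tail.bind(port, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(bindFromTail(8007)); // [logging, head]
    }
}
```
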

fireChannelActive first reaches headContext:

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }

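headContext first propagates the active event and then calls readIfIsAutoRead. The resulting read request travels through the pipeline until it reaches headContext again, which calls unsafe's beginRead.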

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

In beginRead (doBeginRead above), the channel's interest set is updated to include the OP_ACCEPT op that was passed in when the NioServerSocketChannel was created.
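
The whole sequence (register with interest ops 0 and an attachment, bind, then switch on OP_ACCEPT) can be replayed against the JDK NIO API directly. This is a minimal sketch: the string attachment stands in for the Netty channel object, and binding to an ephemeral loopback port is an assumption made so the example does not need a fixed port.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Illustrative sketch using only JDK NIO; not Netty code.
final class InterestOpsSketch {
    // Returns the interest ops right after registration and after "beginRead".
    static int[] registerBindThenRead() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);

            // Register with no interest ops; the attachment lets us find our channel object later.
            Object attachment = "my-server-channel";
            SelectionKey key = channel.register(selector, 0, attachment);
            int afterRegister = key.interestOps();

            // Bind to an ephemeral port on the loopback interface.
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Same check as doBeginRead: add OP_ACCEPT only if it is not set yet.
            int ops = key.interestOps();
            if ((ops & SelectionKey.OP_ACCEPT) == 0) {
                key.interestOps(ops | SelectionKey.OP_ACCEPT);
            }
            return new int[] {afterRegister, key.interestOps()};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ops = registerBindThenRead();
        System.out.println("after register:  " + ops[0]);
        System.out.println("after beginRead: " + ops[1]
                + " (OP_ACCEPT = " + SelectionKey.OP_ACCEPT + ")");
    }
}
```
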

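At this point creation, initialization, registration, and binding are all complete. The main thread then calls sync -> await and blocks there while Netty does its work.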

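Summary
1. Server startup has four phases: creation, initialization, registration, and binding.
2. Through reflection, Netty replaces the selector's selected-key Set with an array-backed implementation.
3. In this example bossGroup has one thread (passed explicitly), while workerGroup uses the default of twice the number of CPU cores.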

Original: https://www.cnblogs.com/spiritsx/p/11839066.html
