Let's use EchoServer, the classic example from netty-example, as our way in.
// Create bossGroup and workerGroup; together they implement the reactor pattern
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
// Configure the serverSocketChannel and socketChannel properties on the ServerBootstrap helper class
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.attr(AttributeKey.newInstance("attr"), "attr")
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
})
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttr");
// bind() starts the server
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
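f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
First, let's look at how the EventLoopGroup is constructed.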
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor){
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
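Stepping through the chain of overloaded constructors, we can see that bossGroup is configured with a thread count, an executor (the source of its threads), a selectorProvider, a select strategy factory, a rejected-execution handler, and so on. Pay particular attention to the thread count: the no-arg constructor passes 0, which MultithreadEventLoopGroup replaces with DEFAULT_EVENT_LOOP_THREADS, whose default value is twice the number of available CPU cores.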
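The thread-count rule can be sketched with plain JDK calls. This is a simplified illustration, not Netty's code: the class name EventLoopThreadsSketch and the method resolve are made up for the example, and Integer.getInteger and Runtime.availableProcessors stand in for Netty's own helper classes.

```java
final class EventLoopThreadsSketch {
    // Idea behind DEFAULT_EVENT_LOOP_THREADS: at least 1, otherwise 2 x available cores,
    // overridable through the io.netty.eventLoopThreads system property.
    static final int DEFAULT_THREADS = Math.max(1,
            Integer.getInteger("io.netty.eventLoopThreads",
                    Runtime.getRuntime().availableProcessors() * 2));

    // nThreads == 0 means "use the default", as in the MultithreadEventLoopGroup constructor above.
    static int resolve(int nThreads) {
        return nThreads == 0 ? DEFAULT_THREADS : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("bossGroup threads:   " + resolve(1)); // explicit value wins
        System.out.println("workerGroup threads: " + resolve(0)); // falls back to the default
    }
}
```

With new NioEventLoopGroup(1) the explicit value is kept, while the no-arg form ends up with the default.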
We then reach the superclass constructor that does the real work for NioEventLoopGroup.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
// 1: Create the executor (ThreadPerTaskExecutor starts a new thread per task); the thread factory mainly sets the thread name prefix
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
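// 2: Create each eventLoop; every eventLoop is bound to exactly one thread
// (the try/catch/finally cleanup around this call is omitted here)
children[i] = newChild(executor, args);
success = true;
}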
// 3: Set up the chooser that selects the next NioEventLoop in round-robin order
chooser = chooserFactory.newChooser(children);
// 4: Add a termination listener to each NioEventLoop
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
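Step 3 above can be sketched as a small round-robin chooser. This is a minimal illustration of the idea rather than Netty's DefaultEventExecutorChooserFactory itself; the class name RoundRobinChooserSketch is hypothetical. It uses a bit mask when the number of children is a power of two and a modulo otherwise.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooserSketch<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();
    private final boolean powerOfTwo;

    RoundRobinChooserSketch(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("no children");
        }
        this.children = children;
        int n = children.length;
        this.powerOfTwo = (n & -n) == n; // true for 1, 2, 4, 8, ...
    }

    // Returns the next child in round-robin order; safe to call from several threads.
    T next() {
        int i = idx.getAndIncrement();
        if (powerOfTwo) {
            // The mask keeps the index in range even after the counter overflows.
            return children[i & (children.length - 1)];
        }
        // Math.abs handles the negative remainder after the counter overflows.
        return children[Math.abs(i % children.length)];
    }
}
```

Each new channel would be handed to next(), so connections are spread evenly across the event loops.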
Let's step into the second marked point, where each NioEventLoop is created, and focus on the openSelector() method.
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// The unwrapped selector, i.e. the plain JDK NIO selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// If the selectionKeySet optimization is disabled, return the unwrapped selector as-is
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// The optimized SelectedSelectionKeySet is backed by an array instead of a hash set; reflection is used to quietly swap it into the Selector
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
ReflectionUtil.trySetAccessible(selectedKeysField, true);
ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException | IllegalAccessException e) {
return e;
}
}
});
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
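To make the "array instead of set" idea concrete, here is a minimal sketch of an append-only, array-backed Set. It is an illustration under assumptions, not Netty's SelectedSelectionKeySet: the class name ArrayBackedKeySetSketch and the generic element type are invented for the example, and the reflective swap into the selector is left out because it depends on JDK internals.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class ArrayBackedKeySetSketch<E> extends AbstractSet<E> {
    private Object[] keys;
    private int size;

    ArrayBackedKeySetSketch(int initialCapacity) {
        keys = new Object[Math.max(1, initialCapacity)];
    }

    // Appending is a plain array write: no hashing and no duplicate check.
    // That is acceptable when the caller adds each ready key at most once per select.
    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1); // double the capacity when full
        }
        keys[size++] = e;
        return true;
    }

    // Clears the processed keys so the array can be reused for the next select.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[i++];
            }
        };
    }
}
```

Iterating such a set is a linear walk over an array, which is cheaper than iterating a HashSet and allocates nothing per added key.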
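That completes the creation of the group. Apart from the thread count, bossGroup and workerGroup are identical.
Next comes the ServerBootstrap configuration.
// group() sets bossGroup as the parentGroup and workerGroup as the childGroup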
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
ObjectUtil.checkNotNull(childGroup, "childGroup");
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
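// channel() installs a factory that produces objects of the class passed in. Judging by the factory's name (reflective) and its field (a constructor), we can guess that it combines reflection, generics, and the factory pattern to create the serverSocketChannel we configured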
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
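The reflection + generics + factory combination can be tried out without Netty. The sketch below follows the same shape as the constructor shown above; ReflectiveFactorySketch and newInstance are hypothetical names, and ordinary JDK classes stand in for channel classes.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    // Look up the public no-arg constructor once, so a bad class fails at configuration time.
    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    // Each call creates a fresh instance through the cached constructor.
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create an instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<CharSequence> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        System.out.println(factory.newInstance().getClass().getSimpleName());
    }
}
```

Resolving the constructor eagerly means a class without a public no-arg constructor is rejected when channel(...) is called, not later during bind.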
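Besides these, six more properties are set: handler, childHandler, option, childOption, attr, and childAttr. Following the naming of the groups, we can guess that the properties without the child prefix apply to the server channel served by the NioEventLoop in bossGroup, while those prefixed with child apply to the accepted channels served by the NioEventLoops in workerGroup.
With the ServerBootstrap configured, the real startup work begins. Step into b.bind() and follow the calls down to AbstractBootstrap.doBind(SocketAddress localAddress).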
private ChannelFuture doBind(final SocketAddress localAddress) {
// Initialize and register
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// If registration has already completed, bind right away; otherwise add a listener that binds once registration completes
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener((ChannelFutureListener) future ->
doBind0(regFuture, channel, localAddress, promise));
return promise;
}
}
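The "bind now if registration is done, otherwise bind from a listener" shape of doBind can be sketched with CompletableFuture from the JDK. Everything here is a stand-in: BindAfterRegisterSketch, bindAfter, and the string result are invented for the illustration and are not Netty's ChannelFuture API.

```java
import java.util.concurrent.CompletableFuture;

final class BindAfterRegisterSketch {
    // Returns a promise that completes once the (simulated) bind has run.
    static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone()) {
            // Registration already finished: bind immediately.
            doBind0(registration, port, promise);
        } else {
            // Registration still pending: bind from a callback once it completes.
            registration.whenComplete((ok, err) -> doBind0(registration, port, promise));
        }
        return promise;
    }

    // Simulated bind: succeeds only if registration succeeded.
    private static void doBind0(CompletableFuture<Void> registration, int port,
                                CompletableFuture<String> promise) {
        if (registration.isCompletedExceptionally()) {
            promise.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            promise.complete("bound to " + port);
        }
    }
}
```

Either way the caller gets a promise back at once and can wait on it, which is what b.bind(PORT).sync() does in the example.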
Start with initAndRegister(). Its first step is to create the NioServerSocketChannel, instantiating the channel through the factory.
final ChannelFuture initAndRegister() {
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
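// In short: if we get here and the promise has not failed, registration has either already completed (we were called from the event loop)
// or been queued on the event loop's task queue. bind() and connect() run on that same thread, after the queued registration, so it is safe to call them now.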
return regFuture;
}
// As we guessed, the serverSocketChannel is instantiated through the constructor obtained by reflection.
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// Note that OP_ACCEPT is passed in here as the interest op
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// The parent constructor stores the JDK serverSocketChannel created via the SelectorProvider and the OP_ACCEPT interest op, then puts the channel into non-blocking mode
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
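// The top-level abstract parent class creates the id, unsafe, and pipeline. Netty's unsafe is the class tied to the socket (or another I/O-capable component)
// that performs the low-level I/O. Like the JDK's Unsafe, it is not something users normally need to touch.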
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
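Next comes initialization. The option and attr values from the ServerBootstrap are applied to the serverSocketChannel, and a ChannelInitializer is added to the pipeline. That ChannelInitializer in turn stores childOption, childAttr, workerGroup, and childHandler in a handler called ServerBootstrapAcceptor. Judging by its name, this handler deals with the events raised when a client connects. Once its subclass-implemented initChannel method has run, the ChannelInitializer removes itself from the pipeline.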
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(() -> pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)));
}
});
}
After initialization comes registration, which is a little more involved.
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
// If the current thread is the thread bound to this NioEventLoop, register directly; otherwise submit the registration as a task to the eventLoop
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(() -> register0(promise));
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// Run the task on the NioEventLoop's thread
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
// Check whether the current thread is the NioEventLoop's thread
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// Register the JDK serverSocketChannel on the selector. Note that the interest ops are 0, meaning no events are of interest yet
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
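Registration first checks whether the channel is already registered and whether the eventLoop passed in is a NioEventLoop (these checks are omitted from the listing above). It then checks whether the current thread is the NioEventLoop's thread: if so, it registers directly; otherwise it submits the registration as a task to the eventLoop. Inside execute, the task is added to the task queue and the same thread check is made again. If the caller is not the NioEventLoop's thread, startThread() starts that thread if it is not already running. The thread started this way is the NioEventLoop's own thread.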
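The interplay of inEventLoop(), the task queue, and the lazily started thread can be sketched with JDK concurrency classes. This is a deliberately reduced model, not SingleThreadEventExecutor: SingleThreadLoopSketch is a hypothetical name, and shutdown, rejection, and selector wake-up are all left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class SingleThreadLoopSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread; // set by the loop thread itself before it runs any task

    // True only when called from the loop's own thread.
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    // Queue the task; the first outside caller also starts the loop thread.
    void execute(Runnable task) {
        taskQueue.add(task);
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "loop-sketch");
            t.setDaemon(true); // do not keep the JVM alive for this demo
            t.start();
        }
    }

    // The loop: take tasks one by one and run them on this single thread.
    private void run() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because every task runs on the one loop thread, work submitted in order (register first, bind second) also executes in that order without extra locking.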
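In the register0 -> doRegister call chain, Netty finally calls the underlying JDK channel to register it with the selector. Because the port is not bound yet, the ops (interest ops) are 0. At the same time, this (the NioServerSocketChannel) is attached to the selectionKey as its attachment, so that when select later returns events, the channel can be retrieved and operated on. Registration is now complete.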
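The same registration step can be reproduced with the JDK NIO API alone. In this sketch, RegisterWithAttachmentSketch and demo are hypothetical names, and an arbitrary owner object plays the role that the NioServerSocketChannel plays in Netty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class RegisterWithAttachmentSketch {
    // Registers an unbound server channel with interest ops 0 and an attachment,
    // then reports "<interestOps>,<attachment is the owner>".
    static String demo(Object owner) {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // selectable channels must be non-blocking
            SelectionKey key = ch.register(selector, 0, owner);
            boolean sameOwner = key.attachment() == owner;
            return key.interestOps() + "," + sameOwner;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(new Object()));
    }
}
```

When a key later shows up in the selected set, key.attachment() gives back the owning object directly, with no lookup table needed.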
Back in AbstractBootstrap, doBind0 runs next. It also has to be executed by the NioEventLoop, so it too is submitted as a task.
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// Hand over to the next outbound handler context; by default the call travels all the way to headContext
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
// Once the call reaches headContext, it delegates to unsafe.
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
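// bind in AbstractUnsafe
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// Remember whether the channel was already active before binding
boolean wasActive = isActive();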
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
// Finally, unsafe calls the JDK channel to perform the actual bind
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
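The NioEventLoop performs the bind through the pipeline: the pipeline calls bind on tailContext, and tailContext keeps looking for the next outbound handler to run bind. By default the call reaches headContext, which invokes the underlying unsafe to perform the bind. Once unsafe has finished binding, it tells the pipeline to call fireChannelActive.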
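The tail-to-head walk of an outbound operation can be modelled with a tiny linked list. This is a toy model with invented names (OutboundPipelineSketch, Ctx, trace), and it assumes every outbound handler simply forwards the bind call; real handlers may intercept it.

```java
import java.util.ArrayList;
import java.util.List;

final class OutboundPipelineSketch {
    // One node of the pipeline; only the backward link is needed for outbound calls.
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    private final Ctx head = new Ctx("head", true);  // head handles outbound operations
    private final Ctx tail = new Ctx("tail", false); // tail is inbound only
    final List<String> trace = new ArrayList<>();

    OutboundPipelineSketch() {
        tail.prev = head;
    }

    // Insert a handler just before tail, like ChannelPipeline.addLast.
    void addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        ctx.prev = tail.prev;
        tail.prev = ctx;
    }

    // Start at tail and visit each outbound context towards head, then hit "unsafe".
    void bind(int port) {
        Ctx ctx = tail;
        while (ctx != head) {
            do {
                ctx = ctx.prev; // skip inbound-only contexts
            } while (!ctx.outbound);
            trace.add(ctx.name);
        }
        trace.add("unsafe.bind(" + port + ")");
    }
}
```

With handlers added in the order decoder (inbound), encoder (outbound), logger (outbound), a bind visits logger, then encoder, then head, and only then reaches the simulated unsafe.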
fireChannelActive is delivered to headContext first:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
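headContext first propagates the active event and then calls readIfIsAutoRead. That method issues a read, which travels through the pipeline until it reaches headContext; headContext then calls unsafe's beginRead, which ends up in doBeginRead.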
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
In doBeginRead, the channel's interest ops are updated to include OP_ACCEPT, the value passed in when the NioServerSocketChannel was created.
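The bit arithmetic in doBeginRead is easy to check in isolation. The helper below is a hypothetical extraction (InterestOpsSketch.enable is not a Netty method) that uses the real SelectionKey constants from the JDK.

```java
import java.nio.channels.SelectionKey;

final class InterestOpsSketch {
    // Adds readInterestOp to the current interest set unless it is already present.
    static int enable(int interestOps, int readInterestOp) {
        if ((interestOps & readInterestOp) == 0) {
            return interestOps | readInterestOp;
        }
        return interestOps;
    }

    public static void main(String[] args) {
        // A freshly registered server channel has interest ops 0.
        int ops = enable(0, SelectionKey.OP_ACCEPT);
        System.out.println(ops == SelectionKey.OP_ACCEPT);
    }
}
```

Using OR instead of plain assignment preserves any other bits already set on the key, which matters for client channels that may also be interested in writes.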
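At this point, creation, initialization, registration, and binding are all complete. The main thread then calls sync, which calls await and blocks there, leaving Netty to do its work.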
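Summary:
1. Server startup has four stages: creation, initialization, registration, and binding.
2. Using reflection, Netty replaces the selector's selectedKeys set with an array-backed implementation.
3. In this example bossGroup has one thread (set explicitly), while workerGroup defaults to twice the number of CPU cores.
Original: https://www.cnblogs.com/spiritsx/p/11839066.html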