周末简单看了下netty5的源码,只看懂了个大概,记录下成果,方便下次再看的时候回忆.
上服务端代码:
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ObjectEchoServerHandler(),
new MyServerHandler());
}
});
// Bind and start to accept incoming connections.
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}EventLoopGroup类有4个相关类容易搞混:
EventExecutor EventExecutorGroup, EventLoop, EventLoopGroup,
EventExecutor 继承自EventExecutorGroup , EventExecutorGroup 继承自ScheduledExecutorService(java自带的线程池服务类), EventExecutor 是一个特殊的EventExecutorGroup,提供了检测一个线程是否在eventLoop中被执行之类的方法.
EventLoopGroup 也继承自EventExecutorGroup, 并提供EventLoop的生成方法next(),
EventLoop继承自EventLoopGroup, EventLoop的英文解释是:Will handle all the I/O-Operations for a Channel once it was registered,即处理channel的io操作,
ServerBootstrap 类主要用于创建NioServerSocketChannel类,并初始化. 主要看它的 bind()方法,如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
return promise;
}创建并注册方法如下:
final ChannelFuture initAndRegister() {
Channel channel;
try {
channel = createChannel();
} catch (Throwable t) {
return VoidChannel.INSTANCE.newFailedFuture(t);
}
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}
ChannelPromise regFuture = channel.newPromise();
channel.<span style="color:#ff0000;">unsafe().register(regFuture);</span>
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
} protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}通过分析channel的类继承关系会发现, channel部分抽象类内部同时也包含相应unsafe类的实现, 下图是NioServerSocketChannel类的继承结构.
AbstractNioMessageChannel 到 AbstractChannel 类内部都有 unsafe内部类作为实际操作IO类.
原文:http://blog.csdn.net/zhuyijian135757/article/details/38184613