NioEventLoopGroup的创建
一般是通过创建NioEventLoopGroup来创建NioEventLoop。
bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup();
NioEventLoopGroup的构造函数:
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (ThreadFactory)null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(nThreads, threadFactory, new Object[]{selectorProvider});
}
可以从默认的构造函数看出来,默认传入的线程数是0,默认传入的ThreadFactory是null,并且传入了一个selectorProvider。最后层层调用了父类的构造函数。
//MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0?DEFAULT_EVENT_LOOP_THREADS:nThreads, threadFactory, args);
}
这里会判断传入的线程数是不是等于0,如果等于0,就将线程数设置为2*cpu。默认情况下不传参数,会创建2*cpu个线程数的线程池NioEventLoopGroup。然后继续调用父类构造函数
//MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
if(nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", new Object[]{Integer.valueOf(nThreads)}));
} else {
//创建ThreadFactory
if(threadFactory == null) {
threadFactory = this.newDefaultThreadFactory();
}
//创建child线程组
this.children = new SingleThreadEventExecutor[nThreads];
//创建线程选择器
if(isPowerOfTwo(this.children.length)) {
this.chooser = new MultithreadEventExecutorGroup.PowerOfTwoEventExecutorChooser(null);
} else {
this.chooser = new MultithreadEventExecutorGroup.GenericEventExecutorChooser(null);
}
//使用newChild方法初始化child线程组,这里要将threadFactory传入
int len$;
for(int terminationListener = 0; terminationListener < nThreads; ++terminationListener) {
boolean arr$ = false;
boolean var17 = false;
try {
var17 = true;
this.children[terminationListener] = this.newChild(threadFactory, args);
arr$ = true;
var17 = false;
} catch (Exception var18) {
throw new IllegalStateException("failed to create a child event loop", var18);
} finally {
if(var17) {
if(!arr$) {
int j;
for(j = 0; j < terminationListener; ++j) {
this.children[j].shutdownGracefully();
}
for(j = 0; j < terminationListener; ++j) {
EventExecutor e1 = this.children[j];
try {
while(!e1.isTerminated()) {
e1.awaitTermination(2147483647L, TimeUnit.SECONDS);
}
} catch (InterruptedException var19) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
if(!arr$) {
for(len$ = 0; len$ < terminationListener; ++len$) {
this.children[len$].shutdownGracefully();
}
for(len$ = 0; len$ < terminationListener; ++len$) {
EventExecutor i$ = this.children[len$];
try {
while(!i$.isTerminated()) {
i$.awaitTermination(2147483647L, TimeUnit.SECONDS);
}
} catch (InterruptedException var21) {
Thread.currentThread().interrupt();
break;
}
}
}
}
FutureListener var22 = new FutureListener() {
public void operationComplete(Future<Object> future) throws Exception {
if(MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
}
}
};
EventExecutor[] var23 = this.children;
len$ = var23.length;
for(int var24 = 0; var24 < len$; ++var24) {
EventExecutor e = var23[var24];
e.terminationFuture().addListener(var22);
}
}
}
这里的构造函数主要做了三件事情;
1.创建ThreadFactory线程构造器
2.使用newChild方法创建child线程组
3.创建线程选择器
创建threadFactory
这里的threadFactory相当于是线程池聚合的一个组件,它是通过newDefaultThreadFactory进行创建的。
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(this.getClass(), 10);
}
DefaultThreadFactory的构造函数:
public DefaultThreadFactory(Class<?> poolType, int priority) {
this((Class)poolType, false, priority);
}
public DefaultThreadFactory(String poolName, boolean daemon) {
this((String)poolName, daemon, 5);
}
public DefaultThreadFactory(Class<?> poolType, int priority) {
this((Class)poolType, false, priority);
}
public DefaultThreadFactory(String poolName, int priority) {
this((String)poolName, false, priority);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this((String)toPoolName(poolType), daemon, priority);
}
private static String toPoolName(Class<?> poolType) {
if(poolType == null) {
throw new NullPointerException("poolType");
} else {
String poolName = StringUtil.simpleClassName(poolType);
switch(poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
return Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))?Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1):poolName;
}
}
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this.nextId = new AtomicInteger();
if(poolName == null) {
throw new NullPointerException("poolName");
} else if(priority >= 1 && priority <= 10) {
this.prefix = poolName + ‘-‘ + poolId.incrementAndGet() + ‘-‘;
this.daemon = daemon;
this.priority = priority;
} else {
throw new IllegalArgumentException("priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
}
public Thread newThread(Runnable r) {
Thread t = this.newThread(new DefaultThreadFactory.DefaultRunnableDecorator(r), this.prefix + this.nextId.incrementAndGet());
try {
if(t.isDaemon()) {
if(!this.daemon) {
t.setDaemon(false);
}
} else if(this.daemon) {
t.setDaemon(true);
}
if(t.getPriority() != this.priority) {
t.setPriority(this.priority);
}
} catch (Exception var4) {
;
}
return t;
}
这里传入了当前的类类型NioEventLoopGroup,设置当前线程池的名称为nioEventLoopGroup,线程的优先级为5,然后载调用父类构造函数当前线程池的前缀nioEventLoopGroup-线程池id-。
当每次进行newThread的时候,会将线程池前缀和当前的线程数传入,所以线程的名称就为nioEventLoopGroup-线程池id-线程数id。
例如:nioEventLoopGroup-2-1 代表的是第二个线程池的第一个线程。
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(r, name);
}
当调用newThread方法时,其实创建的是一个FastThreadLocalThread对象,netty底层的一个Thread对象,对Thread进行了包装,优化了相关的ThreadLocal操作。
通过newChild()方法创建NioEventLoop线程组
这里的newChild()方法就是创建一个NioEventLoop。
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider)args[0]);
}
NioEventLoop的构造,要传入父线程池,threadFactory,和provider。每一个NioEventLoop都有一个selector与它进行绑定,为了实现IO多路复用。
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if(selectorProvider == null) {
throw new NullPointerException("selectorProvider");
} else {
this.provider = selectorProvider;
this.selector = this.openSelector();
}
}
创建任务队列,如果不是NioEventLoop对应的线程,当是外部线程时,就将任务放入任务队列,否则使用线程执行任务。这里的任务队列是LinkedBlockingQueue
protected Queue<Runnable> newTaskQueue() {
return new LinkedBlockingQueue();
}
NioEventLoop的主要组件就是selector,taskqueue,thread,selector用于IO多路复用,使用selector才能使得一个线程能处理多个连接(channel的事件),而thread就是存储当前NioEventLoop的线程,如果不在当前线程中,那么就将任务提交到任务队列中taskqueue。
创建线程选择器
线程选择器是对应NioEventLoopGroup.next()方法,每当有一个新的连接进入的时候,NioEventLoopGroup要选择一个NioEventLoop和新连接进行绑定。

在创建线程选择器的时候会判断当前线程池的个数是不是2的幂次方,如果是就创建一个PowerOfTwoEventExecutorChooser,否则就创建一个GenericEventExecutorChooser。
if(isPowerOfTwo(this.children.length)) {
this.chooser = new MultithreadEventExecutorGroup.PowerOfTwoEventExecutorChooser(null);
} else {
this.chooser = new MultithreadEventExecutorGroup.GenericEventExecutorChooser(null);
}
普通的线程选择器:
private final class GenericEventExecutorChooser implements MultithreadEventExecutorGroup.EventExecutorChooser {
private GenericEventExecutorChooser() {
}
public EventExecutor next() {
return MultithreadEventExecutorGroup.this.children[Math.abs(MultithreadEventExecutorGroup.this.childIndex.getAndIncrement() % MultithreadEventExecutorGroup.this.children.length)];
}
}
二的幂次方线程选择器:
private final class PowerOfTwoEventExecutorChooser implements MultithreadEventExecutorGroup.EventExecutorChooser {
private PowerOfTwoEventExecutorChooser() {
}
public EventExecutor next() {
return MultithreadEventExecutorGroup.this.children[MultithreadEventExecutorGroup.this.childIndex.getAndIncrement() & MultithreadEventExecutorGroup.this.children.length - 1];
}
}
但满足线程数是2的幂次方时,采用&操作来优化,&比%要高效很多。
原文:https://www.cnblogs.com/xiaobaituyun/p/10799786.html