在前面的文章:<<线程池原理初探>>中我们学习了线程池的基本用法和优点,并且从源码层面学习了线程池的内部数据结构以及运行状态表征方法,这是最基础但是又很重要的一环,有了这一步铺垫我们便可以开始进一步的源码学习之旅了。
本文会从如下几个方面展开:
上文说到,线程池中的工作线程是保存在一个hashSet中,这样说其实并不是很准确,因为线程池中执行任务的基本单元是一个定义在ThreadPoolExecutor中的内部类Worker,继承自AQS,并实现了Runnable接口,其本身就是一个任务,内部封装了驱动其运行的线程,而这个worker才是保存在hashSet中的。我们来看一下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** 这才是真的线程,worker只是一个runnable,需要线程来驱动,而这个线程则是封装在worker中,worker在其自己的run()方法中再去执行队列中的任务 */ final Thread thread; /** 第一个要执行的任务 */ Runnable firstTask; /** 已完成的任务数量 */ volatile long completedTasks;
// 构造函数 Worker(Runnable firstTask) { setState(-1); // 防止在runWorker之前被中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 委托给ThreadPoolExecutor中的runWorker方法 */ public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
这里其实只要重点关注Worker自身是一个任务,它将线程封装起来了,由该线程来驱动Worker的run()方法,然后Worker在其自己的run()方法中不断地从任务队列中获取任务并执行。
关于Worker,我们先了解这么多就够了,一些更深入的细节还需要结合ThreadPoolExecutor自身的逻辑来理解才更容易弄清楚。
其实ThreadPoolExecutor的execut()方法是一个很好的看源码的入口,因为这也许是我们使用的最多的方法,并且线程池的主要逻辑也在这个方法中。该方法对于用户来说就是向线程池提交任务,至于提交任务之后的逻辑,是否要新建线程、是否将任务加入阻塞队列中、是否要拒绝任务等等,这些对用户都是透明的,这也是我们接下来要重点探索的:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 检查工作线程的数量,低于corePoolsize则添加Worker int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // isRunning()用来检查线程池是否处于运行状态 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次进行防御性检查 if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 到这里已经意味着已经饱和或者被shutdown了,尝试添加一个非核心worker,如果失败就就直接执行拒绝 else if (!addWorker(command, false)) reject(command); }
如上,配合注释更容易理解,总结一下,一共分成3步:
接下来我们再来看一下如何添加Worker,这部分逻辑是在addWorker()方法中,这部分主要负责创建新的线程并执行任务:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 如果当前线程数量太多则直接退出 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 做自旋,如果当前线程数量更新成功则跳出retry执行后面addworker逻辑 if (compareAndIncrementWorkerCount(c)) break retry; // 重新读取ctl,如果线程池状态改变,则从retry重新执行 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 获取线程池主锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 添加线程到workers中(线程池中) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } if (workerAdded) { // 启动新建的线程,此时添加的worker会被驱动执行其run()方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
主要分为如下5步:
在addWorker中添加了新的worker之后会启动其封装的线程,该worker也会随之被线程驱动执行(因为worker继承自Runnable)。前面讲Worker的时候我们知道其run()方法中只调用了一个方法,就是定义在ThreadPoolExecutor中的runWorker(),这里才是执行worker的主要工作逻辑:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null;
// 因为Worker自身就是一把简单的不可重入互斥锁(听起来好像也不简单。。),这里调用unlock()是为了将state的状态从-1改为0 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果是第一次执行任务,或者从队列中能够获取到任务,则执行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 根据线程池的状态来判断是否需要将当前线程interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行任务开始前钩子函数 beforeExecute(wt, task); Throwable thrown = null; try { // 真正开始执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行任务后钩子函数 afterExecute(task, thrown); } } finally { task = null; // task置空,以便while循环中获取新任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
这是一个被final修饰的方法,不能被重写。总结一下其逻辑(比较复杂):
好了, 看了不少源码,我们稍微停一下:
上面的runWorker()方法中我们也看到了,worker跑起来之后取就进入了一个while循环中,不断地取任务并执行,好像没有看到哪里可以退出,那线程池又是如何让worker停下来的呢?我们接着往下看。
在上面那节的代码中我们可以看到Worker启动之后,一直在一个while()循环中工作,如果退出了这个循环,run()方法也就邻近结束了。所以只要能够让运行中的worker退出自己的while()循环就能结束worker了,那我们就要来看一下while循环中的条件:
while (task != null || (task = getTask()) != null) { 。。。 }
有两个条件:
这就是线程池关闭线程的开关入口,我们来看一下这个getTast()方法吧:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 只在必要的时候才检查任务队列是否为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
如果同时满足如下条件,则利用CAS机制尝试减少运行线程数,成功则返回空,失败则跳到第1步:
要么是运行线程数量大于最大线程数量maximumPoolSize,要么是存在非核心线程或者允许核心线程超时销毁并且已超时;
运行线程数大于1或任务队列为空;
接着就要从任务队列取任务了:
如果允许超时则调用poll取任务,这个方法会使当前线程阻塞一段指定时间;
否则调用take()取任务,这个方法会使当前线程一直阻塞,直到获取到任务或者被当前线程被中断;
如果取出任务则返回,没有的话则将timedOut置为true,标记为已超时(代表核心线程等待时间过长,需要删除),重新进入到步骤1,继续循环执行;
这里的逻辑比较多,因为有涉及到是否允许核心线程超时,所以需要细细品味。当调用getTask()为拿到任务,就意味着当前线程该做的工作已经完成了,不用再循环取任务执行了,剩下就是执行processWorkerExit()结束工作了。
现在提交任务、执行任务、以及停止任务的入口,这些逻辑我们都看完了,我们来看一下如何停止线程池。主要有两个方法:shutdown、shutdownNow,从名字我们可以看出区别:
我们来看一下具体实现细节:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();
// 修改线程池运行状态为SHUTDOWN advanceRunState(SHUTDOWN);
// 中断空闲线程 interruptIdleWorkers();
// 预留的钩子函数 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); }
// tryTerminate(); }
逻辑比较清晰:
先来看一下如何利用CAS修改线程池状态,如下代码是advanceRunState()的实现,可以看到在循环中不断调用原子类ctl的compareAndSet()方法来设置值,这就是利用CAS机制:
private void advanceRunState(int targetState) {
// 进入循环 for (;;) { int c = ctl.get();
// 如果状态修改成功则退出循环 if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }
接下来看一下如何中断空闲线程,也很简单,就是对所有worker进行遍历,判断其是否被中断,如果没有则尝试设置其中断标志。这里只是说了一下基本流程,有些细节没有提到,需要代码中体会:
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) {
// 获取线程池的锁,并上锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
// 遍历workers for (Worker w : workers) { Thread t = w.thread;
// 判断worker封装的Thread实例是否被中断,如果没有则尝试获取worker自己的锁 if (!t.isInterrupted() && w.tryLock()) { try {
// 设置中断状态 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } }
// 根据传入的参数,只中断一个worker if (onlyOne) break; } } finally { mainLock.unlock(); } }
最后我们再来看一下tryTerminate()的逻辑,配合代码看效果会更好,这里简单说一下,首先会有几轮判断,是否需要执行terminate(),接着会利用CAS机制尝试修改线程池状态为TIDYING,成功则执行terminate(),失败则循环执行:
final void tryTerminate() { for (;;) { int c = ctl.get();
/**
* 满足如下两个条件则直接返回
* 1. 线程池当前状态为RUNNING、TIDYING、TERMINATED
* 2. 线程池当前状态为SHUTDOWN且任务队列不为空,那还要继续将队列中的任务执行完才能结束
**/ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return;
// 到这里代表线程池的状态为SHUTDOWN或STOP,如果还有存活线程,则尝试中断一个并返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
// 尝试将线程池状态修改为TIDYING,修改成功则执行terminated(),如果没有则继续循环执行 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally {
// 执行完terminated()之后需要确保将线程池状态修改为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
其实shutdown()最终还是通过设置工作线程的中断状态来实现结束中断线程的,关于这种方式我们前面也是专门写过一篇文章的:<<线程间通信>>。具体是如何结束的,在线程执行的过程中会不断的调用getTask()从任务队列获取任务,在getTask()中会对中断状态进行监控,一旦发现之后会根据具体逻辑执行对应操作,具体参考getTask()的代码。
看完shutdown()的我们再来看一下shutdownNow()的逻辑:
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
基本流程和shutdown()类似,advanceRunState()和tryTerminate()是一样的,我们就不再赘述了,重点来看一下interruptWorkers()的逻辑:
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
这样看很简单,就是遍历所有worker,调用其interruptIfStarted()方法,这个方法实现在Worker中,我们来看一下这个方法,也比较清晰,就是判断一下再决定是否设置线程中断标志位,可见,其和shutdown停止线程的方式是一样的,区别主要在于设置线程状态的不同以及将任务队列中的任务丢弃,即drainQueue()方法:
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); // 将阻塞队列中的任务全部移除并添加到taskList中 q.drainTo(taskList); // 再检查一次队列是否有任务 if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
其实呢,在啃线程池源码的过程中,还是要费一些心思的,尤其是要弄明白如何添加任务、如何添加Worker、Worker如果工作以及如何停止Worker的工作这一整套流程,中间确实逻辑比较复杂,但是呢在探索的过程中会不断有新的发现,越啃越细,越啃越清晰。我其实也不是一两天就看明白了,最早只是大概看了一遍,然后做了一些笔记,隔了几个月之后再来看,又有新的收获,所以就有了这篇文章。看到这里说明你也看懂了,恭喜你在学习的路上又有精进了 ^_^
原文:https://www.cnblogs.com/volcano-liu/p/10783012.html