注:本文的分析和源码基于jdk1.7;
一、ThreadPoolExecutor创建
ThreadPoolExecutor作为java.util.concurrent包中核心的类,先看下类型的结构:
最顶级的接口都是Executor,而ThreadPoolExecutor继承于抽象类AbstractExecutorService,提供一下4个构造函数用于创建:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,ong keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
前面的3个方法都是使用通过this调用最后一个方法,没有指定的构造参数使用默认参数,参数解析:
1、
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
线程池核心线程数大小,初始化是核心线程数也是0,除非先调用prestartCoreThread或者prestartAllCoreThreads先创建核心线程;
在没有设置allowCoreThreadTimeOut为true情况下,核心线程不会销毁;
2、
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
线程池线程数最大值,达到最大值后线程池不会再增加线程执行任务,任务会进入等待队列或者由拒绝策略处理;
该值实际的可设置最大值不是Integer.MAX_VALUE,而是常量CAPACITY(后面再解析常量)
3、
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);1、判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,如果能完成新线程创建exexute方法结束,成功提交任务;
private boolean addWorker(Runnable firstTask, boolean core) {//<span style="font-family: 微软雅黑; widows: auto;">firstTask:新增一个线程并执行这个任务,可空,增加的线程从队列获取任务;core:是否使用corePoolSize作为上限,否则使用maxmunPoolSize</span> retry: //很少见的关键字,自行度娘 for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))//线程状态非运行并且当非shutdown状态下任务为空且队列非空; return false; //判断条件有点难理解,其实是非运行状态下(>=SHUTDOWN)或者SHUTDOWN状态下任务非空(新提交任务)、任务队列为空,就不可以再新增线程了(return false),即SHUTDOWN状态是可以新增线程去执行队列中的任务; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //实际最大线程数是CAPACITY; return false; if (compareAndIncrementWorkerCount(c)) //AtomicInteger的CAS操作; break retry; //<span style="font-family: 微软雅黑; widows: auto;">新增线程数成功,结束retry(retry下的for循环)</span> c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //状态发生改变,重试retry; continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); // Worker为内部类,封装了线程和任务,通过ThreadFactory创建线程,可能失败抛异常或者返回null final Thread t = w.thread; if (t != null) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // SHUTDOWN以后的状态和SHUTDOWN状态下firstTask为null,不可新增线程 throw new IllegalThreadStateException(); workers.add(w); //保存到一个HashSet中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //记录最大线程数 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//失败回退,从wokers移除w,线程数减一,尝试结束线程池(调用tryTerminate方法,后续解析) } return workerStarted; }
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //这个方法校验线程访问许可,不是很理解,后面有时间再单独解析; advanceRunState(SHUTDOWN); //转换线程池状态为SHUTDOWN interruptIdleWorkers(); //中断所有空闲的线程 onShutdown(); // 空实现方法,是做shutdown清理操作的 } finally { mainLock.unlock(); } tryTerminate(); //尝试结束线程池(设置状态为TERMINATED) }
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();//同上 advanceRunState(STOP);//转换线程池状态到STOP interruptWorkers();//中断所有线程 tasks = drainQueue();//获取到任务队列所有任务,并清空队列 } finally { mainLock.unlock(); } tryTerminate();//同上 return tasks; }
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //保证只有SHUTDOWN状态下任务队列为空和STOP状态下才以尝试终止 return; if (workerCountOf(c) != 0) { //线程数还不是0情况下不可结束线程池 interruptIdleWorkers(ONLY_ONE); //只为了中断一个线程?还不是非常理解设计者的意思 return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS操作设置TIDYING状态,注意这里处于循环中,失败会重设的 try { terminated(); //空实现方法 } finally { ctl.set(ctlOf(TERMINATED, 0));//最终状态TERNINATED termination.signalAll();//可重入锁的condition,通知所有wait,后面会有看到 } return; } } finally { mainLock.unlock(); } } }
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }这个方法两个入参,设置等待超时时间;
原文:http://blog.csdn.net/wenhuayuzhihui/article/details/51377174