应用线程池代码
public class ThreadPoolDemo implements Runnable{
static ExecutorService executorService = Executors.newFixedThreadPool(3);
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("name:" + Thread.currentThread().getName());
}
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();
executorService.execute(threadPoolDemo);
}
executorService.shutdown();
}
}
线程池创建大致分为5种:
static ExecutorService executorService = Executors.newFixedThreadPool(3);
static ExecutorService executorService1 = Executors.newSingleThreadExecutor();
static ExecutorService executorService2 = Executors.newCachedThreadPool();
static ExecutorService executorService3 = Executors.newScheduledThreadPool(...);
static ExecutorService executorService = Executors.newWorkStealingPool();
但是阿里的开发手册上写的是不建议用这种方式去创建线程池,而是应该重写它的实现方法去创建,因为这样你就会详细的知道各个参数的意义
下面我们来分析下线程池的实现原理,它是怎么管理线程? 先看execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到有3个比较重要的代码,也可以说是过程:
1、int c = ctl.get(); --保存线程运行章台和线程数量
2、addWorker(command, true) --线程池的核心,主要的worker
3、reject(command); --线程池关闭或者已满,拒绝测率
ctl 的作用
在线程池中,ctl贯穿在线程池的整个生命周期中
ctl:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 它是一个原子类,主要作用是用来保存线程数量和线程池的状态。我们来分析一下这段代码, 其实比较有意思,他用到了位运算 一个int数值是32个bit位,这里采用高3位来保存运行状态,低29位来保存线程数量。
我们来分析默认情况下,也就是ctlOf(RUNNING)运行状态,调用了ctlOf(int rs,int wc)方法; 其中
private static int ctlOf(int rs, int wc) { return rs | wc; } 其中RUNNING =-1 << COUNT_BITS ;
-1左移29位. -1的二进制是32个1(1111 1111 1111 1111 1111 1111 1111 1111) - 1 的二进制计算方法
原码是 1000…001 . 高位 1 表示符号位。 然后对原码取反,高位不变得到 1111…110 然后对反码进行 + 1 ,也就是补码操作, 最后得到 1 111…1111 那么-1 <<左移 29位, 也就是 【111】 表示; rs | wc 。
二进制的111 | 000 。得到的结 果仍然是111
这个方法相当于是去创建一定数量的worker,然后去循环处理任务 这个方法里面有两个自旋 其实这个方法里面就做了两件事 1、采用循环CAS操作来将线程数+1
if (compareAndIncrementWorkerCount(c))
break retry;
2、新建一个线程并启用
核心代码
workers.add(w);
。。。
t.start();
workerStarted = true;
w = new Worker(firstTask); //创建worker
然后Worker类中,会重写run方法:
public void run() {
runWorker(this);
}
拒绝策略
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务; 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录 日志或持久化存储不能处理的任务
前面已经了解了 ThreadPoolExecutor 的核心方法 addWorker,主要作用是增加工作线程, 而 Worker 简单理解其实就是一个线程,里面重新了 run 方法,这块是线程池中执行任务的 真正处理逻辑,也就是runWorker方法,这个方法主要做几件事 1. 如果task不为空,则开始执行task 2. 如果task为空,则通过getTask()再去取任务,并赋值给task,如果取到的Runnable不为空,则 执行该任务 3. 执行完毕后,通过while循环继续getTask()取任务 4. 如果getTask()取到的任务依然是空,那么整个runWorker()方法执行完毕
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
unlock,表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,此处是调用
Worker 类的 tryRelease()方法,将 state 置为 0,
而 interruptIfStarted()中只有 state>=0 才允许调用中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//注意这个 while 循环,在这里实现了 [线程复用] // 如果 task 为空,则通过
getTask 来获取任务
while (task != null || (task = getTask()) != null) {
w.lock(); //上锁,不是为了防止并发执行任务,为了在 shutdown()时不终止正
在运行的 worker
线程池为 stop
状态时不接受新任务,不执行已经加入任务队列的任务,还中断正在执
行的任务
// 所以对于 stop
状态以上是要中断线程的
//(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP) 确保线
程中断标志位为 true
且是 stop
状态以上,接着清除了中断标志
//!wt.isInterrupted() 则再一次检查保证线程需要设置中断标志位
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//这里默认是没有实现的,在一些特定的场景中
我们可以自己继承 ThreadpoolExecutor 自己重写
Throwable thrown = null;
try {
task.run(); //执行任务中的 run 方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //这里默认默认而也是没有实现
}
} finally {
//置空任务(这样下次循环开始时,task 依然为 null,需要再通过 getTask()
取) + 记录该 Worker 完成任务数量 + 解锁
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
//1.将入参 worker 从数组 workers 里删除掉;
//2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组
workers
}
}
ThreadPoolExecutor 提供了两个方法,用于线程池的关闭,分别是 shutdown()和 shutdownNow(),其中:shutdown():不会立即终止线程池,而是要等所有任务缓存队列中 的任务都执行完后才终止,但再也不会接受新的任务shutdownNow():立即终止线程池,并 尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
原文:https://www.cnblogs.com/deyujincheng/p/11442236.html