线程池的概念是初始化线程池时在池中创建空闲的线程,一旦有工作任务,可直接使用线程池中的线程进行执行工作任务,任务执行完成后又返回线程池中成为空闲线程。使用线程池可以减少线程的创建和销毁,提高性能。
????举个例子:我是一个包工头,代表线程池,手底下有若干工人代表线程池中的线程。如果我没接到项目,那么工人就相当于线程池中的空闲线程,一但我接到了项目,我可以立刻让我手下的工人去工作,每个工人同一时间执行只执行一个工作任务,执行完了就去执行另一个工作任务,直到没有工作任务了,这时工人就可以休息了。
队列作为一个缓冲的工具,当没有足够的线程去处理任务时,可以将任务放进队列中,以队列先进先出的特性来执行工作任务
举个例子,我又是一个包工头,一开始我只接了一个小项目,所以只有三个工作任务,但我手底下有四个工人,那么其中三人各领一个工作任务去执行就好了,剩下一个人就先休息。但突然我又接到了几个大项目,那么有现在有很多工作任务了,但手底下的工人不够啊。
那么我有两个选择:
(1)雇佣更多的工人
(2)把工作任务记录下来,按先来后到的顺序执行
但雇佣更多等工人需要成本啊,对应到计算机就是资源的不足,所以我只能把工作任务先记录下来,这样就成了一个队列了。
假设我又是一个包工头,我现在手底下没有工人了,但我接到了一个项目,有了工作任务要执行,那我肯定要去找工人了,但招人成本是很高的,工作完成后还要给遣散费,这样算起来好像不值,所以我事先雇佣了固定的几个工人作为我的长期员工,有工作任务就干活,没有就休息,如果工作任务实在太多,那我也可以再临时雇佣几个工人。一来二去工作效率高了,付出的成本也低了。Java自带的线程池的原理也是如此。
Executor接口是Executor的父接口,基于生产者--消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者,如果要在程序中实现一个生产者--消费者的设计,那么最简单的方式通常是使用Executor。
Executor框架是java中的线程实现。Executor是最顶层的接口定义,它的子类和实现主要包括ExecutorService、ScheduledExecutorService、ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool等。其结构如下图所示:
Executor:Executor是一个接口,其只定义了一个execute()方法:void execute(Runnable command);,只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。
ExecutorService:ExecutorService在Executor的基础上加入了线程池的生命周期管理,我们可以通过ExecutorService#shutdown或者ExecutorService#shutdownNow方法来关闭我们的线程池。ExecutorService支持提交Callable形式的任务,提交完Callable任务后我们拿到一个Future,它代表一个异步任务执行的结果。
ThreadPoolExecutor:是线程池中最核心的类,这个类的各个构造参数
线程池的工作流程图
1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
线程池内部状态
其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4、TIDYING : 2 << COUNT_BITS,即高3位为010;
5、TERMINATED: 3 << COUNT_BITS,即高3位为011;
任务提交
线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式。
Executor.excute()
通过Executor.execute()方法提交的任务,必须实现Runnable接口,该方式提交的任务不能获取返回值,因此无法判断任务是否执行成功。
ExecutorService.submit()
通过ExecutorService.submit()方法提交的任务,可以获取任务执行完的返回值。
任务执行
当向线程池中提交一个任务,线程池会如何处理该任务?
execute实现
具体的执行流程如下:
1.workerCountOf方法根据ctl的低29位,得到线程池的当前线程数,如果线程数小于corePoolSize,则执行addWorker方法创建新的线程执行任务;否则执行步骤(2);
2.如果线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,则执行步骤(3),否则执行步骤(4);
3.再次检查线程池的状态,如果线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务;
4.执行addWorker方法创建新的线程执行任务,如果addWoker执行失败,则执行reject方法处理任务;
addWorker实现
从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务,代码实现如下:
这只是addWorker方法实现的前半部分:
1、判断线程池的状态,如果线程池的状态值大于或等SHUTDOWN,则不处理提交的任务,直接返回;
2、通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,则跳出循环,开始创建新的线程,具体实现如下:
线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程,其中Worker类设计如下:
1、继承了AQS类,可以方便的实现工作线程的中止操作;
2、实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
3、当前提交的任务firstTask作为参数传入Worker的构造方法;
从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。
runWorker实现
runWorker方法是线程池的核心:
1、线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;
2、获取第一个任务firstTask,执行任务的run方法,不过在执行任务之前,会进行加锁操作,任务执行完会释放锁;
3、在执行任务的前后,可以根据业务场景自定义beforeExecute和afterExecute方法;
4、firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
getTask实现
整个getTask操作在自旋下完成:
1、workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
2、workQueue.poll:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;
所以,线程池中实现的线程可以一直执行由用户提交的任务。
根据Executor框架实现线程池的原理,我们可以自己动手实现
实现的思路如下:
?
?
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
?
?
public class ThreadPoolExecutor {
?
/*
* BlockingQueue是阻塞队列,在两种情况下出现阻塞:
* 1、当队列满了,入队列操作时;
* 2、当队列空了,出队列操作时。
* 阻塞队列是线程安全的,主要使用在生产/消费者的场景
*/
private BlockingQueue<Task> blockingQueue;
?
//线程池的工作线程数(可以认为是线程池的容量)
private int poolSize = 0;
?
//线程池的核心容量(也就是当前线程池中真正存在的线程个数)
private int coreSize = 0;
?
/*
* 此地方使用volatile关键字,volatile的工作原理是:对于JVM维度来说,每个线程持有变量的工作副本,那对于计算机维度来说,
* 就是这些变量的中间值会存放在高速缓存中。通过volatile关键字,告知每个线程改变此变量之后,立马更新到内存中去,并且使得
* 缓存中的数据失效,这样来保证其中某个线程改变公有变量后,其他线程能及时读取到最新的变量值,从而保证可见性。
* 原因如下:
* 1、在ThreadPoolExecutorTest中操作shutDown,这是main线程操作此变量(由于变量是volatile声明,所以会立马写入内存中);
* 2、Worker中线程通过while(!shutDown)来判断当前线程是否应该关闭,因此需通过volatile保证可见性,使线程可以及时得到关闭。
*/
private volatile boolean shutDown = false;
?
public ThreadPoolExecutor(int size) {
this.poolSize = size;
//LinkedBlockingQueue的大小可以指定,不指定即为无边界的。
blockingQueue = new LinkedBlockingQueue<>(poolSize);
}
?
public void execute(Task task) throws InterruptedException {
if(shutDown == true) {
return;
}
?
if(coreSize < poolSize) {
/*
* BlockingQueue中的插入主要有offer(obj)以及put(obj)两个方法,其中put(obj)是阻塞方法,如果插入不能马上进行,
* 则操作阻塞;offer(obj)则是插入不能马上进行,返回true或false。
* 本例中的Task不允许丢失,所以采用put(obj);
*/
blockingQueue.put(task);
produceWorker(task);
}else {
blockingQueue.put(task);
}
}
?
private void produceWorker(Task task) throws InterruptedException {
if(task == null) {
throw new NullPointerException("非法参数:传入的task对象为空!");
}
?
Thread thread = new Thread(new Worker());
thread.start();
coreSize++;
}
?
/*
* 真正中断线程的方法,是使用共享变量发出信号,告诉线程停止运行。
*
*/
public void shutDown() {
shutDown = true;
}
?
/*
* 此内部类是实际上的工作线程
*
*/
class Worker implements Runnable {
?
@Override
public void run() {
while(!shutDown) {
try {
//
blockingQueue.take().doJob();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("线程:" + Thread.currentThread().getName() + "退出运行!");
}
}
}
?
?
?
public class Task {
?
//通过taskId对任务进行标识
private int taskId;
?
public Task(int taskId) {
this.taskId = taskId;
}
?
public void doJob() {
System.out.println("线程" + Thread.currentThread().getName() + "正在处理任务!");
}
?
public int getId() {
return taskId;
}
}
?
?
?
public class ThreadPoolExecutorTest {
?
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3);
for(int i = 0; i < 10; i++) {
Task task = new Task(i);
threadPoolExecutor.execute(task);
}
?
threadPoolExecutor.shutDown();
}
}
?
运行结果
线程Thread-0正在处理任务!
线程Thread-1正在处理任务!
线程Thread-0正在处理任务!
线程Thread-1正在处理任务!
线程Thread-2正在处理任务!
线程Thread-0正在处理任务!
线程Thread-1正在处理任务!
线程:Thread-1退出运行!
线程:Thread-0退出运行!
线程Thread-2正在处理任务!
线程:Thread-2退出运行!
原文:https://www.cnblogs.com/kexinxin/p/11569981.html