日常的开发过程中,经常会有使用到多线程。从而使得程序处理某项任务的效率大大提高。使用多线程的时候,为了更好的把多线程管理、调度起来,从而最大的发挥系统的运算能力。这个时候我们往往选择线程池来对多线程进行管理和协调。
java中,关于线程池的类的接口图如下
1、最顶级的接口是Executor,不过Executor严格意义上来说并不是一个线程池而只是提供了一种任务如何运行的机制而已
2、ExecutorService才可以认为是真正的线程池接口,接口提供了管理线程池的方法
3、下面两个分支,AbstractExecutorService分支就是普通的线程池分支,ScheduledExecutorService是用来创建定时任务的
创建ThreadPoolExecutor时需要7个参数,如下
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
1、 corePoolSiz 核心线程数,表示线程池中的活跃线程个数,线程池初始化创建的时候,核心线程数的个数为0,只有提交任务到线程池中的时候,才会创建线程执行任务。
2、maximumPoolSize最大线程池数,当线程池中的活跃线程数已经大于等于核心线程数的数量时,继续提交任务则会放到工作队列里面,当工作队列满了以后,若继续提交任务到线程池中的时候,就会继续创建线程执行这些任务,直到数目达到最大线程池数目。
3、keepAliveTime 存活时间,若线程资源使用完毕后,再无提交任务需要执行,等待多久资源释放。
4、unit 存活时间的单位 可以为时分秒等
5、workQueue 工作队列,这个就是当核心线程数已经满了,继续提交任务就会放到工作队列里面。工作队列又分为有界和***。
6、threadFactory 线程工厂,创建线程执行提交的任务.
7、handler 处理器 ,当线程数大于线程池最大线程数的时候的拒绝策略。
1、当池中的线程数小于corePoolSize,提交任务直接创建线程执行。
2、当池中的线程数大于corePoolSize,但是工作队列未满,提交的任务放到工作队列。
3、当池中线程数大于corePoolSize,工作队列已满,但是线程数小于maximumPoolSize,提交任务继续创建线程执行。
4、当池中线程数大于corePoolSize,工作队列已满,且线程数大于maximumPoolSize,提交任务后执行拒绝策略。
所谓拒绝策略之前也提到过了,任务太多,超过maximumPoolSize了怎么把?当然是接不下了,接不下那只有拒绝了。拒绝的时候可以指定拒绝策略,也就是一段处理程序。
决绝策略的父接口是RejectedExecutionHandler,JDK本身在ThreadPoolExecutor里给用户提供了四种拒绝策略,看一下:
1、AbortPolicy 直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略
2、CallerRunsPolicy 尝试直接运行被拒绝的任务,如果线程池已经被关闭了,任务就被丢弃了
3、DiscardOldestPolicy 移除最晚的那个没有被处理的任务,然后执行被拒绝的任务。同样,如果线程池已经被关闭了,任务就被丢弃了
4、DiscardPolicy 不能执行的任务将被删除
1、newFixedThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到,核心线程数和线程池最大线程数一致且由创建者传入,存活时间为0,采用***队列。意思就是说,当池中的线程数大于corePoolSize,会一致往队列中放,多以最大线程数可以说没有意义。前面任务执行完成以后会从队列中弹出任务执行。
2、newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
可以看到,核心线程数和最大线程数都是1,同样的使用的***队列。保证所有提交到队列的任务按照顺序一个个的执行。
3、newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到,核心线程数为0,最大线程数为Integer.MAX_VALUE。线程存活的时间为60秒,工作队列用的是SynchronousQueue(SynchronousQueue是一个双栈双队列算法,无空间的队列或栈,任何一个对SynchronousQueue写需要等到一个对SynchronousQueue的读操作,反之亦然。一个读操作需要等待一个写操作,相当于是交换通道,提供者和消费者是需要组队完成工作,缺少一个将会阻塞线程,知道等到配对为止。)也就是说SynchronousQueue是一个没有容量,无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。 相当于每提交一个任务都会去创建线程执行,而且可以创建的最大数量为Integer.MAX_VALUE。对于执行完任务的线程,60秒后就会被回收。
4、newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
};
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
};
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
};
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
这个是定时执行线程池,和前几个线程池无异。可以看出,ScheduledThreadPoolExecutor的核心线程池的线程个数为指定的corePoolSize,当核心线程池的线程个数达到corePoolSize后,就会将任务提交给有界阻塞队列DelayedWorkQueue,对DelayedWorkQueue在下面进行详细介绍,线程池允许最大的线程个数为Integer.MAX_VALUE,也就是说理论上这是一个大小***的线程池。
ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,该接口定义了可延时执行异步任务和可周期执行异步任务的特有功能,相应的方法分别为:
//达到给定的延时时间后,执行任务。这里传入的是实现Runnable接口的任务,
//因此通过ScheduledFuture.get()获取结果为null
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//达到给定的延时时间后,执行任务。这里传入的是实现Callable接口的任务,
//因此,返回的是任务的最终计算结果
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//是以上一个任务开始的时间计时,period时间过去后,
//检测上一个任务是否执行完毕,如果上一个任务执行完毕,
//则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//当达到延时时间initialDelay后,任务开始执行。上一个任务执行结束后到下一次
//任务执行,中间延时时间间隔为delay。以这种方式,周期性执行任务。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
ScheduledThreadPoolExecutor最大的特色是能够周期性执行异步任务,当调用schedule,scheduleAtFixedRate和scheduleWithFixedDelay方法时,实际上是将提交的任务转换成的ScheduledFutureTask类,从源码就可以看出。以schedule方法为例:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
可以看出,通过decorateTask会将传入的Runnable转换成ScheduledFutureTask类。线程池最大作用是将任务和线程进行解耦,线程主要是任务的执行者,而任务也就是现在所说的ScheduledFutureTask。紧接着,会想到任何线程执行任务,总会调用run()方法。为了保证ScheduledThreadPoolExecutor能够延时执行任务以及能够周期性执行任务,ScheduledFutureTask重写了run方法:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//如果不是周期性执行任务,则直接调用run方法
ScheduledFutureTask.super.run();
//如果是周期性执行任务的话,需要重设下一次执行任务的时间
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
从源码可以很明显的看出,在重写的run方法中会先if (!periodic)判断当前任务是否是周期性任务,如果不是的话就直接调用run()方法;否则的话执行setNextRunTime()方法重设下一次任务执行的时间,并通过reExecutePeriodic(outerTask)方法将下一次待执行的任务放置到DelayedWorkQueue中。
因此,可以得出结论:ScheduledFutureTask最主要的功能是根据当前任务是否具有周期性,对异步任务进行进一步封装。如果不是周期性任务(调用schedule方法)则直接通过run()执行,若是周期性任务,则需要在每一次执行完后,重设下一次执行的时间,然后将下一次任务继续放入到阻塞队列中。
参考:https://www.jianshu.com/p/502f9952c09b
原文:https://blog.51cto.com/12122148/2443424