线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
如果并发请求数量很多,但每个线程执行的时间很短,就会出现频繁的创建和销毁线程。如此一来,会大大降低系统的效率,可能频繁创建和销毁线程的时间、资源开销要大于实际工作的所需。
正是由于这个问题,所以有必要引入线程池。使用 线程池的好处 有以下几点:
Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。
Executor 框架核心 API 如下:
Executor - 运行任务的简单接口。ExecutorService - 扩展了 Executor 接口。扩展能力:
ScheduledExecutorService - 扩展了 ExecutorService 接口。扩展能力:支持定期执行任务。AbstractExecutorService - ExecutorService 接口的默认实现。ThreadPoolExecutor - Executor 框架最核心的类,它继承了 AbstractExecutorService 类。ScheduledThreadPoolExecutor - ScheduledExecutorService 接口的实现,一个可定时调度任务的线程池。Executors - 可以通过调用 Executors 的静态工厂方法来创建线程池并返回一个 ExecutorService对象。
Executor 接口中只定义了一个 execute 方法,用于接收一个 Runnable 对象。
public interface Executor { void execute(Runnable command); }
ExecutorService 接口继承了 Executor 接口,它还提供了 invokeAll、invokeAny、shutdown、submit 等方法。
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
从其支持的方法定义,不难看出:相比于 Executor 接口,ExecutorService 接口主要的扩展是:
sumbit、invokeAll、invokeAny 方法中都支持传入Callable 对象。shutdown、shutdownNow、isShutdown 等方法。ScheduledExecutorService 接口扩展了 ExecutorService 接口。
它除了支持前面两个接口的所有能力以外,还支持定时调度线程。
public interface ScheduledExecutorService extends ExecutorService { public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
其扩展的接口提供以下能力:
schedule 方法可以在指定的延时后执行一个 Runnable 或者 Callable 任务。scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法可以按照指定时间间隔,定期执行任务。java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的类。所以,本文将着重讲述一下这个类。
ThreadPoolExecutor 有以下重要字段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
参数说明:
ctl - 用于控制线程池的运行状态和线程池中的有效线程数量。它包含两部分的信息:
runState)workerCount)ctl 使用了 Integer 类型来保存,高 3 位保存 runState,低 29 位保存 workerCount。COUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位减 1(29 个 1),这个常量表示 workerCount 的上限值,大约是 5 亿。RUNNING - 运行状态。接受新任务,并且也能处理阻塞队列中的任务。SHUTDOWN - 关闭状态。不接受新任务,但可以处理阻塞队列中的任务。
RUNNING 状态时,调用 shutdown 方法会使线程池进入到该状态。finalize 方法在执行过程中也会调用 shutdown 方法进入该状态。STOP - 停止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow 方法会使线程池进入到该状态。TIDYING - 整理状态。如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated 方法进入 TERMINATED 状态。TERMINATED - 已终止状态。在 terminated 方法执行完后进入该状态。默认 terminated 方法中什么也没有做。进入 TERMINATED 的条件如下:
RUNNING 状态;TIDYING 状态或 TERMINATED 状态;SHUTDOWN 并且 workerQueue 为空;workerCount 为 0;TIDYING 状态成功。
ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
参数说明:
corePoolSize - 核心线程数量。当有新任务通过 execute 方法提交时 ,线程池会执行以下判断:
corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的。corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才创建新的线程去处理任务;corePoolSize 和 maximumPoolSize 相同,则创建的线程池的大小是固定的。这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去从 workQueue 中取任务并处理;maximumPoolSize,这时如果 workQueue 已经满了,则使用 handler 所指定的策略来处理任务;corePoolSize => workQueue => maximumPoolSize。maximumPoolSize - 最大线程数量。
keepAliveTime:线程保持活动的时间。
corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime。unit - keepAliveTime 的时间单位。有 7 种取值。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
ArrayBlockingQueue - 有界阻塞队列。
LinkedBlockingQueue - 无界阻塞队列。
Integer.MAX_VALUE。ArrayBlockingQueue。LinkedBlockingQueue 意味着: maximumPoolSize 将不起作用,线程池能创建的最大线程数为 corePoolSize,因为任务等待队列是无界队列。Executors.newFixedThreadPool 使用了这个队列。SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
LinkedBlockingQueue。Executors.newCachedThreadPool 使用了这个队列。PriorityBlockingQueue - 具有优先级的无界阻塞队列。threadFactory - 线程工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。handler - 饱和策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:
AbortPolicy - 丢弃任务并抛出异常。这也是默认策略。DiscardPolicy - 丢弃任务,但不抛出异常。DiscardOldestPolicy - 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。CallerRunsPolicy - 直接调用 run 方法并且阻塞执行。RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
提交任务可以使用 execute 方法,它是 ThreadPoolExecutor 的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
execute 方法工作流程如下:
workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
在 ThreadPoolExecutor 类中还有一些重要的方法:
submit - 类似于 execute,但是针对的是有返回值的线程。submit 方法是在 ExecutorService 中声明的方法,在 AbstractExecutorService 就已经有了具体的实现。ThreadPoolExecutor 直接复用 AbstractExecutorService 的 submit 方法。shutdown - 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
SHUTDOWN 状态;interruptIdleWorkers 方法请求中断所有空闲的 worker;tryTerminate 尝试结束线程池。shutdownNow - 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。与 shutdown 方法类似,不同的地方在于:
STOP;isShutdown - 调用了 shutdown 或 shutdownNow 方法后,isShutdown 方法就会返回 true。isTerminaed - 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。setCorePoolSize - 设置核心线程数大小。setMaximumPoolSize - 设置最大线程数大小。getTaskCount - 线程池已经执行的和未执行的任务总数;getCompletedTaskCount - 线程池已完成的任务数量,该值小于等于 taskCount;getLargestPoolSize - 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize;getPoolSize - 线程池当前的线程数量;getActiveCount - 当前线程池中正在执行任务的线程数量。JDK 的 Executors 类中提供了几种具有代表性的线程池,这些线程池 都是基于 ThreadPoolExecutor 的定制化实现。
在实际使用线程池的场景中,我们往往不是直接使用 ThreadPoolExecutor ,而是使用 JDK 中提供的具有代表性的线程池实例。
创建一个单线程的线程池。
只会创建唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它 。
单工作线程最大的特点是:可保证顺序地执行各个任务。
创建一个固定大小的线程池。
每次提交一个任务就会新创建一个工作线程,如果工作线程数量达到线程池最大线程数,则将提交的任务存入到阻塞队列中。
FixedThreadPool 是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
创建一个可缓存的线程池。
CachedThreadPool 时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
一般多线程执行的任务类型可以分为 CPU 密集型和 I/O 密集型,根据不同的任务类型,我们计算线程数的方法也不一样。
CPU 密集型任务:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
// 成员变量 private volatile ExecutorService executor; //单例模式创建 private ExecutorService getExecutorService(){ if(executor == null){ synchronized (this){ if(executor == null){ int availableProcessors = Runtime.getRuntime().availableProcessors(); int maximumPoolSize = availableProcessors * 4; int queueCapacity = availableProcessors * 100; long keepAliveTime = 60L; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(queueCapacity); executor = new ThreadPoolExecutor( availableProcessors, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardPolicy()); // 关闭客户端(关闭后台线程) Runtime.getRuntime().addShutdownHook(new Thread(() -> { if(Objects.nonNull(executor)){ executor.shutdown(); } })); } } } return executor; }
原文:https://www.cnblogs.com/turbosha/p/13302946.html