在阅读研究线程池的源码之前,一直感觉线程池是一个框架中最高深的技术。研究后才发现,线程池的实现是如此精巧。本文从技术角度分析了线程池的本质原理和组成,同时分析了JDK、Jetty6、Jetty8、Tomcat的源码实现,对于想了解线程池本质、更好的使用线程池或者定制实现自己的线程池的业务场景具有一定指导意义。
从内部实现上看,线程池技术可主要划分为如下6个要点实现:
图1线程池技术要点
线程池终止逻辑:应用停止时线程池要有自身的停止逻辑,保证所有job都得到执行或者抛弃。
结合上面的技术点,列举几种线程池实现方式。
工作者workers与待处理工作队列实现方式举例:
实现 |
工作者workers结构与并发保护 |
待处理工作队列结构 |
JDK |
使用了HashSet来存储工作者workers,通过可重入锁ReentrantLock对其进行并发保护。每个worker都是一个Runnable接口。 |
使用了实现接口BlockingQueue的阻塞队列来存储待处理工作job,并把队列作为构造函数参数,从而实现业务可以灵活的扩展定制线程池的队列。业务也可使用JDK自身的同步阻塞队列SynchronousQueue、有界队列ArrayBlockingQueue、无界队列LinkedBlockingQueue、优先级队列PriorityBlockingQueue。 |
Jetty6 |
同样使用了HashSet存储工作者workers,通过synchronized一个对象进行HashSet的并发保护。每个工作者实际上是一个Thread的扩展。 |
使用了数组存储待处理的job对象Runnable。数组初始化容量为_maxThreads个,使用变量_queued计算保存当前内部待处理job的个数即数组length。超过数组最大值时,扩大_maxThreads个容量,因此数组永远够用够大,容量无界。同样是用synchronized一个对象的方式实现同步。 |
Jetty8 |
使用了ConcurrentLinkedQueue存储工作者workers,利用JDK基于CSA算法的实现提高了并发效率,同时也降低了线程池并发保护的复杂程度。针对队列ConcurrentLinkedQueue无法保证size()实时性问题引入原子变量AtomicInteger统计工作者数量。 |
与JDK相同实现,使用了基于接口BlockingQueue的阻塞队列来存储待处理工作job,也支持在线程池构造函数的参数中传入队列类型。同时,Jetty8内部默认未设置队列类型场景可自动设置使用2种队列:有界无法扩容的ArrayBlockingQueue及Jetty自身定制扩展实现的可扩容队列BlockingArrayQueue。 |
Tomcat |
基于JDK的ThreadPoolExecutors实现,复用JDK业务 |
复用JDK业务 |
线程池初始化与处理业务job算法举例:
实现 |
线程池构造与工作者初始化 |
处理业务job的算法 |
JDK |
1. 基于多个构造参数实现灵活初始化,几个核心参数如下: corePoolSize:核心工作者数 maximumPoolSize:最大工作者数 keepAliveTime:超过核心工作者数时闲置工作者的存活时间。 workQueue:待处理job队列,即前面提到的BlockingQueue接口。 2. 默认初始化后不启动工作者,等待有请求时才启动。可以通过调用线程池接口提前启动核心工作数个工作者线程,也可以启动业务期望的多个工作者线程。 |
1. 工作者workers数量低于核心工作者数corePoolSize时会优先创建一个工作者worker处理job,处理成功则返回。 2. 工作者workers数量高于核心工作者数时会优先把job放入到待处理队列,放入队列成功时处理结束。 3. 步骤2中入队失败会识别工作者数是否还小于最大工作者数maximumPoolsize,小于的话也会新创建一个工作者worker处理job。 4. 拒绝处理 |
Jetty6 |
1. 同样支持设置多个参数: _spawnOrShrinkAt:扩容/缩容阀值 _minThreads:最小工作者数 _maxThreads:最大工作者数 _maxIdleTimeMs:闲置工作者最大闲置超时时间 2. 初始化后直接启动_minThreads个工作者线程 |
1. 查找闲置的工作者worker,找到则派发job。 2. 没有闲置的工作者,将job存入待处理数组。 3. 当识别到数组中待处理job超过扩容阀值参数时,扩容增加工作者处理job 4. 否则不处理 |
Jetty8 |
1. 配置参数类似Jetty6,去除了_spawnOrShrinkAt阀值参数。 2. 初始化后直接启动_minThreads个工作者线程 |
非常简单,直接将待处理job入队。 |
Tomcat |
1. 基于JDK线程池的构造方法 2. 来请求时启动工作者 |
处理方法复用JDK的,但是在开始提交前扩展了JDK的功能,实现了可以统计提交数submittedCount的能力 |
线程池工作者worker的增减机制举例:
实现 |
工作者增加算法 |
工作者减少算法 |
JDK |
1. 待处理job来时,工作者workers数量低于核心工作者数corePoolSize时。 2. 待处理job来时,workers数超过核心数小于最大工作者数且入待处理队列失败场景。 3. 业务调用线程池的更新核心工作者数接口时,若发现扩容,会增加工作者数。 |
1. 待处理任务队列里没有job并且工作者workers数量超过了核心工作者数corePoolSize。 2. 待处理任务队列里没有job并且允许工作者数量小于核心工作者参数为true,此场景会至少保留一个工作者线程。 |
Jetty6 |
1. 启动线程池时会启动_minThreads个工作者线程 2. 待处理的job数量高于了阀值参数且工作者数没有达到最大值时会增加工作者。 3. 调用线程池接口setMinThreads更新最小工作者数时会根据需要增加工作者。 |
如下三个条件同时满足时会减少工作者: 1. 待处理任务数组中没有待处理job 2. 工作者workers数量超过了最小工作者数_minThreads 3. 闲置工作者线程数高于了阀值参数 |
Jetty8 |
1. 启动线程池时启动最小工作者参数个工作者线程 2. 已经没有闲置工作者或者闲置工作者的数量已经小于待处理的job的总数 3. 调用线程池接口setMinThreads更新最小工作者数时 |
如下三个条件同时满足时会减少工作者: 1. 待处理任务队列里没有待处理的job 2. 工作者workers总数超过了最小工作者参数配置_minThreads 3. 工作者线程的闲置时间超时 |
Tomcat |
同JDK增加工作者算法 |
复用JDK减少算法,同时定制扩展延迟参数,超过参数时,直接抛出异常到外面来终止线程池工作者。 |
对比几种线程池实现,JDK的实现是最为灵活、功能最强且扩展性最好的,Tomcat即基于JDK线程池功能扩展实现,复用原有业务的同时扩充了自己的业务。Jetty6是完全自己定制的线程池业务,耦合线程池众多复杂的业务逻辑到线程池类里面,逻辑相对最为复杂,扩展性也非常差。Jetty8相对Jetty6的实现简化了很多,其中利用了JDK中的同步容器和原子变量,同时实现方式也越来越接近JDK。
感谢郭蕾对本文的审校。
原文地址:http://www.infoq.com/cn/articles/thread-pool-algorithm-realization
原文:http://www.cnblogs.com/Mainz/p/3870053.html