首页 > 编程语言 > 详细

java并发:线程池之ThreadPoolExecutor

时间:2021-08-16 22:33:29      阅读:23      评论:0      收藏:0      [点我收藏+]

一、线程池的属性

技术分享图片

二、详解ThreadPoolExecutor

  上文提到可以通过显式的ThreadPoolExecutor构造函数来构造特定形式的线程池,ThreadPoolExecutor是java.util.concurrent包以内部线程池的形式对外提供线程池管理、线程调度等服务,此处我们来了解一下ThreadPoolExecutor

(1)一般使用方式

ExecutorService exec = new ThreadPoolExecutor(8,
                8, 
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());

 

(2)构造函数

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters, the default thread factory and the default rejected
     * execution handler.
     *
     * <p>It may be more convenient to use one of the {@link Executors}
     * factory methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and {@linkplain ThreadPoolExecutor.AbortPolicy
     * default rejected execution handler}.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and
     * {@linkplain Executors#defaultThreadFactory default thread factory}.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

 

(3)构造函数参数说明

技术分享图片

进一步解说:

A、当提交新任务时,若线程池大小小于corePoolSize,将创建一个新的线程来执行任务,即使此时线程池中存在空闲线程;

B、当提交新任务时,若线程池达到corePoolSize大小,新提交的任务将被放入workQueue中,等待线程池调度执行;

C、当提交新任务时,若workQueue已满,且maximumPoolSize>corePoolSize,将创建新的线程来执行任务;

D、当提交新任务时,若任务总数超过maximumPoolSize,新提交的任务将由RejectedExecutionHandler来处理;

E、当线程池中的线程数超过corePoolSize时,若线程的空闲时间达到keepAliveTime,则关闭空闲线程

(4)阻塞队列的选择

技术分享图片

 

 (5)简述SynchronousQueue

技术分享图片

Note:

同步队列 SynchronousQueue最多只有一个元素

三、定制ThreadPoolExecutor

(1)修改参数的方式

技术分享图片

技术分享图片

(2)自定义方式(封装各种参数)

示例(摘自网络):

import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.RejectedExecutionHandler;  
import java.util.concurrent.ThreadFactory;  
import java.util.concurrent.ThreadPoolExecutor;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.atomic.AtomicInteger;  
  
  
public class CustomThreadPoolExecutor {  
  
      
    private ThreadPoolExecutor pool = null;  
      
      
    /** 
     * 线程池初始化方法 
     *  
     * corePoolSize 核心线程池大小----10 
     * maximumPoolSize 最大线程池大小----30 
     * keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit 
     * TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES 
     * workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞队列 
     * threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂 
     * rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时, 
     *                          即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)), 
     *                          任务会交给RejectedExecutionHandler来处理 
     */  
    public void init() {  
        pool = new ThreadPoolExecutor(  
                10,  
                30,  
                30,  
                TimeUnit.MINUTES,  
                new ArrayBlockingQueue<Runnable>(10),  
                new CustomThreadFactory(),new CustomRejectedExecutionHandler());  
    }  
  
      
    public void destory() {  
        if(pool != null) {  
            pool.shutdownNow();  
        }  
    }  
      
      
    public ExecutorService getCustomThreadPoolExecutor() {  
        return this.pool;  
    }  
      
    private class CustomThreadFactory implements ThreadFactory {  
  
        private AtomicInteger count = new AtomicInteger(0);  
          
        @Override  
        public Thread newThread(Runnable r) {  
            Thread t = new Thread(r);  
            String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);  
            System.out.println(threadName);  
            t.setName(threadName);  
            return t;  
        }  
    }
      
    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {  
  
        @Override  
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
            // 记录异常  
            // 报警处理等  
            System.out.println("error.............");  
        }  
    }
      
    // 测试构造的线程池  
    public static void main(String[] args) {  
        CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();  
        // 1.初始化  
        exec.init();  
          
        ExecutorService pool = exec.getCustomThreadPoolExecutor();  
        for(int i=1; i<100; i++) {  
            System.out.println("提交第" + i + "个任务!");  
            pool.execute(new Runnable() {  
                @Override  
                public void run() {  
                    try {  
                        Thread.sleep(300);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    System.out.println("running=====");  
                }  
            });  
        }    
          
        // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了  
        // exec.destory();  
          
        try {  
            Thread.sleep(10000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

解读:

上述代码通过init()方法对ThreadPoolExecutor构造函数进行了一些自定义设置,getCustomThreadPoolExecutor()方法返回init()方法配置的ThreadPoolExecutor对象实例(即线程池引用)

 

四、扩展ThreadPoolExecutor

技术分享图片

 

五、源码视角

从源码视角分析Executors、ThreadPoolExecutor、ExecuteService、Executor之间的关系

技术分享图片

 

 

(1)Executors

  从Java5开始新增了Executors类,它有几个静态工厂方法用来创建线程池,这些静态工厂方法返回一个ExecutorService类型的值,此值即为线程池的引用。

技术分享图片

(2)Executor

Executor是一个接口,里面只有一个方法

public interface Executor {
    void execute(Runnable command);
}

 

(3)ExecuteService

ExecuteService也是一个接口,其定义如下:

public interface ExecutorService extends Executor {...}

 

技术分享图片

(4)ThreadPoolExecutor

继承AbstractExecutorService,AbstractExecutorService实现ExecutorService接口

public class ThreadPoolExecutor extends AbstractExecutorService {...}
public abstract class AbstractExecutorService implements ExecutorService {...}

 

六、ExecutorService的生命周期

在本文最开始的那个示例中,有一句代码,如下:

executor.shutdown();

 

该语句并不是终止线程的运行,而是禁止在这个executor中添加新的任务,下文描述了该语句对于ExecutorService的意义。

技术分享图片

 

七、饱和策略(线程池任务拒绝策略)

上文提到ThreadPoolExecutor构造函数的RejectedExecutionHandler handler参数,该参数表示当提交的任务数超过maxmumPoolSize与workQueue之和时,任务会交给RejectedExecutionHandler来处理,此处我们来具体了解一下

(1)四种饱和策略

技术分享图片

技术分享图片

技术分享图片

(2)源码分析:

RejectedExecutionHandler这个接口是用来处理被丢弃的线程的异常处理接口,其源码如下:

public interface RejectedExecutionHandler{
    //被线程池丢弃的线程处理机制
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) ;
}

 

AbortPolicy(中止策略)继承RejectedExecutionHandler接口,其源码如下:

public static class AbortPolicy implements RejectedExecutionHandler{

    public AbortPolicy(){}
    
    //直接抛出异常
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        throw new RejectedExecutionException("Task"+r.toString()+"rejected from"+executor.toString());
    }
    
}

 

我们可以自己实现RejectedExecutionHandler接口,将实现类作为线程丢弃处理类,代码如下:

package com.test;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionHandlerDemo implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // TODO Auto-generated method stub
        System.out.println("线程信息"+r.toString()+"被遗弃的线程池:"+executor.toString());
    }
    
}

 

java并发:线程池之ThreadPoolExecutor

原文:https://www.cnblogs.com/studyLog-share/p/15149761.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!