首页 > 编程语言 > 详细

Executors 线程池

时间:2021-04-11 00:37:00      阅读:34      评论:0      收藏:0      [点我收藏+]

Executors 提供了五种常用的线程池

 

Executors的其中四种线程池其实都是调用ThreadPoolExecutor实现的:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

 

ThreadPoolExecutor核心参数介绍:

corePoolSize:核心线程数

1.核心线程会一直存活,及时没有任务需要执行
2.当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理

maximumPoolSize:最大线程数

1.当线程数 >= corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
2.当线程数 = maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常

keepAliveTime:线程空闲时间

1.超过corePoolSize后新创建的线程会根据keepAliveTime来控制该线程的空闲存活时间

2.如果调用了ThreadPoolExecutor.allowCoreThreadTimeOut(boolean)的方法,将allowCoreThreadTimeOut(允许核心线程超时)修改为true,则线程池中所有线程都会根据keepAliveTime来控制该线程的空闲存活时间(包括核心线程)

unit:keepAliveTime的时间单位

workQueue:阻塞队列

1.当前线程池中的线程数目 >= corePoolSize,则每来一个任务,会尝试将其添加到该队列当中

 

注:在demo中主线程一定要写延迟,否则当主方法结束时,线程池会自动被回收,所有的子线程都会结束。下一章会介绍在springboot中配置线程池。

 

1)newCachedThreadPool

可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。该线程池最大程度保证每一个请求都能立即被处理。

源码:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

缺点:因为其最大允许线程数为 Integer.MAX_VALUE ,所以当同时创建的线程数过多时,会造成OOM(内存溢出错误)

示例代码:

@Test
public void testCachedThreadPool() {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  // 子线程1
    cachedThreadPool.execute(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("a" + i);
        }
    });
  // 子线程2
    cachedThreadPool.execute(() -> {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("b" + i);
        }
    });
  // 主线程延时
    try {
        Thread.sleep(25000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

 

2)newFixedThreadPool

定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

缺点:因为LinkedBlockingQueue队列是无参构造,默认最大可存放请求数为 Integer.MAX_VALUE ,所以当队列中存放请求数过多时,会造成OOM(内存溢出错误)

示例代码:

@Test
    public void testFixedThreadPool() {
     // 控制最大并发数为2 ExecutorService fixedThreadPool
= Executors.newFixedThreadPool(2);
     // 子线程1 fixedThreadPool.execute(()
-> { for (int i = 0; i < 10; i++) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.err.println("a" + i); } });
     // 子线程2 fixedThreadPool.execute(()
-> { for (int i = 0; i < 10; i++) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.err.println("b" + i); } });
     // 子线程3 fixedThreadPool.execute(()
-> { for (int i = 0; i < 10; i++) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.err.println("c" + i); } });
     // 主线程延时
try { Thread.sleep(50000); } catch (InterruptedException e) { e.printStackTrace(); } }

 

3)newScheduledThreadPool

周期性线程池,支持任务定时及周期性的执行。

源码:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

缺点:同newCachedThreadPool

示例代码:

@Test
    public void testScheduledThreadPool(){
     // 创建一个最大并发数为5的周期性线程池 ScheduledExecutorService scheduledThreadPool
= Executors.newScheduledThreadPool(5); // 子线程1 延迟三秒后执行一次任务 scheduledThreadPool.schedule(() -> { System.out.println("a 当前时间:" + new Date()); }, 3, TimeUnit.SECONDS); // 子线程2 延迟一秒后开始执行任务,之后每隔三秒执行一次任务 scheduledThreadPool.scheduleAtFixedRate(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("b 当前时间:" + new Date()); }, 1,3, TimeUnit.SECONDS); // 子线程3 延迟一秒后开始执行任务,等上一个任务完成之后再间隔三秒执行下一次任务 scheduledThreadPool.scheduleWithFixedDelay(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("c 当前时间:" + new Date()); }, 1,3, TimeUnit.SECONDS); // 主线程延时 try { Thread.sleep(50000); } catch (InterruptedException e) { e.printStackTrace(); } }

 

4)newSingleThreadExecutor

单线程化线程池,只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

源码:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

缺点:同newFixedThreadPool

示例代码:

@Test
    public void testSingleThreadExecutor(){
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        // 子线程1
        singleThreadExecutor.execute(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("a" + i);
            }
        });
        // 子线程2
        singleThreadExecutor.execute(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("b" + i);
            }
        });
        // 主线程延时
        try {
            Thread.sleep(50000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

以上四种都是ThreadPoolExecutor的扩展,下面介绍第五种线程池newWorkStealingPool。

 

5)newWorkStealingPool

抢占式操作线程池,JDK1.8 版本加入的一种线程池,是新的线程池类ForkJoinPool的扩展,能够合理的使用CPU进行对任务的并行操作。

ForkJoinPool构造函数:

private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

由于目前我还不是很了解newWorkStealingPool、ForkJoinPool,就先提一下,顺便挖个坑,等后期了解过了,会另开一章进行补充和探讨。

 

newWorkStealingPool源码:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

 

示例代码:

@Test
    public void testWorkStealingPool(){
        ExecutorService workStealingPool = Executors.newWorkStealingPool();
        // 子线程1
        workStealingPool.execute(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("a" + i);
            }
        });
        // 子线程2
        workStealingPool.execute(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("b" + i);
            }
        });
        // 主线程延时
        try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

 

Executors 线程池

原文:https://www.cnblogs.com/half-moon-stars/p/14633692.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!