1、无返回值demo
1 public class ThreadTest { 2 3 public static void main(String[] args) { 4 // 核心线程池的大小 5 int corePoolSize = 2; 6 // 核心线程池的最大线程数 7 int maxPoolSize = 4; 8 // 线程最大空闲时间 9 long keepAliveTime = 10; 10 // 时间单位 11 TimeUnit unit = TimeUnit.SECONDS; 12 // 阻塞队列 容量为2 13 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2); 14 // 线程创建工厂 15 ThreadFactory threadFactory = new NameTreadFactory(); 16 // 线程池拒绝策略 17 RejectedExecutionHandler handler = new MyIgnorePolicy(); 18 ThreadPoolExecutor executor = null; 19 try { 20 // 推荐的创建线程池的方式 21 // 不推荐使用现成的API创建线程池 22 executor = new ThreadPoolExecutor(corePoolSize, 23 maxPoolSize, keepAliveTime, unit, 24 workQueue, threadFactory, handler); 25 // 预启动所有核心线程 提升效率 26 executor.prestartAllCoreThreads(); 27 // 任务数量 28 int count = 10; 29 for (int i = 1; i <= count; i++) { 30 RunnableTask task = new RunnableTask(String.valueOf(i)); 31 executor.submit(task); 32 } 33 } 34 finally { 35 assert executor != null; 36 executor.shutdown(); 37 } 38 39 } 40 41 /** 42 * 线程工厂 43 */ 44 static class NameTreadFactory implements ThreadFactory { 45 // 线程id 46 private final AtomicInteger threadId = new AtomicInteger(1); 47 48 @Override 49 public Thread newThread(Runnable runnable) { 50 Thread t = new Thread(runnable, "线程-" + threadId.getAndIncrement()); 51 System.out.println(t.getName() + " 已经被创建"); 52 return t; 53 } 54 } 55 56 /** 57 * 线程池拒绝策略 58 */ 59 public static class MyIgnorePolicy implements RejectedExecutionHandler { 60 61 @Override 62 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { 63 doLog(runnable, e); 64 } 65 66 private void doLog(Runnable runnable, ThreadPoolExecutor e) { 67 // 可做日志记录等 68 System.err.println(runnable.toString() + " rejected"); 69 } 70 } 71 72 /** 73 * 线程 74 */ 75 static class RunnableTask implements Runnable { 76 private String name; 77 78 public RunnableTask(String name) { 79 this.name = name; 80 } 81 82 @Override 83 public void run() { 84 try { 85 System.out.println(this.toString() + " is running!"); 86 // 让任务执行慢点 87 Thread.sleep(3000); 88 } 89 catch (InterruptedException e) { 90 e.printStackTrace(); 91 } 92 } 93 94 @Override 95 public String toString() { 96 return "RunnableTask [name=" + name + "]"; 97 } 98 } 99 100 }
ThreadPoolExecutor构造函数解释:参考https://blog.csdn.net/jubaoquan/article/details/79198780
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 }
corePoolSize:线程池中核心线程数的最大值。
maximumPoolSize:线程池中能拥有最多线程数。
workQueue:用于缓存任务的阻塞队列。
关系:执行线程数量未超过corePoolSize,则都在执行;超过后则进入workQueue中等待执行,若队列已满,则创建引入新线程执行,但不能超过maximumPoolSize。
通俗解释:
———————————————————————————————————————————————————————
keepAliveTime:表示空闲线程的存活时间;当临时线程超过keepAliveTime还是空闲状态,则会被清理调。
TimeUnitunit:表示keepAliveTime的单位。
通俗解释:
———————————————————————————————————————————————————————
handler:表示当workQueue已满,且池中的线程数达到maximumPoolSize时,线程池拒绝添加新任务时采取的策略。
通俗解释:
———————————————————————————————————————————————————————
threadFactory:指定创建线程的工厂。
———————————————————————————————————————————————————————
2、有返回值demo
1 public class CallableTest { 2 3 public static void main(String[] args) throws InterruptedException { 4 ExecutorService executor = null; 5 int count = 10; 6 try { 7 // !!! 不推荐使用Executors的静态方法创建线程池 !!! 8 executor = Executors.newCachedThreadPool(); 9 // CompletionService:执行线程、获取线程执行结果 10 CompletionService<String> completionService = new ExecutorCompletionService<>(executor); 11 for (int i = 0; i < count; i++) { 12 FactorialCalculator factorialCalculator = new FactorialCalculator(i); 13 // 放一个实现java.util.concurrent.Callable的类就行 14 completionService.submit(factorialCalculator); 15 } 16 17 for (int i = 0; i < count; i++) { 18 Future<String> result = completionService.take(); 19 System.out.print(result.get()); 20 } 21 } 22 catch (ExecutionException e) { 23 e.printStackTrace(); 24 } 25 finally { 26 assert executor != null; 27 executor.shutdown(); 28 } 29 } 30 }
1 public class FactorialCalculator implements Callable<String> { 2 3 private Integer number; 4 5 public FactorialCalculator(Integer number) { 6 this.number = number; 7 } 8 9 @Override 10 public String call() throws Exception { 11 int result = 1; 12 13 if (number == 0 || number == 1) { 14 result = 1; 15 } 16 else { 17 for (int i = 2; i <= number; i++) { 18 result *= i; 19 TimeUnit.MICROSECONDS.sleep(200); 20 } 21 } 22 return String.format("%s输出%d的阶乘为:%d\n", 23 Thread.currentThread().getName(), number, result); 24 } 25 }
1、无返回demo
1 private static void validateFolkJoin() throws InterruptedException { 2 // 需求:简单打印1-3000的数字 3 // 程序将一个大任务拆分成多个小任务 4 // 并将任务交给ForkJoinPool来执行 5 PrintTask task = new PrintTask(0, 3000); 6 //创建线程池 7 ForkJoinPool pool = new ForkJoinPool(); 8 //将task提交至线程池 9 pool.submit(task); 10 //线程阻塞,等待所有任务完成 11 pool.awaitTermination(2, TimeUnit.SECONDS); 12 13 System.out.printf("RESULT_SET的大小=%s", RESULT_SET.size()); 14 pool.shutdown(); 15 }
2、有返回demo
1 public static void main(String[] args) throws Exception { 2 // 需求:对于长度为10000的元素数组进行累加 3 int[] nums = new int[10000]; 4 Random random = new Random(); 5 int total = 0; 6 // 初始化数组元素 7 long start = System.nanoTime(); 8 for (int i = 0; i < nums.length; i++) { 9 int temp = random.nextInt(100); 10 nums[i] = temp; 11 total += nums[i]; 12 } 13 long end = System.nanoTime(); 14 System.out.println("初始化数组用时:" + (end - start) + "纳秒, 初始化数组总和:" + total); 15 16 // ------------ 主要核心代码 ------------ 17 long startTask = System.nanoTime(); 18 // 创建Task 19 SumTask task = new SumTask(nums, 0, nums.length); 20 // 创建线程池 21 ForkJoinPool pool = new ForkJoinPool(); 22 // 提交任务,存在返回值 23 ForkJoinTask<Integer> future = pool.submit(task); 24 // 显示结果 25 long endTask = System.nanoTime(); 26 System.out.println("线程池计算用时:" + (endTask - startTask) + "纳秒, 线程池执行结果:" + future.get()); 27 // 关闭多线程 28 pool.shutdown(); 29 }
ThreadPoolExecutor:适用于IO密集型任务
ForkJoinPool:适用于CPU密集型任务
原文:https://www.cnblogs.com/bzfsdr/p/13089015.html