看了《java8实战》这本数,学习了java8的异步编程的相关知识,自己想了一个业务场景,通过异步编程来实现,记录一下实战过程中遇到的问题。
业务场景是我之前背英语单词时的一个想法,背单词时遇到一个英语单词不知道真实的使用场景,可以通过这个关键字去quora上搜问题,然后获取其中16个问题,每个问题下取靠前的三个答案,每个答案里面找到包含关键词的一句话作为关键行,然后把所有的关键行打包返回,这样就可以看到这个单词的真实的使用场景了。
这个业务场景下就可以用到异步编程,上面的几个步骤可以抽象成三个方法,getQuestionsByKeyword,getAnswersByQuestion,getKeylineByAnswer;其中后面两个方法可以异步处理,通过Thread.sleep()的方式来模拟方法执行时间。
public List<Question> getQuestionsByKeyword(String keyword, int questionsNum){
return IntStream.range(1, questionsNum+1).mapToObj(i -> new Question()).collect(Collectors.toList());
}
public List<Answer> getAnswersByQuestion(Question question) {
List<Answer> answers = IntStream.range(1, 4).mapToObj(i -> new Answer()).collect(Collectors.toList());
delay(200);
return answers;
}
public String getKeylineByAnswer(Answer answer) {
String keyline = UUID.randomUUID().toString();
delay(100);
return keyline;
}
一般直接想到的处理流程可能如下:
List<Question> questions = syncService.getQuestionsByKeyword(keyword, questionsNum);
questions.stream().map(question -> syncService.getAnswersByQuestion(question))
.flatMap(List::stream)
.map(answer -> syncService.getKeylineByAnswer(answer))
.collect(Collectors.toList());
这样写的话,就完全是单线程处理,会非常耗时,上面的例子,我questionsNum设置的是16,耗时为:8236ms; 现在机器大部分都是多核的,我们要充分的压榨机器的性能,应该使用多线程来处理,一个小小的改动如下:
List<Question> questions = syncService.getQuestionsByKeyword(keyword, questionsNum);
questions.parallelStream().map(question -> syncService.getAnswersByQuestion(question))
.flatMap(List::stream)
.map(answer -> syncService.getKeylineByAnswer(answer))
.collect(Collectors.toList());
这里的改动是: stream() -> parallelStream();把流改成了并行流,再执行一次看看耗时: 1049ms; 和上面的同步处理比起来耗时只有同步的1/8;
并行流的耗时最多只能降到这里了吗?还有其他优化的余地吗?我们不妨再想想。
parallelStream既然是并行,那么它用到了几个线程呢?
parallelStream默认使用的线程池为 ForkJoinPool.commonPool(), 线程池大小为系统的cpu核心数,为Runtime.getRuntime().availableProcessors(); 我的电脑是8核的,所以是8个线程,questionsNum设置的是16,所以这8个线程需要处理两轮,如果是16线程的话一轮就可以处理完了,可以节省一半的时间
那么如何自定义本次parallelStream使用的线程池呢?代码如下:
List<Question> questions = syncService.getQuestionsByKeyword(keyword, questionsNum);
ForkJoinPool forkJoinPool = new ForkJoinPool(20);
return forkJoinPool.submit(() -> questions.parallelStream().map(question -> syncService.getAnswersByQuestion(question))
.flatMap(List::stream)
.map(answer -> syncService.getKeylineByAnswer(answer))
.collect(Collectors.toList())).join();
再执行一次,耗时为:536ms, 和使用默认线程池比起来,的确节省了一半的时间。
既然增加了线程数就可以减少耗时,那么是不是线程数越多越好呢,实际上也不是。这就带来了一个新的问题,线程池设置多大比较合适?
首先业务有 I/O密集型和CPU密集型两种:
除了并行流,书中还介绍了另一种异步编程方式: CompletableFuture; CompletableFuture是java8引入的,实现了Future,也对Future做了增强。
本次业务代码里用到的CompletableFutrue的方法主要有3个: CompletableFuture.supplierAsync, thenCompose, join
(Supplier<U> supplier, Executor executor),
supplier是生产者型函数式接口,ecexutor为自定义线程池,如果不指定线程池,那么和parallelStream一样默认使用ForkJoinPool,大小为核心数 // 自定义线程池
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("englishLearning-pool-%d").build();
Executor executor = new ThreadPoolExecutor(20, 100, 60, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100),
factory,
new ThreadPoolExecutor.DiscardOldestPolicy());
List<Question> questions = syncService.getQuestionsByKeyword(keyword, questionsNum);
List<CompletableFuture<List<String>>> linesFuture = questions.stream()
.map(question -> CompletableFuture.supplyAsync(() -> syncService.getAnswersByQuestion(question), ThreadPoolUtil.getEnglishLearningPool()))
.map(future -> future.thenCompose(answers -> CompletableFuture.supplyAsync(() -> syncService.getKeylineByAnswerAsync(answers), ThreadPoolUtil.getEnglishLearningPool())))
.collect(Collectors.toList());
return linesFuture.stream().map(CompletableFuture::join).flatMap(List::stream)
.collect(Collectors.toList());
先解释一下线程池创建过程中几个参数的作用:
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 池里允许的最大线程数
long keepAliveTime, // 核心线程以外的其他线程的空闲时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 队列,当线程数大于核心线程,小于最大线程,且队列满了时,创建新线程
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler) // 拒绝策略
上面代码里面有两个Stream,之所以没有合并成一个,是因为Stream的延迟特性会引起顺序执行。书中对此原理的解释图为:
代码里有一点需要注意,当调用syncService.getKeylineByAnswer(Answer answer)方法时报错,提示需要的参数为 Answer, 而提供的是List
public List<String> getKeylineByAnswerAsync(List<Answer> answers) {
List<CompletableFuture<String>> collect = answers.stream()
.map(answer -> CompletableFuture.supplyAsync(() -> getKeylineByAnswer(answer), ThreadPoolUtil.getEnglishLearningPool()))
.collect(Collectors.toList());
return collect.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
最后执行一下代码,耗时为:344ms,比优化后的并行流耗时更短
总结一下,java8中有两种异步编程方式,一种是并行流parallelStream,一种是CompletableFuture。日常使用中应该根据实际业务情况来选用,比如如果是计算密集型,没有I/O,推荐使用parallelStream,因为实现简单,便于理解,效率也可能是最高的; 如果是I/O密集型,推荐使用CompletableFuture,灵活性更好,可以定制线程池参数。
原文:https://www.cnblogs.com/n0thappy/p/15212533.html