相比较CountDownLatch和CyclicBarrier,CompletableFuture支持更为复杂的线程编排。使用场景多为多个线程同步异步执行,而不是一个主线程在等待。
CompletableFuture是jdk1.8提出来的方法,了解这个类之前需要了解一下jdk1.8提出的函数式编程和java.util.function这个包下的内容。
//都是异步方法,入参不一样,后续的then当大
CompletableFuture.runAsync(Runnable runnable)
CompletableFuture.supplyAsync(Supplier<U> supplier)
//没有返回值,get()方法会阻塞指导所有cfs完成
CompletableFuture.allOf(CompletableFuture<?>... cfs)
//支持返回值,cfs中任意一个线程结束,get()返回,其他线程不会被执行或者被中断(思考:如果做到一个线程接触,其他线程快速中断?)
CompletableFuture.anyOf(CompletableFuture<?>... cfs)
//没有真的执行线程
CompletableFuture.completedFuture(U u)
then篇,用于线程的后续执行,支持前一个的返回结果为后一个的入参。
CompletableFuture<String> completableFuture = CompletableFuture//最后一个的执行结果
.runAsync(() -> {
System.out.println("first " + Thread.currentThread().getId());
})//生成一个completableFuture实例并执行异步方法。
.thenApplyAsync(object -> {
System.out.println("second " + Thread.currentThread().getId());
return "secondResult";
})//异步执行,并且有返回值。
.thenApply(new Function<String, String>() {
@Override
public String apply(String v) {
System.out.println("third " + Thread.currentThread().getId());
System.out.println(v);
return "thirdResult";
}
})//串联两个线程,前一个线程的返回作为后一个线程的入参(可被替代)
.thenCompose(param -> {
return CompletableFuture.completedFuture("firth");
})//同步执行,和调用者(第二个completableFuture)同一个线程执行
.thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
}
});
// 补充一个thenCompain,用于谅连接两个返回值作为后一个线程的入参
//runAfterBoth、runAfterEither、acceptEither、applyToEither从语义可知道意思
在写demo的过程中遇到以下问题:
1、问execuse completableFuture21----2输出了几次?
一次,原因是completableFuture21实例一旦运行之后,返回的是新的实例,保留执行结果,后续直接查看执行结果。
2、o5和o6的结果是否是随机的?为什么?
需要基于上述问题,返回一次,查看源码可以知道是输出前面的内容。
CompletableFuture<String> completableFuture21 = CompletableFuture.supplyAsync(() -> {
System.out.println("execuse completableFuture21----2");
return "completableFuture21";
});
CompletableFuture<String> completableFuture22 = CompletableFuture.supplyAsync(() -> {
System.out.println("execuse completableFuture22----1");
return "completableFuture22";
});
CompletableFuture<Void> completableFuture4 = CompletableFuture.allOf(completableFuture21, completableFuture22);
//触发阻塞
completableFuture4.get();
//任意一个线程结束即可,其他线程被中断。completableFuture实例只会被执行一次,然后保存结果
CompletableFuture<Object> completableFuture5 = CompletableFuture.anyOf(completableFuture22, completableFuture21);
//触发阻塞
Object o5 = completableFuture5.get();
System.out.println("返回结果" + JSON.toJSONString(o5));
CompletableFuture<Object> completableFuture6 = CompletableFuture.anyOf(completableFuture22, completableFuture21);
//触发阻塞
Object o6 = completableFuture6.get();
System.out.println("返回结果" + JSON.toJSONString(o6));
在线程中出现异常,支持降级处理。后续线程不会在执行
exceptionally执行的线程和thenApply的执行线程是一致的
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> {
if (2 > 1) {
System.out.println("线程: " + Thread.currentThread().getName() + " 拋出异常");
throw new NullPointerException("123");
}
return "123";
})
.thenApplyAsync(s -> {
if (2 > 1) {
System.out.println("线程: " + Thread.currentThread().getName() + " 拋出异常");
throw new NullPointerException("456");
}
return "456";
})
.exceptionally(throwable -> {
System.out.println("线程: " + Thread.currentThread().getName() + " 抛出异常进入默认处理 " + throwable.getMessage());
return "default";
});
try {
String result = completableFuture.get();
System.out.println("线程: " + Thread.currentThread().getName() + " 输出结果: " + result);
} catch (Exception e) {
System.err.println(e);
}
参考目录:https://www.cnblogs.com/fingerboy/p/9948736.html
原文:https://www.cnblogs.com/ElliottX4/p/14100487.html